diff --git a/config.yaml b/config.yaml index 7d0e97a54..359b51a21 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,2 @@ kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" -components_module: tests.pipeline.test_components pipeline_base_dir: tests/pipeline diff --git a/docs/docs/resources/pipeline-config/config.yaml b/docs/docs/resources/pipeline-config/config.yaml index 862a49ac0..d8c5433b7 100644 --- a/docs/docs/resources/pipeline-config/config.yaml +++ b/docs/docs/resources/pipeline-config/config.yaml @@ -1,14 +1,12 @@ # CONFIGURATION # -# Custom Python module defining project-specific KPOps components -components_module: null # Base directory to the pipelines (default is current working directory) pipeline_base_dir: . # The Kafka brokers address. # REQUIRED kafka_brokers: "http://broker1:9092,http://broker2:9092" # Configure the topic name variables you can use in the pipeline definition. -topic_name_config: +topic_name_config: # Configures the value for the variable ${output_topic_name} default_output_topic_name: ${pipeline.name}-${component.name} # Configures the value for the variable ${error_topic_name} @@ -27,9 +25,6 @@ kafka_rest: kafka_connect: # Address of Kafka Connect. url: "http://localhost:8083" -# The timeout in seconds that specifies when actions like deletion or deploy -# timeout. -timeout: 300 # Flag for `helm upgrade --install`. # Create the release namespace if not present. create_namespace: false @@ -42,7 +37,7 @@ helm_config: # Kubernetes API version used for Capabilities.APIVersions api_version: null # Configure Helm Diff. -helm_diff_config: +helm_diff_config: # Set of keys that should not be checked. ignore: - name diff --git a/docs/docs/resources/variables/config_env_vars.env b/docs/docs/resources/variables/config_env_vars.env index c4b4050e8..f558d4d19 100644 --- a/docs/docs/resources/variables/config_env_vars.env +++ b/docs/docs/resources/variables/config_env_vars.env @@ -4,9 +4,6 @@ # settings in `config.yaml`. Variables marked as required can instead # be set in the global config. # -# components_module -# Custom Python module defining project-specific KPOps components -KPOPS_COMPONENTS_MODULE # No default value, not required # pipeline_base_dir # Base directory to the pipelines (default is current working # directory) diff --git a/docs/docs/resources/variables/config_env_vars.md b/docs/docs/resources/variables/config_env_vars.md index 171715ba5..8685acba0 100644 --- a/docs/docs/resources/variables/config_env_vars.md +++ b/docs/docs/resources/variables/config_env_vars.md @@ -2,7 +2,6 @@ These variables take precedence over the settings in `config.yaml`. Variables ma | Name | Default Value |Required| Description | Setting name | |--------------------------------------------------|----------------------------------------|--------|----------------------------------------------------------------------------------|-------------------------------------------| -|KPOPS_COMPONENTS_MODULE | |False |Custom Python module defining project-specific KPOps components |components_module | |KPOPS_PIPELINE_BASE_DIR |. |False |Base directory to the pipelines (default is current working directory) |pipeline_base_dir | |KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers | |KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline.name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name| diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index 47aab93b1..949c42791 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -177,19 +177,6 @@ "additionalProperties": false, "description": "Global configuration for KPOps project.", "properties": { - "components_module": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "description": "Custom Python module defining project-specific KPOps components", - "title": "Components Module" - }, "create_namespace": { "default": false, "description": "Flag for `helm upgrade --install`. Create the release namespace if not present.", diff --git a/docs/docs/user/migration-guide/v6-v7.md b/docs/docs/user/migration-guide/v6-v7.md new file mode 100644 index 000000000..53c83c796 --- /dev/null +++ b/docs/docs/user/migration-guide/v6-v7.md @@ -0,0 +1,62 @@ +# Migrate from V6 to V7 + +## [Automatic loading of namespaced custom components](https://github.com/bakdata/kpops/pull/500) + +KPOps is now distributed as a Python namespace package (as defined by [PEP 420](https://peps.python.org/pep-0420/)). This allows us to standardize the namespace `kpops.components` for both builtin and custom pipeline components. + +As a result of the restructure, some imports need to be adjusted: + +**KPOps Python API** + +```diff +-import kpops ++import kpops.api as kpops +``` + +**builtin KPOps components** + +```diff +-from kpops.components import ( +- HelmApp, +- KafkaApp, +- KafkaConnector, +- KafkaSinkConnector, +- KafkaSourceConnector, +- KubernetesApp, +- StreamsBootstrap, +- ProducerApp, +- StreamsApp, +- PipelineComponent, +- StreamsApp, +- ProducerApp, +-) ++from kpops.components.base_components import ( ++ HelmApp, ++ KafkaApp, ++ KafkaConnector, ++ KafkaSinkConnector, ++ KafkaSourceConnector, ++ KubernetesApp, ++ PipelineComponent, ++) ++from kpops.components.streams_bootstrap import ( ++ StreamsBootstrap, ++ StreamsApp, ++ ProducerApp, ++) +``` + +### your custom KPOps components + +#### config.yaml + +```diff +-components_module: components +``` + +#### Python module + +```diff +-components/__init__.py ++kpops/components/custom/__init__.py +``` diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 570563069..0a187f139 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -215,15 +215,16 @@ $ kpops schema [OPTIONS] SCOPE:{pipeline|defaults|config} - pipeline: Schema of PipelineComponents. Includes the built-in KPOps components by default. To include custom components, provide components module in config. + - pipeline: Schema of PipelineComponents for KPOps pipeline.yaml + - defaults: Schema of PipelineComponents for KPOps defaults.yaml + + - config: Schema of KpopsConfig. [required] + - config: Schema for KPOps config.yaml [required] **Options**: -* `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .] -* `--include-stock-components / --no-include-stock-components`: Include the built-in KPOps components. [default: include-stock-components] * `--help`: Show this message and exit. diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 0cadd93c5..0703f915e 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -130,6 +130,7 @@ nav: - Migrate from v3 to v4: user/migration-guide/v3-v4.md - Migrate from v4 to v5: user/migration-guide/v4-v5.md - Migrate from v5 to v6: user/migration-guide/v5-v6.md + - Migrate from v6 to v7: user/migration-guide/v6-v7.md - CLI usage: user/references/cli-commands.md - Editor integration: user/references/editor-integration.md - CI integration: diff --git a/hooks/gen_docs/gen_docs_components.py b/hooks/gen_docs/gen_docs_components.py index 10bb40af7..e5e03062d 100644 --- a/hooks/gen_docs/gen_docs_components.py +++ b/hooks/gen_docs/gen_docs_components.py @@ -8,12 +8,18 @@ import yaml from hooks import ROOT -from kpops.api.registry import _find_classes -from kpops.components import KafkaConnector, PipelineComponent +from kpops.api.registry import Registry +from kpops.components.base_components.kafka_connector import KafkaConnector +from kpops.components.base_components.pipeline_component import ( + PipelineComponent, +) from kpops.utils.colorify import redify, yellowify from kpops.utils.pydantic import issubclass_patched from kpops.utils.yaml import load_yaml_file +registry = Registry() +registry.discover_components() + PATH_KPOPS_MAIN = ROOT / "kpops/cli/main.py" PATH_CLI_COMMANDS_DOC = ROOT / "docs/docs/user/references/cli-commands.md" PATH_DOCS_RESOURCES = ROOT / "docs/docs/resources" @@ -33,7 +39,7 @@ (PATH_DOCS_RESOURCES / "pipeline-defaults/headers").iterdir(), ) -KPOPS_COMPONENTS = tuple(_find_classes("kpops.components", PipelineComponent)) +KPOPS_COMPONENTS = tuple(registry.components) KPOPS_COMPONENTS_SECTIONS = { component.type: [ field_name diff --git a/kpops/__init__.py b/kpops/__init__.py deleted file mode 100644 index 22aebfa50..000000000 --- a/kpops/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -__version__ = "6.0.2" - -# export public API functions -from kpops.api import clean, deploy, destroy, generate, init, manifest, reset - -__all__ = ( - "generate", - "manifest", - "deploy", - "destroy", - "reset", - "clean", - "init", -) diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index 3073d9452..783fb3d57 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -4,7 +4,6 @@ from pathlib import Path from typing import TYPE_CHECKING -import kpops from kpops.api.logs import log, log_action from kpops.api.options import FilterType from kpops.api.registry import Registry @@ -23,8 +22,8 @@ from kpops.utils.cli_commands import init_project if TYPE_CHECKING: - from kpops.components import PipelineComponent from kpops.components.base_components.models.resource import Resource + from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.config import KpopsConfig @@ -90,7 +89,7 @@ def manifest( :param verbose: Enable verbose printing. :return: Resources. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -129,7 +128,7 @@ def deploy( :param verbose: Enable verbose printing. :param parallel: Enable or disable parallel execution of pipeline steps. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -177,7 +176,7 @@ def destroy( :param verbose: Enable verbose printing. :param parallel: Enable or disable parallel execution of pipeline steps. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -227,7 +226,7 @@ def reset( :param verbose: Enable verbose printing. :param parallel: Enable or disable parallel execution of pipeline steps. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -276,7 +275,7 @@ def clean( :param verbose: Enable verbose printing. :param parallel: Enable or disable parallel execution of pipeline steps. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -333,9 +332,7 @@ def _create_pipeline( :return: Created `Pipeline` object. """ registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") + registry.discover_components() handlers = _setup_handlers(kpops_config) parser = PipelineGenerator(kpops_config, registry, handlers) diff --git a/kpops/api/file_type.py b/kpops/api/file_type.py index c08fce987..3e170be96 100644 --- a/kpops/api/file_type.py +++ b/kpops/api/file_type.py @@ -33,3 +33,8 @@ def as_yaml_file(self, prefix: str = "", suffix: str = "") -> str: 'pre_pipeline_suf.yaml' """ return prefix + self.value + suffix + FILE_EXTENSION + + +PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() +DEFAULTS_YAML = KpopsFileType.DEFAULTS.as_yaml_file() +CONFIG_YAML = KpopsFileType.CONFIG.as_yaml_file() diff --git a/kpops/api/logs.py b/kpops/api/logs.py index e9a833aba..979b5a36c 100644 --- a/kpops/api/logs.py +++ b/kpops/api/logs.py @@ -6,7 +6,7 @@ import typer if TYPE_CHECKING: - from kpops.components import PipelineComponent + from kpops.components.base_components.pipeline_component import PipelineComponent class CustomFormatter(logging.Formatter): diff --git a/kpops/api/options.py b/kpops/api/options.py index dc116bd35..22fda2542 100644 --- a/kpops/api/options.py +++ b/kpops/api/options.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from kpops.components import PipelineComponent + from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.pipeline import ComponentFilterPredicate diff --git a/kpops/api/registry.py b/kpops/api/registry.py index 2df483329..37c8e5f9c 100644 --- a/kpops/api/registry.py +++ b/kpops/api/registry.py @@ -3,22 +3,26 @@ import importlib import inspect import logging +import pkgutil import sys +from collections.abc import Iterable from dataclasses import dataclass, field from pathlib import Path +from types import ModuleType from typing import TYPE_CHECKING, TypeVar -from kpops import __name__ +import typer + from kpops.api.exception import ClassNotFoundError from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.const import KPOPS_MODULE if TYPE_CHECKING: from collections.abc import Iterator -KPOPS_MODULE = __name__ + "." -T = TypeVar("T") -ClassDict = dict[str, type[T]] # type -> class +_PluginT = TypeVar("_PluginT") +ClassDict = dict[str, type[_PluginT]] # type -> class sys.path.append(str(Path.cwd())) log = logging.getLogger("Registry") @@ -30,12 +34,17 @@ class Registry: _classes: ClassDict[PipelineComponent] = field(default_factory=dict, init=False) - def find_components(self, module_name: str) -> None: - """Find all PipelineComponent subclasses in module. + @property + def components(self) -> Iterator[type[PipelineComponent]]: + yield from self._classes.values() + + def discover_components(self) -> None: + """Discover first- and third-party KPOps components. - :param module_name: name of the python module. + That is all classes inheriting from PipelineComponent. """ - for _class in _find_classes(module_name, PipelineComponent): + custom_modules = self.iter_component_modules() + for _class in _find_classes(custom_modules, base=PipelineComponent): self._classes[_class.type] = _class def __getitem__(self, component_type: str) -> type[PipelineComponent]: @@ -45,32 +54,50 @@ def __getitem__(self, component_type: str) -> type[PipelineComponent]: msg = f"Could not find a component of type {component_type}" raise ClassNotFoundError(msg) from ke + @staticmethod + def iter_component_modules() -> Iterator[ModuleType]: + import kpops.components + + yield kpops.components + yield from _iter_namespace(kpops.components) -def find_class(module_name: str, baseclass: type[T]) -> type[T]: + +def find_class(modules: Iterable[ModuleType], base: type[_PluginT]) -> type[_PluginT]: try: - return next(_find_classes(module_name, baseclass)) + return next(_find_classes(modules, base=base)) except StopIteration as e: raise ClassNotFoundError from e -def _find_classes(module_name: str, baseclass: type[T]) -> Iterator[type[T]]: +def import_module(module_name: str) -> ModuleType: module = importlib.import_module(module_name) - if module.__file__ and not module_name.startswith(KPOPS_MODULE): - file_path = Path(module.__file__) - try: - rel_path = file_path.relative_to(Path.cwd()) - log.debug(f"Picked up: {rel_path}") - except ValueError: - log.debug(f"Picked up: {file_path}") - for _, _class in inspect.getmembers(module, inspect.isclass): - if not __filter_internal_kpops_classes( - _class.__module__, module_name - ) and issubclass(_class, baseclass): - yield _class + if module.__file__: + log.debug( + f"Loading {typer.style(module.__name__,bold=True)} ({module.__file__})" + ) + return module + + +def _find_classes( + modules: Iterable[ModuleType], base: type[_PluginT] +) -> Iterator[type[_PluginT]]: + for module in modules: + for _, _class in inspect.getmembers(module, inspect.isclass): + if not __filter_internal_kpops_classes( + _class.__module__, module.__name__ + ) and issubclass(_class, base): + yield _class def __filter_internal_kpops_classes(class_module: str, module_name: str) -> bool: - # filter out internal kpops classes and components unless specifically requested + """Filter out internal kpops classes and components unless specifically requested.""" return class_module.startswith(KPOPS_MODULE) and not module_name.startswith( KPOPS_MODULE ) + + +def _iter_namespace(ns_pkg: ModuleType) -> Iterator[ModuleType]: + for _, module_name, _ in pkgutil.iter_modules( + ns_pkg.__path__, ns_pkg.__name__ + "." + ): + yield import_module(module_name) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 395ab8e53..16d379456 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -5,12 +5,14 @@ import typer -import kpops -from kpops import __version__ -from kpops.api.file_type import KpopsFileType +import kpops.api as kpops +from kpops.api.file_type import CONFIG_YAML, DEFAULTS_YAML, PIPELINE_YAML, KpopsFileType from kpops.api.options import FilterType -from kpops.cli.utils import collect_pipeline_paths -from kpops.config import ENV_PREFIX, KpopsConfig +from kpops.cli.utils import ( + collect_pipeline_paths, +) +from kpops.config import ENV_PREFIX +from kpops.const import KPOPS, __version__ from kpops.utils.gen_schema import ( gen_config_schema, gen_defaults_schema, @@ -129,29 +131,21 @@ def schema( scope: KpopsFileType = typer.Argument( ..., show_default=False, - help=""" + help=f""" Scope of the generated schema \n\n\n - pipeline: Schema of PipelineComponents. Includes the built-in KPOps components by default. To include custom components, provide components module in config. - \n\n\n - config: Schema of KpopsConfig.""", - ), - config: Path = CONFIG_PATH_OPTION, - include_stock_components: bool = typer.Option( - default=True, help="Include the built-in KPOps components." + - {KpopsFileType.PIPELINE.value}: Schema of PipelineComponents for KPOps {PIPELINE_YAML} + \n\n + - {KpopsFileType.DEFAULTS.value}: Schema of PipelineComponents for KPOps {DEFAULTS_YAML} + \n\n + - {KpopsFileType.CONFIG.value}: Schema for KPOps {CONFIG_YAML}""", ), ) -> None: match scope: case KpopsFileType.PIPELINE: - kpops_config = KpopsConfig.create(config) - gen_pipeline_schema( - kpops_config.components_module, include_stock_components - ) + gen_pipeline_schema() case KpopsFileType.DEFAULTS: - kpops_config = KpopsConfig.create(config) - gen_defaults_schema( - kpops_config.components_module, include_stock_components - ) + gen_defaults_schema() case KpopsFileType.CONFIG: gen_config_schema() @@ -316,7 +310,7 @@ def clean( def version_callback(show_version: bool) -> None: if show_version: - typer.echo(f"KPOps {__version__}") + typer.echo(f"{KPOPS} {__version__}") raise typer.Exit diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py index f4a04bcbb..16a8af563 100644 --- a/kpops/cli/utils.py +++ b/kpops/cli/utils.py @@ -3,9 +3,7 @@ from collections.abc import Iterable, Iterator from pathlib import Path -from kpops.api.file_type import KpopsFileType - -PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() +from kpops.api.file_type import PIPELINE_YAML def collect_pipeline_paths(pipeline_paths: Iterable[Path]) -> Iterator[Path]: diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 94b5cd3bf..c23bea4a3 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -9,7 +9,7 @@ from schema_registry.client.schema import AvroSchema from kpops.api.exception import ClassNotFoundError -from kpops.api.registry import find_class +from kpops.api.registry import Registry, find_class from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, @@ -29,18 +29,16 @@ def __init__(self, kpops_config: KpopsConfig) -> None: str(kpops_config.schema_registry.url), timeout=kpops_config.schema_registry.timeout, # pyright: ignore[reportArgumentType] ) - self.components_module = kpops_config.components_module @cached_property def schema_provider(self) -> SchemaProvider: try: - if not self.components_module: - msg = f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists." - raise ValueError(msg) - schema_provider_class = find_class(self.components_module, SchemaProvider) + schema_provider_class = find_class( + Registry.iter_component_modules(), base=SchemaProvider + ) return schema_provider_class() # pyright: ignore[reportAbstractUsage] except ClassNotFoundError as e: - msg = f"No schema provider found in components module {self.components_module}. Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." + msg = f"No schema provider found. Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." raise ValueError(msg) from e @classmethod diff --git a/kpops/components/__init__.py b/kpops/components/__init__.py deleted file mode 100644 index 3800b16d5..000000000 --- a/kpops/components/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -from kpops.components.base_components.helm_app import HelmApp -from kpops.components.base_components.kafka_app import KafkaApp -from kpops.components.base_components.kafka_connector import ( - KafkaConnector, - KafkaSinkConnector, - KafkaSourceConnector, -) -from kpops.components.base_components.kubernetes_app import KubernetesApp -from kpops.components.base_components.pipeline_component import PipelineComponent -from kpops.components.streams_bootstrap import StreamsBootstrap -from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp -from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp - -__all__ = ( - "HelmApp", - "KafkaApp", - "KafkaConnector", - "KafkaSinkConnector", - "KafkaSourceConnector", - "KubernetesApp", - "StreamsBootstrap", - "ProducerApp", - "StreamsApp", - "PipelineComponent", - "StreamsApp", - "ProducerApp", -) diff --git a/kpops/components/base_components/__init__.py b/kpops/components/base_components/__init__.py index e69de29bb..ff94dde1f 100644 --- a/kpops/components/base_components/__init__.py +++ b/kpops/components/base_components/__init__.py @@ -0,0 +1,19 @@ +from kpops.components.base_components.helm_app import HelmApp +from kpops.components.base_components.kafka_app import KafkaApp +from kpops.components.base_components.kafka_connector import ( + KafkaConnector, + KafkaSinkConnector, + KafkaSourceConnector, +) +from kpops.components.base_components.kubernetes_app import KubernetesApp +from kpops.components.base_components.pipeline_component import PipelineComponent + +__all__ = ( + "HelmApp", + "KafkaApp", + "KafkaConnector", + "KafkaSinkConnector", + "KafkaSourceConnector", + "KubernetesApp", + "PipelineComponent", +) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index f52ca6e78..0ab4806b9 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -13,7 +13,7 @@ from kpops.components.base_components.helm_app import HelmAppValues from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr from kpops.components.base_components.pipeline_component import PipelineComponent -from kpops.components.streams_bootstrap import StreamsBootstrap +from kpops.components.common.streams_bootstrap import StreamsBootstrap from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/tests/cli/resources/empty_module/__init__.py b/kpops/components/common/__init__.py similarity index 100% rename from tests/cli/resources/empty_module/__init__.py rename to kpops/components/common/__init__.py diff --git a/kpops/components/common/streams_bootstrap.py b/kpops/components/common/streams_bootstrap.py new file mode 100644 index 000000000..ba45dbd30 --- /dev/null +++ b/kpops/components/common/streams_bootstrap.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +import logging +from abc import ABC +from typing import TYPE_CHECKING + +import pydantic +from pydantic import Field + +from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig +from kpops.components.base_components.helm_app import HelmApp, HelmAppValues +from kpops.utils.docstring import describe_attr + +if TYPE_CHECKING: + try: + from typing import Self # pyright: ignore[reportAttributeAccessIssue] + except ImportError: + from typing_extensions import Self + +STREAMS_BOOTSTRAP_HELM_REPO = HelmRepoConfig( + repository_name="bakdata-streams-bootstrap", + url="https://bakdata.github.io/streams-bootstrap/", +) +STREAMS_BOOTSTRAP_VERSION = "2.9.0" + +# Source of the pattern: https://kubernetes.io/docs/concepts/containers/images/#image-names +IMAGE_TAG_PATTERN = r"^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$" + +log = logging.getLogger("StreamsBootstrap") + + +class StreamsBootstrapValues(HelmAppValues): + """Base value class for all streams bootstrap related components. + + :param image_tag: Docker image tag of the streams-bootstrap app. + """ + + image_tag: str = Field( + default="latest", + pattern=IMAGE_TAG_PATTERN, + description=describe_attr("image_tag", __doc__), + ) + + +class StreamsBootstrap(HelmApp, ABC): + """Base for components with a streams-bootstrap Helm chart. + + :param app: streams-bootstrap app values + :param repo_config: Configuration of the Helm chart repo to be used for + deploying the component, defaults to streams-bootstrap Helm repo + :param version: Helm chart version, defaults to "2.9.0" + """ + + app: StreamsBootstrapValues = Field( + default_factory=StreamsBootstrapValues, + description=describe_attr("app", __doc__), + ) + + repo_config: HelmRepoConfig = Field( + default=STREAMS_BOOTSTRAP_HELM_REPO, + description=describe_attr("repo_config", __doc__), + ) + version: str | None = Field( + default=STREAMS_BOOTSTRAP_VERSION, + description=describe_attr("version", __doc__), + ) + + @pydantic.model_validator(mode="after") + def warning_for_latest_image_tag(self) -> Self: + if self.validate_ and self.app.image_tag == "latest": + log.warning( + f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag." + ) + return self diff --git a/kpops/components/streams_bootstrap/__init__.py b/kpops/components/streams_bootstrap/__init__.py index c6c329b85..b4eb34b2f 100644 --- a/kpops/components/streams_bootstrap/__init__.py +++ b/kpops/components/streams_bootstrap/__init__.py @@ -1,74 +1,9 @@ -from __future__ import annotations - -import logging -from abc import ABC -from typing import TYPE_CHECKING - -import pydantic -from pydantic import Field - -from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig -from kpops.components.base_components.helm_app import HelmApp, HelmAppValues -from kpops.utils.docstring import describe_attr - -if TYPE_CHECKING: - try: - from typing import Self # pyright: ignore[reportAttributeAccessIssue] - except ImportError: - from typing_extensions import Self - -STREAMS_BOOTSTRAP_HELM_REPO = HelmRepoConfig( - repository_name="bakdata-streams-bootstrap", - url="https://bakdata.github.io/streams-bootstrap/", +from kpops.components.common.streams_bootstrap import StreamsBootstrap +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp + +__all__ = ( + "StreamsBootstrap", + "StreamsApp", + "ProducerApp", ) -STREAMS_BOOTSTRAP_VERSION = "2.9.0" - -log = logging.getLogger("StreamsBootstrap") - -# Source of the pattern: https://kubernetes.io/docs/concepts/containers/images/#image-names -IMAGE_TAG_PATTERN = r"^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$" - - -class StreamsBootstrapValues(HelmAppValues): - """Base value class for all streams bootstrap related components. - - :param image_tag: Docker image tag of the streams-bootstrap app. - """ - - image_tag: str = Field( - default="latest", - pattern=IMAGE_TAG_PATTERN, - description=describe_attr("image_tag", __doc__), - ) - - -class StreamsBootstrap(HelmApp, ABC): - """Base for components with a streams-bootstrap Helm chart. - - :param app: streams-bootstrap app values - :param repo_config: Configuration of the Helm chart repo to be used for - deploying the component, defaults to streams-bootstrap Helm repo - :param version: Helm chart version, defaults to "2.9.0" - """ - - app: StreamsBootstrapValues = Field( - default_factory=StreamsBootstrapValues, - description=describe_attr("app", __doc__), - ) - - repo_config: HelmRepoConfig = Field( - default=STREAMS_BOOTSTRAP_HELM_REPO, - description=describe_attr("repo_config", __doc__), - ) - version: str | None = Field( - default=STREAMS_BOOTSTRAP_VERSION, - description=describe_attr("version", __doc__), - ) - - @pydantic.model_validator(mode="after") - def warning_for_latest_image_tag(self) -> Self: - if self.validate_ and self.app.image_tag == "latest": - log.warning( - f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag." - ) - return self diff --git a/kpops/components/streams_bootstrap/producer/model.py b/kpops/components/streams_bootstrap/producer/model.py index 2dc3b5927..1cbdf495c 100644 --- a/kpops/components/streams_bootstrap/producer/model.py +++ b/kpops/components/streams_bootstrap/producer/model.py @@ -4,7 +4,7 @@ KafkaAppValues, KafkaStreamsConfig, ) -from kpops.components.streams_bootstrap import StreamsBootstrapValues +from kpops.components.common.streams_bootstrap import StreamsBootstrapValues from kpops.utils.docstring import describe_attr diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index a9c06f37e..9674dd7da 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -12,7 +12,7 @@ OutputTopicTypes, TopicConfig, ) -from kpops.components.streams_bootstrap import StreamsBootstrap +from kpops.components.common.streams_bootstrap import StreamsBootstrap from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.producer.model import ProducerAppValues from kpops.utils.docstring import describe_attr diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 1bffb84c6..675978396 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -11,7 +11,7 @@ KafkaStreamsConfig, ) from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr -from kpops.components.streams_bootstrap import StreamsBootstrapValues +from kpops.components.common.streams_bootstrap import StreamsBootstrapValues from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 9bd5d87c5..979c3c4c9 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -5,13 +5,10 @@ from typing_extensions import override from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler -from kpops.components import HelmApp -from kpops.components.base_components.kafka_app import ( - KafkaApp, - KafkaAppCleaner, -) +from kpops.components.base_components.helm_app import HelmApp +from kpops.components.base_components.kafka_app import KafkaApp, KafkaAppCleaner from kpops.components.base_components.models.topic import KafkaTopic -from kpops.components.streams_bootstrap import StreamsBootstrap +from kpops.components.common.streams_bootstrap import StreamsBootstrap from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.streams.model import ( StreamsAppValues, diff --git a/kpops/config.py b/kpops/config/__init__.py similarity index 97% rename from kpops/config.py rename to kpops/config/__init__.py index 7efbbf0b7..5a9f25a3e 100644 --- a/kpops/config.py +++ b/kpops/config/__init__.py @@ -74,10 +74,6 @@ class KafkaConnectConfig(BaseSettings): class KpopsConfig(BaseSettings): """Global configuration for KPOps project.""" - components_module: str | None = Field( - default=None, - description="Custom Python module defining project-specific KPOps components", - ) pipeline_base_dir: Path = Field( default=Path(), description="Base directory to the pipelines (default is current working directory)", diff --git a/kpops/const/__init__.py b/kpops/const/__init__.py new file mode 100644 index 000000000..c55d7127f --- /dev/null +++ b/kpops/const/__init__.py @@ -0,0 +1,3 @@ +__version__ = "6.0.1" +KPOPS = "KPOps" +KPOPS_MODULE = "kpops." diff --git a/kpops/pipeline.py b/kpops/pipeline/__init__.py similarity index 100% rename from kpops/pipeline.py rename to kpops/pipeline/__init__.py diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index 3b4ce5ad3..539e2cf3a 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -20,8 +20,8 @@ ModelFieldsSchema, ) -from kpops.api.registry import _find_classes -from kpops.components import ( +from kpops.api.registry import Registry +from kpops.components.base_components.pipeline_component import ( PipelineComponent, ) from kpops.config import KpopsConfig @@ -32,6 +32,10 @@ class MultiComponentGenerateJsonSchema(GenerateJsonSchema): ... log = logging.getLogger("") +registry = Registry() +registry.discover_components() +COMPONENTS = tuple(registry.components) + def print_schema(model: type[BaseModel]) -> None: schema = model_json_schema(model, by_alias=True) @@ -39,14 +43,13 @@ def print_schema(model: type[BaseModel]) -> None: def _is_valid_component( - defined_component_types: set[str], component: type[PipelineComponent], allow_abstract: bool, ) -> bool: """Check whether a PipelineComponent subclass has a valid definition for the schema generation. - :param defined_component_types: types defined so far :param component: component type to be validated + :param allow_abstract: whether to include abstract components marked as ABC :return: Whether component is valid for schema generation """ if not allow_abstract and ( @@ -54,85 +57,25 @@ def _is_valid_component( ): log.warning(f"SKIPPED {component.__name__}, component is abstract.") return False - if component.type in defined_component_types: - log.warning(f"SKIPPED {component.__name__}, component type must be unique.") - return False - defined_component_types.add(component.type) return True -def _add_components( - components_module: str, - allow_abstract: bool, - components: tuple[type[PipelineComponent], ...] | None = None, -) -> tuple[type[PipelineComponent], ...]: - """Add components to a components tuple. - - If an empty tuple is provided or it is not provided at all, the components - types from the given module are 'tupled' - - :param components_module: Python module. Only the classes that inherit from - PipelineComponent will be considered. - :param components: Tuple of components to which to add, defaults to () - :return: Extended tuple - """ - if components is None: - components = () - # Set of existing types, against which to check the new ones - defined_component_types = {component.type for component in components} - custom_components = ( - component - for component in _find_classes(components_module, PipelineComponent) - if _is_valid_component(defined_component_types, component, allow_abstract) - ) - components += tuple(custom_components) - return components - - -def find_components( - components_module: str | None, - include_stock_components: bool, - include_abstract: bool = False, -) -> tuple[type[PipelineComponent], ...]: - if not (include_stock_components or components_module): - msg = "No components are provided, no schema is generated." - raise RuntimeError(msg) - # Add stock components if enabled - components: tuple[type[PipelineComponent], ...] = () - if include_stock_components: - components = _add_components("kpops.components", include_abstract) - # Add custom components if provided - if components_module: - components = _add_components(components_module, include_abstract, components) - if not components: - msg = "No valid components found." - raise RuntimeError(msg) - return components - - -def gen_pipeline_schema( - components_module: str | None = None, include_stock_components: bool = True -) -> None: - """Generate a json schema from the models of pipeline components. - - :param components_module: Python module. Only the classes that inherit from - PipelineComponent will be considered., defaults to None - :param include_stock_components: Whether to include the stock components, - defaults to True - """ - components = find_components(components_module, include_stock_components) - +def gen_pipeline_schema() -> None: + """Generate a JSON schema from the models of pipeline components.""" + components = [ + component for component in COMPONENTS if _is_valid_component(component, False) + ] # re-assign component type as Literal to work as discriminator for component in components: component.model_fields["type"] = FieldInfo( - annotation=Literal[component.type], # type:ignore[valid-type] + annotation=Literal[component.type], # type: ignore[valid-type] default=component.type, ) - core_schema: DefinitionsSchema = component.__pydantic_core_schema__ # pyright:ignore[reportAssignmentType] + core_schema: DefinitionsSchema = component.__pydantic_core_schema__ # pyright: ignore[reportAssignmentType] schema = core_schema while "schema" in schema: schema = schema["schema"] - model_schema: ModelFieldsSchema = schema # pyright:ignore[reportAssignmentType] + model_schema: ModelFieldsSchema = schema # pyright: ignore[reportAssignmentType] model_schema["fields"]["type"] = ModelField( type="model-field", schema=LiteralSchema( @@ -141,23 +84,23 @@ def gen_pipeline_schema( ), ) - PipelineComponents = Union[components] # type: ignore[valid-type] + PipelineComponents = Union[tuple(components)] # pyright: ignore[reportInvalidTypeArguments,reportGeneralTypeIssues] AnnotatedPipelineComponents = Annotated[ PipelineComponents, Field(discriminator="type") ] class PipelineSchema(RootModel): root: Sequence[ - AnnotatedPipelineComponents # pyright:ignore[reportInvalidTypeForm] + AnnotatedPipelineComponents # pyright: ignore[reportInvalidTypeForm] ] print_schema(PipelineSchema) -def gen_defaults_schema( - components_module: str | None = None, include_stock_components: bool = True -) -> None: - components = find_components(components_module, include_stock_components, True) +def gen_defaults_schema() -> None: + components = [ + component for component in COMPONENTS if _is_valid_component(component, True) + ] components_mapping: dict[str, Any] = { component.type: (component, ...) for component in components } diff --git a/pyproject.toml b/pyproject.toml index f84f7a43e..cd831c71b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ mkdocs-exclude-search = "^0.6.5" mike = "^1.1.2" mkdocstrings = { extras = ["python"], version = "^0.25.1" } -[tool.poetry_bumpversion.file."kpops/__init__.py"] +[tool.poetry_bumpversion.file."kpops/const/__init__.py"] [tool.pyright] reportUnknownParameterType = "warning" diff --git a/tests/api/test_handlers.py b/tests/api/test_handlers.py index c3a92784d..beb2b9dcc 100644 --- a/tests/api/test_handlers.py +++ b/tests/api/test_handlers.py @@ -16,10 +16,7 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): - config = KpopsConfig( - kafka_brokers="broker:9092", - components_module=MODULE, - ) + config = KpopsConfig(kafka_brokers="broker:9092") connector_handler_mock = mocker.patch(f"{HANDLER_MODULE}.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler diff --git a/tests/api/test_registry.py b/tests/api/test_registry.py index 00daee08d..ed496e0c8 100644 --- a/tests/api/test_registry.py +++ b/tests/api/test_registry.py @@ -1,11 +1,26 @@ from __future__ import annotations +import importlib +from pathlib import Path +from types import ModuleType + import pytest from kpops.api.exception import ClassNotFoundError -from kpops.api.registry import Registry, _find_classes, find_class +from kpops.api.registry import Registry, _find_classes, _iter_namespace, find_class from kpops.component_handlers.schema_handler.schema_provider import SchemaProvider +from kpops.components.base_components.helm_app import HelmApp +from kpops.components.base_components.kafka_app import KafkaApp +from kpops.components.base_components.kafka_connector import ( + KafkaConnector, + KafkaSinkConnector, + KafkaSourceConnector, +) +from kpops.components.base_components.kubernetes_app import KubernetesApp from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.common.streams_bootstrap import StreamsBootstrap +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp from tests.cli.resources.custom_module import CustomSchemaProvider @@ -22,51 +37,72 @@ class Unrelated: MODULE = SubComponent.__module__ -def test_find_classes(): - gen = _find_classes(MODULE, PipelineComponent) - assert next(gen) is SubComponent - assert next(gen) is SubSubComponent - with pytest.raises(StopIteration): - next(gen) +def test_namespace(): + """Ensure namespace package according to PEP 420.""" + assert not Path("kpops/__init__.py").exists() + assert not Path("kpops/components/__init__.py").exists() -def test_find_builtin_classes(): - components = [ - class_.__name__ - for class_ in _find_classes("kpops.components", PipelineComponent) +@pytest.mark.usefixtures("custom_components") +def test_iter_namespace(): + components_module = importlib.import_module("kpops.components") + assert [module.__name__ for module in _iter_namespace(components_module)] == [ + "kpops.components.base_components", + "kpops.components.common", + "kpops.components.streams_bootstrap", + "kpops.components.test_components", ] - assert len(components) == 10 - assert components == [ - "HelmApp", - "KafkaApp", - "KafkaConnector", - "KafkaSinkConnector", - "KafkaSourceConnector", - "KubernetesApp", - "PipelineComponent", - "ProducerApp", - "StreamsApp", - "StreamsBootstrap", + + +@pytest.mark.usefixtures("custom_components") +def test_iter_component_modules(): + assert [module.__name__ for module in Registry.iter_component_modules()] == [ + "kpops.components", + "kpops.components.base_components", + "kpops.components.common", + "kpops.components.streams_bootstrap", + "kpops.components.test_components", ] -def test_find_class(): - assert find_class(MODULE, SubComponent) is SubComponent - assert find_class(MODULE, PipelineComponent) is SubComponent - assert find_class(MODULE, SchemaProvider) is CustomSchemaProvider +@pytest.fixture() +def module() -> ModuleType: + return importlib.import_module(MODULE) + + +def test_find_classes(module: ModuleType): + gen = _find_classes([module], PipelineComponent) + assert next(gen) is SubComponent + assert next(gen) is SubSubComponent + with pytest.raises(StopIteration): + next(gen) + + +def test_find_class(module: ModuleType): + assert find_class([module], base=SubComponent) is SubComponent + assert find_class([module], base=PipelineComponent) is SubComponent + assert find_class([module], base=SchemaProvider) is CustomSchemaProvider with pytest.raises(ClassNotFoundError): - find_class(MODULE, dict) + find_class([module], base=dict) def test_registry(): registry = Registry() assert registry._classes == {} - registry.find_components(MODULE) + registry.discover_components() assert registry._classes == { - "sub-component": SubComponent, - "sub-sub-component": SubSubComponent, + "helm-app": HelmApp, + "kafka-app": KafkaApp, + "kafka-connector": KafkaConnector, + "kafka-sink-connector": KafkaSinkConnector, + "kafka-source-connector": KafkaSourceConnector, + "kubernetes-app": KubernetesApp, + "pipeline-component": PipelineComponent, + "producer-app": ProducerApp, + "streams-app": StreamsApp, + "streams-bootstrap": StreamsBootstrap, } - assert registry["sub-component"] is SubComponent - assert registry["sub-sub-component"] is SubSubComponent + for _type, _class in registry._classes.items(): + assert registry[_type] is _class with pytest.raises(ClassNotFoundError): registry["doesnt-exist"] diff --git a/tests/cli/resources/config.yaml b/tests/cli/resources/config.yaml index 046c98d2a..79261856b 100644 --- a/tests/cli/resources/config.yaml +++ b/tests/cli/resources/config.yaml @@ -1,2 +1 @@ kafka_brokers: http://127.0.0.1:9092 -components_module: tests.cli.test_schema_generation diff --git a/tests/cli/resources/empty_module/config.yaml b/tests/cli/resources/empty_module/config.yaml deleted file mode 100644 index 735b3904a..000000000 --- a/tests/cli/resources/empty_module/config.yaml +++ /dev/null @@ -1,2 +0,0 @@ -kafka_brokers: http://127.0.0.1:9092 -components_module: tests.cli.resources.empty_module diff --git a/tests/cli/resources/no_module/config.yaml b/tests/cli/resources/no_module/config.yaml deleted file mode 100644 index 79261856b..000000000 --- a/tests/cli/resources/no_module/config.yaml +++ /dev/null @@ -1 +0,0 @@ -kafka_brokers: http://127.0.0.1:9092 diff --git a/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml b/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml index 5c251a22c..3c86a269a 100644 --- a/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml +++ b/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml @@ -4,7 +4,6 @@ kafka_brokers: null # Non-required fields -components_module: null create_namespace: false helm_config: api_version: null diff --git a/tests/cli/test_init.py b/tests/cli/test_init.py index 8d4790d23..3109d16fc 100644 --- a/tests/cli/test_init.py +++ b/tests/cli/test_init.py @@ -3,7 +3,7 @@ from pytest_snapshot.plugin import Snapshot from typer.testing import CliRunner -import kpops +import kpops.api as kpops from kpops.cli.main import app from kpops.utils.cli_commands import create_config diff --git a/tests/cli/test_schema_generation.py b/tests/cli/test_schema_generation.py index 0741340e0..81f975b4c 100644 --- a/tests/cli/test_schema_generation.py +++ b/tests/cli/test_schema_generation.py @@ -3,19 +3,15 @@ import json from abc import ABC, abstractmethod from pathlib import Path -from typing import TYPE_CHECKING import pytest from pydantic import ConfigDict, Field from typer.testing import CliRunner -from kpops.api.registry import Registry from kpops.cli.main import app -from kpops.components import PipelineComponent +from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.utils.docstring import describe_attr - -if TYPE_CHECKING: - from pytest_snapshot.plugin import Snapshot +from kpops.utils.gen_schema import COMPONENTS RESOURCE_PATH = Path(__file__).parent / "resources" @@ -80,92 +76,6 @@ class SubPipelineComponentCorrectDocstr(SubPipelineComponent): "ignore:handlers", "ignore:config", "ignore:enrich", "ignore:validate" ) class TestGenSchema: - @pytest.fixture - def stock_components(self) -> list[type[PipelineComponent]]: - registry = Registry() - registry.find_components("kpops.components") - return list(registry._classes.values()) - - def test_gen_pipeline_schema_no_modules(self): - with pytest.raises( - RuntimeError, match="^No components are provided, no schema is generated.$" - ): - runner.invoke( - app, - [ - "schema", - "pipeline", - "--no-include-stock-components", - "--config", - str(RESOURCE_PATH / "no_module"), - ], - catch_exceptions=False, - ) - - def test_gen_pipeline_schema_no_components(self): - with pytest.raises(RuntimeError, match="^No valid components found.$"): - runner.invoke( - app, - [ - "schema", - "pipeline", - "--no-include-stock-components", - "--config", - str(RESOURCE_PATH / "empty_module"), - ], - catch_exceptions=False, - ) - - def test_gen_pipeline_schema_only_stock_module(self): - result = runner.invoke( - app, - [ - "schema", - "pipeline", - ], - catch_exceptions=False, - ) - - assert result.exit_code == 0, result.stdout - assert result.stdout - - result = runner.invoke( - app, - [ - "schema", - "pipeline", - "--include-stock-components", - ], - catch_exceptions=False, - ) - - assert result.exit_code == 0, result.stdout - assert result.stdout - - def test_gen_pipeline_schema_only_custom_module( - self, snapshot: Snapshot, stock_components: list[type[PipelineComponent]] - ): - result = runner.invoke( - app, - [ - "schema", - "pipeline", - "--no-include-stock-components", - "--config", - str(RESOURCE_PATH), - ], - catch_exceptions=False, - ) - - assert result.exit_code == 0, result.stdout - - snapshot.assert_match(result.stdout, "schema.json") - schema = json.loads(result.stdout) - assert schema["title"] == "PipelineSchema" - assert set(schema["items"]["discriminator"]["mapping"].keys()).isdisjoint( - {component.type for component in stock_components} - ) - def test_gen_pipeline_schema_stock_and_custom_module(self): result = runner.invoke( app, @@ -179,14 +89,12 @@ def test_gen_pipeline_schema_stock_and_custom_module(self): assert result.exit_code == 0, result.stdout assert result.stdout - def test_gen_defaults_schema(self, stock_components: list[type[PipelineComponent]]): + def test_gen_defaults_schema(self): result = runner.invoke( app, [ "schema", "defaults", - "--config", - str(RESOURCE_PATH / "no_module"), ], catch_exceptions=False, ) @@ -195,7 +103,7 @@ def test_gen_defaults_schema(self, stock_components: list[type[PipelineComponent assert result.stdout schema = json.loads(result.stdout) assert schema["title"] == "DefaultsSchema" - assert schema["required"] == [component.type for component in stock_components] + assert schema["required"] == [component.type for component in COMPONENTS] def test_gen_config_schema(self): result = runner.invoke( diff --git a/tests/component_handlers/schema_handler/test_schema_handler.py b/tests/component_handlers/schema_handler/test_schema_handler.py index 8d4052e54..716f2f482 100644 --- a/tests/component_handlers/schema_handler/test_schema_handler.py +++ b/tests/component_handlers/schema_handler/test_schema_handler.py @@ -4,7 +4,7 @@ from unittest.mock import AsyncMock, MagicMock import pytest -from pydantic import AnyHttpUrl, BaseModel, TypeAdapter +from pydantic import AnyHttpUrl, TypeAdapter from pytest_mock import MockerFixture from schema_registry.client.schema import AvroSchema from schema_registry.client.utils import SchemaVersion @@ -20,10 +20,6 @@ from kpops.utils.colorify import greenify, magentaify, yellowify from tests.pipeline.test_components import TestSchemaProvider -NON_EXISTING_PROVIDER_MODULE = BaseModel.__module__ -TEST_SCHEMA_PROVIDER_MODULE = TestSchemaProvider.__module__ - - log = logging.getLogger("SchemaHandler") @@ -48,13 +44,6 @@ def log_warning_mock(mocker: MockerFixture) -> MagicMock: ) -@pytest.fixture(autouse=False) -def find_class_mock(mocker: MockerFixture) -> MagicMock: - return mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.find_class" - ) - - @pytest.fixture(autouse=True) def schema_registry_mock(mocker: MockerFixture) -> AsyncMock: schema_registry_mock_constructor = mocker.patch( @@ -87,15 +76,11 @@ def kpops_config() -> KpopsConfig: enabled=True, url=TypeAdapter(AnyHttpUrl).validate_python("http://mock:8081"), # pyright: ignore[reportCallIssue,reportArgumentType] ), - components_module=TEST_SCHEMA_PROVIDER_MODULE, ) def test_load_schema_handler(kpops_config: KpopsConfig): - assert isinstance( - SchemaHandler.load_schema_handler(kpops_config), - SchemaHandler, - ) + assert isinstance(SchemaHandler.load_schema_handler(kpops_config), SchemaHandler) config_disable = kpops_config.model_copy() config_disable.schema_registry = SchemaRegistryConfig(enabled=False) @@ -103,9 +88,8 @@ def test_load_schema_handler(kpops_config: KpopsConfig): assert SchemaHandler.load_schema_handler(config_disable) is None -def test_should_lazy_load_schema_provider( - find_class_mock: MagicMock, kpops_config: KpopsConfig -): +@pytest.mark.usefixtures("custom_components") +def test_should_lazy_load_schema_provider(kpops_config: KpopsConfig): schema_handler = SchemaHandler.load_schema_handler(kpops_config) assert schema_handler is not None @@ -117,18 +101,17 @@ def test_should_lazy_load_schema_provider( "com.bakdata.kpops.test.SomeOtherSchemaClass", {} ) - find_class_mock.assert_called_once_with(TEST_SCHEMA_PROVIDER_MODULE, SchemaProvider) + assert isinstance(schema_handler.schema_provider, TestSchemaProvider) def test_should_raise_value_error_if_schema_provider_class_not_found( kpops_config: KpopsConfig, ): - kpops_config.components_module = NON_EXISTING_PROVIDER_MODULE schema_handler = SchemaHandler(kpops_config) with pytest.raises( ValueError, - match="No schema provider found in components module pydantic.main. " + match="No schema provider found. " "Please implement the abstract method in " f"{SchemaProvider.__module__}.{SchemaProvider.__name__}.", ): @@ -137,35 +120,8 @@ def test_should_raise_value_error_if_schema_provider_class_not_found( ) -@pytest.mark.parametrize( - ("components_module"), - [ - pytest.param( - None, - id="components_module = None", - ), - pytest.param( - "", - id="components_module = ''", - ), - ], -) -def test_should_raise_value_error_when_schema_provider_is_called_and_components_module_is_empty( - kpops_config: KpopsConfig, components_module: str | None -): - kpops_config.components_module = components_module - schema_handler = SchemaHandler.load_schema_handler(kpops_config) - assert schema_handler is not None - with pytest.raises( - ValueError, - match="The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your SchemaProvider implementation exists.", - ): - schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", {} - ) - - @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( to_section: ToSection, log_info_mock: MagicMock, @@ -185,6 +141,7 @@ async def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_t @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( topic_config: TopicConfig, to_section: ToSection, @@ -207,6 +164,7 @@ async def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_raise_exception_when_submit_schema_that_exists_and_not_compatible_and_dry_run_true( topic_config: TopicConfig, to_section: ToSection, @@ -244,6 +202,7 @@ async def test_should_raise_exception_when_submit_schema_that_exists_and_not_com @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_version_and_dry_run_true( topic_config: TopicConfig, to_section: ToSection, @@ -279,6 +238,7 @@ async def test_should_log_debug_when_submit_schema_that_exists_and_registered_un @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_submit_non_existing_schema_when_not_dry( topic_config: TopicConfig, to_section: ToSection, diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index ab21ef68e..f8fe12eec 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -6,7 +6,7 @@ import pydantic import pytest -from kpops.api.file_type import KpopsFileType +from kpops.api.file_type import DEFAULTS_YAML, PIPELINE_YAML, KpopsFileType from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.base_defaults_component import ( BaseDefaultsComponent, @@ -17,10 +17,6 @@ from kpops.utils.environment import ENV from tests.components import PIPELINE_BASE_DIR, RESOURCES_PATH -PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() - -DEFAULTS_YAML = KpopsFileType.DEFAULTS.as_yaml_file() - class Parent(BaseDefaultsComponent): __test__ = False diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 1b98e4ac4..bf5cbb6b9 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -12,8 +12,10 @@ KafkaConnectorConfig, KafkaConnectorType, ) -from kpops.components import KafkaSinkConnector -from kpops.components.base_components.kafka_connector import KafkaConnectorResetter +from kpops.components.base_components.kafka_connector import ( + KafkaConnectorResetter, + KafkaSinkConnector, +) from kpops.components.base_components.models.from_section import ( FromSection, FromTopic, diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 4f7184ead..346913e57 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -7,13 +7,15 @@ from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components import ProducerApp from kpops.components.base_components.models.topic import ( KafkaTopic, OutputTopicTypes, TopicConfig, ) -from kpops.components.streams_bootstrap.producer.producer_app import ProducerAppCleaner +from kpops.components.streams_bootstrap.producer.producer_app import ( + ProducerApp, + ProducerAppCleaner, +) from kpops.config import KpopsConfig, TopicNameConfig from tests.components import PIPELINE_BASE_DIR diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index cb340174a..a72328d5e 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -12,7 +12,6 @@ HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components import StreamsApp from kpops.components.base_components.models import TopicName from kpops.components.base_components.models.to_section import ( ToSection, @@ -27,6 +26,7 @@ StreamsAppAutoScaling, ) from kpops.components.streams_bootstrap.streams.streams_app import ( + StreamsApp, StreamsAppCleaner, ) from kpops.config import KpopsConfig, TopicNameConfig diff --git a/tests/components/test_streams_bootstrap.py b/tests/components/test_streams_bootstrap.py index 3ece4612d..6084c3ea3 100644 --- a/tests/components/test_streams_bootstrap.py +++ b/tests/components/test_streams_bootstrap.py @@ -12,7 +12,10 @@ HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components.streams_bootstrap import StreamsBootstrap, StreamsBootstrapValues +from kpops.components.common.streams_bootstrap import ( + StreamsBootstrap, + StreamsBootstrapValues, +) from kpops.config import KpopsConfig from tests.components import PIPELINE_BASE_DIR diff --git a/tests/conftest.py b/tests/conftest.py index 5fb77a415..0e23be87c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,8 @@ import logging import os +import shutil from collections.abc import Iterator +from pathlib import Path from unittest import mock import pytest @@ -38,3 +40,14 @@ def mock_env() -> Iterator[Environment]: def load_yaml_file_clear_cache() -> Iterator[None]: yield load_yaml_file.cache.clear() # pyright: ignore[reportFunctionMemberAccess] + + +@pytest.fixture() +def custom_components(): + src = Path("tests/pipeline/test_components") + dst = Path("kpops/components/test_components") + try: + shutil.copytree(src, dst) + yield + finally: + shutil.rmtree(dst) diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index 36f24f938..4502ea69e 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -5,17 +5,15 @@ Schema, SchemaProvider, ) -from kpops.components import ( - KafkaSinkConnector, - PipelineComponent, - ProducerApp, - StreamsApp, -) +from kpops.components.base_components.kafka_connector import KafkaSinkConnector from kpops.components.base_components.models import ModelName, ModelVersion, TopicName from kpops.components.base_components.models.to_section import ( ToSection, ) from kpops.components.base_components.models.topic import OutputTopicTypes, TopicConfig +from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp class ScheduledProducer(ProducerApp): ... diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py index 1d54a9f7a..9646e569b 100644 --- a/tests/pipeline/test_components_without_schema_handler/components.py +++ b/tests/pipeline/test_components_without_schema_handler/components.py @@ -1,13 +1,11 @@ from typing_extensions import override from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig -from kpops.components import ( - KafkaSinkConnector, - PipelineComponent, - ProducerApp, - StreamsApp, -) +from kpops.components.base_components.kafka_connector import KafkaSinkConnector from kpops.components.base_components.models.topic import OutputTopicTypes +from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp class ScheduledProducer(ProducerApp): ... diff --git a/tests/pipeline/test_example.py b/tests/pipeline/test_example.py index 1101b5795..7a03ae2ee 100644 --- a/tests/pipeline/test_example.py +++ b/tests/pipeline/test_example.py @@ -5,7 +5,7 @@ from pytest_snapshot.plugin import Snapshot from typer.testing import CliRunner -import kpops +import kpops.api as kpops runner = CliRunner() diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index ef68aadf9..1d2fce41f 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -9,20 +9,19 @@ from pytest_snapshot.plugin import Snapshot from typer.testing import CliRunner -import kpops +import kpops.api as kpops from kpops.api.exception import ParsingException, ValidationError -from kpops.api.file_type import KpopsFileType +from kpops.api.file_type import PIPELINE_YAML, KpopsFileType from kpops.cli.main import FilterType, app -from kpops.components import KafkaSinkConnector, PipelineComponent - -PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() +from kpops.components.base_components.kafka_connector import KafkaSinkConnector +from kpops.components.base_components.pipeline_component import PipelineComponent runner = CliRunner() RESOURCE_PATH = Path(__file__).parent / "resources" -@pytest.mark.usefixtures("mock_env", "load_yaml_file_clear_cache") +@pytest.mark.usefixtures("mock_env", "load_yaml_file_clear_cache", "custom_components") class TestGenerate: @pytest.fixture(autouse=True) def log_info(self, mocker: MockerFixture) -> MagicMock: diff --git a/tests/pipeline/test_manifest.py b/tests/pipeline/test_manifest.py index 445e528cc..45f3e3a94 100644 --- a/tests/pipeline/test_manifest.py +++ b/tests/pipeline/test_manifest.py @@ -7,7 +7,7 @@ from pytest_snapshot.plugin import Snapshot from typer.testing import CliRunner -import kpops +import kpops.api as kpops from kpops.cli.main import app from kpops.component_handlers.helm_wrapper.helm import Helm from kpops.component_handlers.helm_wrapper.model import HelmConfig, Version diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index b5c741ad0..61d338dcf 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -7,9 +7,9 @@ from kpops.component_handlers import ( ComponentHandlers, ) -from kpops.components import PipelineComponent from kpops.components.base_components.models.from_section import FromSection from kpops.components.base_components.models.to_section import ToSection +from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.pipeline import Pipeline PREFIX = "example-prefix-"