diff --git a/kpops/component_handlers/kubernetes/model.py b/kpops/component_handlers/kubernetes/model.py index 5970de1bf..e2f7fece7 100644 --- a/kpops/component_handlers/kubernetes/model.py +++ b/kpops/component_handlers/kubernetes/model.py @@ -10,6 +10,9 @@ K8S_LABEL_MAX_LEN = 63 +# https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs +K8S_CRON_JOB_NAME_MAX_LEN = 52 + class KubernetesManifest(UserDict[str, JsonType]): """Representation of a Kubernetes API object as YAML/JSON mapping.""" diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 21cda9ebd..44fe2e592 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -4,6 +4,8 @@ from pydantic import Field, ValidationError, computed_field from typing_extensions import override +from kpops.component_handlers.kubernetes.model import K8S_CRON_JOB_NAME_MAX_LEN +from kpops.component_handlers.kubernetes.utils import trim from kpops.components.base_components.kafka_app import KafkaAppCleaner from kpops.components.common.app_type import AppType from kpops.components.common.topic import ( @@ -54,6 +56,10 @@ class ProducerApp(StreamsBootstrap): description=describe_attr("from_", __doc__), ) + @property + def is_cron_job(self) -> bool: + return bool(not self.values.deployment and self.values.schedule) + @computed_field @cached_property def _cleaner(self) -> ProducerAppCleaner: @@ -70,6 +76,13 @@ def apply_to_outputs(self, name: str, topic: TopicConfig) -> None: case _: super().apply_to_outputs(name, topic) + @property + @override + def helm_name_override(self) -> str: + if self.is_cron_job: + return trim(K8S_CRON_JOB_NAME_MAX_LEN, self.full_name, "") + return super().helm_name_override + @property @override def output_topic(self) -> KafkaTopic | None: diff --git a/tests/components/streams_bootstrap/test_producer_app.py b/tests/components/streams_bootstrap/test_producer_app.py index 67b5e2929..3cbc510eb 100644 --- a/tests/components/streams_bootstrap/test_producer_app.py +++ b/tests/components/streams_bootstrap/test_producer_app.py @@ -41,14 +41,39 @@ def test_release_name(self): @pytest.fixture() def producer_app(self) -> ProducerApp: - return ProducerApp( - name=PRODUCER_APP_NAME, - **{ + producer = ProducerApp.model_validate( + { + "name": PRODUCER_APP_NAME, + "version": "3.2.1", + "namespace": "test-namespace", + "values": { + "image": "ProducerApp", + "kafka": {"bootstrapServers": "fake-broker:9092"}, + }, + "clean_schemas": True, + "to": { + "topics": { + "producer-app-output-topic": TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + } + }, + }, + ) + assert producer.is_cron_job is False + return producer + + @pytest.fixture() + def producer_app_cron_job(self) -> ProducerApp: + producer = ProducerApp.model_validate( + { + "name": PRODUCER_APP_NAME, "version": "3.2.1", "namespace": "test-namespace", "values": { "image": "ProducerApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, + "schedule": "0 12 * * *", }, "clean_schemas": True, "to": { @@ -60,11 +85,24 @@ def producer_app(self) -> ProducerApp: }, }, ) + assert producer.is_cron_job is True + return producer @pytest.fixture(autouse=True) def empty_helm_get_values(self, mocker: MockerFixture) -> MagicMock: return mocker.patch.object(Helm, "get_values", return_value=None) + def test_helm_name_override(self, producer_app: ProducerApp): + assert len(producer_app.helm_name_override) == 63 + assert producer_app.helm_name_override == PRODUCER_APP_HELM_NAME_OVERRIDE + + def test_cron_job_helm_name_override(self, producer_app_cron_job: ProducerApp): + assert len(producer_app_cron_job.helm_name_override) == 52 + assert ( + producer_app_cron_job.helm_name_override + == "${pipeline.name}-test-producer-app-with-long-n-c4c51" + ) + def test_cleaner(self, producer_app: ProducerApp): cleaner = producer_app._cleaner assert isinstance(cleaner, ProducerAppCleaner)