Skip to content

Commit

Permalink
Trim Helm name override for Producer CronJob to 52 characters (#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored Dec 9, 2024
1 parent 420dc6f commit 6489eba
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
3 changes: 3 additions & 0 deletions kpops/component_handlers/kubernetes/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

K8S_LABEL_MAX_LEN = 63

# https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs
K8S_CRON_JOB_NAME_MAX_LEN = 52


class KubernetesManifest(UserDict[str, JsonType]):
"""Representation of a Kubernetes API object as YAML/JSON mapping."""
Expand Down
13 changes: 13 additions & 0 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from pydantic import Field, ValidationError, computed_field
from typing_extensions import override

from kpops.component_handlers.kubernetes.model import K8S_CRON_JOB_NAME_MAX_LEN
from kpops.component_handlers.kubernetes.utils import trim
from kpops.components.base_components.kafka_app import KafkaAppCleaner
from kpops.components.common.app_type import AppType
from kpops.components.common.topic import (
Expand Down Expand Up @@ -54,6 +56,10 @@ class ProducerApp(StreamsBootstrap):
description=describe_attr("from_", __doc__),
)

@property
def is_cron_job(self) -> bool:
return bool(not self.values.deployment and self.values.schedule)

@computed_field
@cached_property
def _cleaner(self) -> ProducerAppCleaner:
Expand All @@ -70,6 +76,13 @@ def apply_to_outputs(self, name: str, topic: TopicConfig) -> None:
case _:
super().apply_to_outputs(name, topic)

@property
@override
def helm_name_override(self) -> str:
if self.is_cron_job:
return trim(K8S_CRON_JOB_NAME_MAX_LEN, self.full_name, "")
return super().helm_name_override

@property
@override
def output_topic(self) -> KafkaTopic | None:
Expand Down
44 changes: 41 additions & 3 deletions tests/components/streams_bootstrap/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,39 @@ def test_release_name(self):

@pytest.fixture()
def producer_app(self) -> ProducerApp:
return ProducerApp(
name=PRODUCER_APP_NAME,
**{
producer = ProducerApp.model_validate(
{
"name": PRODUCER_APP_NAME,
"version": "3.2.1",
"namespace": "test-namespace",
"values": {
"image": "ProducerApp",
"kafka": {"bootstrapServers": "fake-broker:9092"},
},
"clean_schemas": True,
"to": {
"topics": {
"producer-app-output-topic": TopicConfig(
type=OutputTopicTypes.OUTPUT, partitions_count=10
),
}
},
},
)
assert producer.is_cron_job is False
return producer

@pytest.fixture()
def producer_app_cron_job(self) -> ProducerApp:
producer = ProducerApp.model_validate(
{
"name": PRODUCER_APP_NAME,
"version": "3.2.1",
"namespace": "test-namespace",
"values": {
"image": "ProducerApp",
"kafka": {"bootstrapServers": "fake-broker:9092"},
"schedule": "0 12 * * *",
},
"clean_schemas": True,
"to": {
Expand All @@ -60,11 +85,24 @@ def producer_app(self) -> ProducerApp:
},
},
)
assert producer.is_cron_job is True
return producer

@pytest.fixture(autouse=True)
def empty_helm_get_values(self, mocker: MockerFixture) -> MagicMock:
return mocker.patch.object(Helm, "get_values", return_value=None)

def test_helm_name_override(self, producer_app: ProducerApp):
assert len(producer_app.helm_name_override) == 63
assert producer_app.helm_name_override == PRODUCER_APP_HELM_NAME_OVERRIDE

def test_cron_job_helm_name_override(self, producer_app_cron_job: ProducerApp):
assert len(producer_app_cron_job.helm_name_override) == 52
assert (
producer_app_cron_job.helm_name_override
== "${pipeline.name}-test-producer-app-with-long-n-c4c51"
)

def test_cleaner(self, producer_app: ProducerApp):
cleaner = producer_app._cleaner
assert isinstance(cleaner, ProducerAppCleaner)
Expand Down

0 comments on commit 6489eba

Please sign in to comment.