Skip to content

Commit

Permalink
Create generic SerializeAsOptional type for Pydantic (#564)
Browse files: browse the repository at this point in the history
  • Loading branch information
disrupted authored Dec 18, 2024
1 parent cb0c477 commit fd1acf5
Show file tree
Hide file tree
Showing 14 changed files with 458 additions and 180 deletions.
112 changes: 56 additions & 56 deletions docs/docs/schema/defaults.json

Large diffs are not rendered by default.

84 changes: 42 additions & 42 deletions docs/docs/schema/pipeline.json

Large diffs are not rendered by default.

69 changes: 37 additions & 32 deletions kpops/components/common/kubernetes_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from pydantic import Field, model_validator

from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import CamelCaseConfigModel, DescConfigModel
from kpops.utils.pydantic import (
CamelCaseConfigModel,
DescConfigModel,
SerializeAsOptional,
SerializeAsOptionalModel,
)

if TYPE_CHECKING:
try:
Expand Down Expand Up @@ -91,18 +96,18 @@ def validate_values(self) -> Self:
return self


class NodeSelectorTerm(DescConfigModel, CamelCaseConfigModel):
class NodeSelectorTerm(SerializeAsOptionalModel, DescConfigModel, CamelCaseConfigModel):
"""A null or empty node selector term matches no objects. The requirements of them are ANDed. The TopologySelectorTerm type implements a subset of the NodeSelectorTerm.
:param match_expressions: A list of node selector requirements by node's labels.
:param match_fields: A list of node selector requirements by node's fields.
"""

match_expressions: list[NodeSelectorRequirement] | None = Field(
default=None, description=describe_attr("match_expressions", __doc__)
match_expressions: SerializeAsOptional[list[NodeSelectorRequirement]] = Field(
default=[], description=describe_attr("match_expressions", __doc__)
)
match_fields: list[NodeSelectorRequirement] | None = Field(
default=None, description=describe_attr("match_fields", __doc__)
match_fields: SerializeAsOptional[list[NodeSelectorRequirement]] = Field(
default=[], description=describe_attr("match_fields", __doc__)
)


Expand Down Expand Up @@ -130,7 +135,7 @@ class PreferredSchedulingTerm(DescConfigModel, CamelCaseConfigModel):
weight: Weight = Field(description=describe_attr("weight", __doc__))


class NodeAffinity(DescConfigModel, CamelCaseConfigModel):
class NodeAffinity(SerializeAsOptionalModel, DescConfigModel, CamelCaseConfigModel):
"""Node affinity is a group of node affinity scheduling rules.
:param required_during_scheduling_ignored_during_execution: If the affinity requirements specified by this field are not met at scheduling time, the pod will not be scheduled onto the node. If the affinity requirements specified by this field cease to be met at some point during pod execution (e.g. due to an update), the system may or may not try to eventually evict the pod from its node.
Expand All @@ -143,10 +148,10 @@ class NodeAffinity(DescConfigModel, CamelCaseConfigModel):
"required_during_scheduling_ignored_during_execution", __doc__
),
)
preferred_during_scheduling_ignored_during_execution: (
list[PreferredSchedulingTerm] | None
) = Field(
default=None,
preferred_during_scheduling_ignored_during_execution: SerializeAsOptional[
list[PreferredSchedulingTerm]
] = Field(
default=[],
description=describe_attr(
"preferred_during_scheduling_ignored_during_execution", __doc__
),
Expand Down Expand Up @@ -190,24 +195,24 @@ def validate_values(self) -> Self:
return self


class LabelSelector(DescConfigModel, CamelCaseConfigModel):
class LabelSelector(SerializeAsOptionalModel, DescConfigModel, CamelCaseConfigModel):
"""A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects.
:param match_labels: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is *key*, the operator is *In*, and the values array contains only *value*. The requirements are ANDed.
:param match_expressions: matchExpressions is a list of label selector requirements. The requirements are ANDed.
"""

match_labels: dict[str, str] | None = Field(
default=None,
match_labels: SerializeAsOptional[dict[str, str]] = Field(
default={},
description=describe_attr("match_labels", __doc__),
)
match_expressions: list[LabelSelectorRequirement] | None = Field(
default=None,
match_expressions: SerializeAsOptional[list[LabelSelectorRequirement]] = Field(
default=[],
description=describe_attr("match_expressions", __doc__),
)


class PodAffinityTerm(DescConfigModel, CamelCaseConfigModel):
class PodAffinityTerm(SerializeAsOptionalModel, DescConfigModel, CamelCaseConfigModel):
"""Defines a set of pods (namely those matching the labelSelector relative to the given namespace(s)) that this pod should be co-located (affinity) or not co-located (anti-affinity) with, where co-located is defined as running on a node whose value of the label with key <topologyKey> matches that of any node on which a pod of the set of pods is running.
:param label_selector: A label query over a set of resources, in this case pods. If it's null, this PodAffinityTerm matches with no Pods.
Expand All @@ -222,19 +227,19 @@ class PodAffinityTerm(DescConfigModel, CamelCaseConfigModel):
default=None,
description=describe_attr("label_selector", __doc__),
)
match_label_keys: list[str] | None = Field(
default=None,
match_label_keys: SerializeAsOptional[list[str]] = Field(
default=[],
description=describe_attr("match_label_keys", __doc__),
)
mismatch_label_keys: list[str] | None = Field(
default=None,
mismatch_label_keys: SerializeAsOptional[list[str]] = Field(
default=[],
description=describe_attr("mismatch_label_keys", __doc__),
)
topology_key: str = Field(
description=describe_attr("topology_key", __doc__),
)
namespaces: list[str] | None = Field(
default=None,
namespaces: SerializeAsOptional[list[str]] = Field(
default=[],
description=describe_attr("namespaces", __doc__),
)
namespace_selector: LabelSelector | None = Field(
Expand All @@ -258,25 +263,25 @@ class WeightedPodAffinityTerm(DescConfigModel, CamelCaseConfigModel):
)


class PodAffinity(DescConfigModel, CamelCaseConfigModel):
class PodAffinity(SerializeAsOptionalModel, DescConfigModel, CamelCaseConfigModel):
"""Pod affinity is a group of inter pod affinity scheduling rules.
:param required_during_scheduling_ignored_during_execution: If the affinity requirements specified by this field are not met at scheduling time, the pod will not be scheduled onto the node. If the affinity requirements specified by this field cease to be met at some point during pod execution (e.g. due to a pod label update), the system may or may not try to eventually evict the pod from its node. When there are multiple elements, the lists of nodes corresponding to each podAffinityTerm are intersected, i.e. all terms must be satisfied.
:param preferred_during_scheduling_ignored_during_execution: The scheduler will prefer to schedule pods to nodes that satisfy the affinity expressions specified by this field, but it may choose a node that violates one or more of the expressions. The node that is most preferred is the one with the greatest sum of weights, i.e. for each node that meets all of the scheduling requirements (resource request, requiredDuringScheduling affinity expressions, etc.), compute a sum by iterating through the elements of this field and adding weight to the sum if the node has pods which matches the corresponding podAffinityTerm; the node(s) with the highest sum are the most preferred.
"""

required_during_scheduling_ignored_during_execution: (
list[PodAffinityTerm] | None
) = Field(
default=None,
required_during_scheduling_ignored_during_execution: SerializeAsOptional[
list[PodAffinityTerm]
] = Field(
default=[],
description=describe_attr(
"required_during_scheduling_ignored_during_execution", __doc__
),
)
preferred_during_scheduling_ignored_during_execution: (
list[WeightedPodAffinityTerm] | None
) = Field(
default=None,
preferred_during_scheduling_ignored_during_execution: SerializeAsOptional[
list[WeightedPodAffinityTerm]
] = Field(
default=[],
description=describe_attr(
"preferred_during_scheduling_ignored_during_execution", __doc__
),
Expand Down
56 changes: 29 additions & 27 deletions kpops/components/streams_bootstrap/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ImagePullPolicy,
ProtocolSchema,
Resources,
SerializeAsOptional,
ServiceType,
Toleration,
)
Expand All @@ -19,6 +20,7 @@
from kpops.utils.pydantic import (
CamelCaseConfigModel,
DescConfigModel,
SerializeAsOptionalModel,
exclude_by_value,
exclude_defaults,
)
Expand Down Expand Up @@ -93,7 +95,7 @@ class JavaOptions(CamelCaseConfigModel, DescConfigModel):
)


class StreamsBootstrapValues(HelmAppValues):
class StreamsBootstrapValues(SerializeAsOptionalModel, HelmAppValues):
"""Base value class for all streams bootstrap related components.
:param image: Docker image of the Kafka producer app.
Expand Down Expand Up @@ -132,8 +134,8 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("image_pull_policy", __doc__),
)

image_pull_secrets: list[dict[str, str]] | None = Field(
default=None,
image_pull_secrets: SerializeAsOptional[list[dict[str, str]]] = Field(
default=[],
description=describe_attr("image_pull_secret", __doc__),
)

Expand All @@ -146,8 +148,8 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("resources", __doc__),
)

ports: list[PortConfig] | None = Field(
default=None,
ports: SerializeAsOptional[list[PortConfig]] = Field(
default=[],
description=describe_attr("ports", __doc__),
)

Expand All @@ -161,33 +163,33 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("configuration_env_prefix", __doc__),
)

command_line: dict[str, str | bool | int] | None = Field(
default=None,
command_line: SerializeAsOptional[dict[str, str | bool | int]] = Field(
default={},
description=describe_attr("command_line", __doc__),
)

env: dict[str, str] | None = Field(
default=None,
env: SerializeAsOptional[dict[str, str]] = Field(
default={},
description=describe_attr("env", __doc__),
)

secrets: dict[str, str] | None = Field(
default=None,
secrets: SerializeAsOptional[dict[str, str]] = Field(
default={},
description=describe_attr("secrets", __doc__),
)

secret_refs: dict[str, Any] | None = Field(
default=None,
secret_refs: SerializeAsOptional[dict[str, Any]] = Field(
default={},
description=describe_attr("secret_refs", __doc__),
)

secret_files_refs: list[str] | None = Field(
default=None,
secret_files_refs: SerializeAsOptional[list[str]] = Field(
default=[],
description=describe_attr("secret_files_refs", __doc__),
)

files: dict[str, Any] | None = Field(
default=None,
files: SerializeAsOptional[dict[str, Any]] = Field(
default={},
description=describe_attr("files", __doc__),
)

Expand All @@ -196,23 +198,23 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("java_options", __doc__),
)

pod_annotations: dict[str, str] | None = Field(
default=None,
pod_annotations: SerializeAsOptional[dict[str, str]] = Field(
default={},
description=describe_attr("pod_annotations", __doc__),
)

pod_labels: dict[str, str] | None = Field(
default=None,
pod_labels: SerializeAsOptional[dict[str, str]] = Field(
default={},
description=describe_attr("pod_labels", __doc__),
)

liveness_probe: dict[str, Any] | None = Field(
default=None,
liveness_probe: SerializeAsOptional[dict[str, Any]] = Field(
default={},
description=describe_attr("liveness_probe", __doc__),
)

readiness_probe: dict[str, Any] | None = Field(
default=None,
readiness_probe: SerializeAsOptional[dict[str, Any]] = Field(
default={},
description=describe_attr("readiness_probe", __doc__),
)

Expand All @@ -221,8 +223,8 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("affinity", __doc__),
)

tolerations: list[Toleration] | None = Field(
default=None,
tolerations: SerializeAsOptional[list[Toleration]] = Field(
default=[],
description=describe_attr("tolerations", __doc__),
)

Expand Down
9 changes: 6 additions & 3 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ def is_cron_job(self) -> bool:
@computed_field
@cached_property
def _cleaner(self) -> ProducerAppCleaner:
return ProducerAppCleaner(
**self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"})
)
kwargs = {
name: getattr(self, name)
for name in self.model_fields_set
if name not in {"_cleaner", "from_", "to"}
}
return ProducerAppCleaner.model_validate(kwargs)

@override
def apply_to_outputs(self, name: str, topic: TopicConfig) -> None:
Expand Down
24 changes: 14 additions & 10 deletions kpops/components/streams_bootstrap/streams/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from kpops.components.common.kubernetes_model import (
ImagePullPolicy,
Resources,
SerializeAsOptional,
)
from kpops.components.common.topic import KafkaTopic, KafkaTopicStr
from kpops.components.streams_bootstrap.model import (
Expand All @@ -18,6 +19,7 @@
from kpops.utils.pydantic import (
CamelCaseConfigModel,
DescConfigModel,
SerializeAsOptionalModel,
)


Expand Down Expand Up @@ -117,7 +119,9 @@ def add_labeled_input_topics(self, label: str, topics: list[KafkaTopic]) -> None
)


class StreamsAppAutoScaling(CamelCaseConfigModel, DescConfigModel):
class StreamsAppAutoScaling(
SerializeAsOptionalModel, CamelCaseConfigModel, DescConfigModel
):
"""Kubernetes Event-driven Autoscaling config.
:param enabled: Whether to enable auto-scaling using KEDA., defaults to False
Expand Down Expand Up @@ -191,16 +195,16 @@ class StreamsAppAutoScaling(CamelCaseConfigModel, DescConfigModel):
title="Idle replica count",
description=describe_attr("idle_replicas", __doc__),
)
internal_topics: list[str] | None = Field(
default=None,
internal_topics: SerializeAsOptional[list[str]] = Field(
default=[],
description=describe_attr("internal_topics", __doc__),
)
topics: list[str] | None = Field(
default=None,
topics: SerializeAsOptional[list[str]] = Field(
default=[],
description=describe_attr("topics", __doc__),
)
additional_triggers: list[str] | None = Field(
default=None,
additional_triggers: SerializeAsOptional[list[str]] = Field(
default=[],
description=describe_attr("additional_triggers", __doc__),
)
model_config = ConfigDict(extra="allow")
Expand Down Expand Up @@ -277,7 +281,7 @@ class PrometheusJMXExporterConfig(CamelCaseConfigModel, DescConfigModel):
)


class JMXConfig(CamelCaseConfigModel, DescConfigModel):
class JMXConfig(SerializeAsOptionalModel, CamelCaseConfigModel, DescConfigModel):
"""JMX configuration options.
:param port: The jmx port which JMX style metrics are exposed.
Expand All @@ -289,8 +293,8 @@ class JMXConfig(CamelCaseConfigModel, DescConfigModel):
description=describe_attr("port", __doc__),
)

metric_rules: list[str] | None = Field(
default=None,
metric_rules: SerializeAsOptional[list[str]] = Field(
default=[],
description=describe_attr("metric_rules", __doc__),
)

Expand Down
9 changes: 6 additions & 3 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ class StreamsApp(StreamsBootstrap):
@computed_field
@cached_property
def _cleaner(self) -> StreamsAppCleaner:
return StreamsAppCleaner(
**self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"})
)
kwargs = {
name: getattr(self, name)
for name in self.model_fields_set
if name not in {"_cleaner", "from_", "to"}
}
return StreamsAppCleaner.model_validate(kwargs)

@property
@override
Expand Down
Loading

0 comments on commit fd1acf5

Please sign in to comment.