Skip to content

Commit

Permalink
fix: proper conversion of headers for producer
Browse files Browse the repository at this point in the history
  • Loading branch information
hairmare committed Dec 2, 2024
1 parent de8ba29 commit 65e9087
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
11 changes: 7 additions & 4 deletions minioevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import TYPE_CHECKING, Any, NoReturn

from cloudevents.http import CloudEvent
from cloudevents.kafka import to_structured
from cloudevents.kafka import KafkaMessage, to_structured
from configargparse import ArgumentParser # type: ignore[import-untyped]
from kafka import KafkaConsumer, KafkaProducer # type: ignore[import-untyped]

Expand Down Expand Up @@ -98,7 +98,7 @@ def on_sigint(*_: Any) -> NoReturn: # noqa: ANN401 # pragma: no cover
def on_send_error(ex: Exception) -> None: # pragma: no cover
logger.error("Failed to send CloudEvent", exc_info=ex)

def _key_mapper(ce: CloudEvent) -> Any | None: # noqa: ANN401
def _key_mapper(ce: CloudEvent) -> str:
return ".".join(
[
ce.get("type"), # type: ignore[list-item]
Expand All @@ -109,12 +109,15 @@ def _key_mapper(ce: CloudEvent) -> Any | None: # noqa: ANN401

for msg in consumer:
for ce in from_consumer_record(msg):
km = to_structured(ce, key_mapper=_key_mapper)
km: KafkaMessage = to_structured(ce, key_mapper=_key_mapper)
headers: list[tuple[str, bytes]] | None
if km.headers:
headers = list(km.headers.items())
producer.send(
producer_topic,
key=km.key,
value=km.value,
headers=km.headers if km.headers else [],
headers=headers,
).add_errback(on_send_error)
producer.flush()

Expand Down
2 changes: 1 addition & 1 deletion tests/test_minioevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,5 @@ def test_app(mock_consumer, mock_producer):
),
"utf-8",
),
headers={"content-type": b"application/json"},
headers=[("content-type", b"application/json")],
)

0 comments on commit 65e9087

Please sign in to comment.