diff --git a/minioevents.py b/minioevents.py index 8fd3a68..e0252b0 100644 --- a/minioevents.py +++ b/minioevents.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any, NoReturn from cloudevents.http import CloudEvent -from cloudevents.kafka import to_structured +from cloudevents.kafka import KafkaMessage, to_structured from configargparse import ArgumentParser # type: ignore[import-untyped] from kafka import KafkaConsumer, KafkaProducer # type: ignore[import-untyped] @@ -98,7 +98,7 @@ def on_sigint(*_: Any) -> NoReturn: # noqa: ANN401 # pragma: no cover def on_send_error(ex: Exception) -> None: # pragma: no cover logger.error("Failed to send CloudEvent", exc_info=ex) - def _key_mapper(ce: CloudEvent) -> Any | None: # noqa: ANN401 + def _key_mapper(ce: CloudEvent) -> str: return ".".join( [ ce.get("type"), # type: ignore[list-item] @@ -109,12 +109,15 @@ def _key_mapper(ce: CloudEvent) -> Any | None: # noqa: ANN401 for msg in consumer: for ce in from_consumer_record(msg): - km = to_structured(ce, key_mapper=_key_mapper) + km: KafkaMessage = to_structured(ce, key_mapper=_key_mapper) + headers: list[tuple[str, bytes]] | None + if km.headers: + headers = list(km.headers.items()) producer.send( producer_topic, key=km.key, value=km.value, - headers=km.headers if km.headers else [], + headers=headers, ).add_errback(on_send_error) producer.flush() diff --git a/tests/test_minioevents.py b/tests/test_minioevents.py index 6401ab6..1c94812 100644 --- a/tests/test_minioevents.py +++ b/tests/test_minioevents.py @@ -136,5 +136,5 @@ def test_app(mock_consumer, mock_producer): ), "utf-8", ), - headers={"content-type": b"application/json"}, + headers=[("content-type", b"application/json")], )