diff --git a/src/ziggurat/header_transformer.clj b/src/ziggurat/header_transformer.clj index f587fb0a..dd72c5c9 100644 --- a/src/ziggurat/header_transformer.clj +++ b/src/ziggurat/header_transformer.clj @@ -1,17 +1,17 @@ (ns ziggurat.header-transformer - (:import [org.apache.kafka.streams.kstream ValueTransformer] + (:import [org.apache.kafka.streams.kstream ValueTransformerWithKey] [org.apache.kafka.streams.processor ProcessorContext])) -(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer +(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformerWithKey (^void init [_ ^ProcessorContext context] (set! processor-context context)) - (transform [_ record-value] + (transform [_ record-key record-value] (let [topic (.topic processor-context) timestamp (.timestamp processor-context) partition (.partition processor-context) headers (.headers processor-context) metadata {:topic topic :timestamp timestamp :partition partition}] - {:value record-value :headers headers :metadata metadata})) + {:value record-value :headers headers :metadata metadata :key record-key})) (close [_] nil)) (defn create [] diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index d867cd76..6c1772a2 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -16,7 +16,7 @@ [org.apache.kafka.common.errors TimeoutException] [org.apache.kafka.streams KafkaStreams KafkaStreams$State StreamsConfig StreamsBuilder Topology] [org.apache.kafka.streams.errors StreamsUncaughtExceptionHandler StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse] - [org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier] + [org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerWithKeySupplier] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (def default-config-for-stream @@ -77,7 +77,7 @@ (defn- header-transformer-supplier [] - (reify ValueTransformerSupplier + (reify ValueTransformerWithKeySupplier (get [_] (header-transformer/create)))) (defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] @@ -123,9 +123,11 @@ (close-stream topic-entity stream))) (defn- mapped-handler-fn [handler-fn channels message topic-entity] + (log/debug (format "See my message - %s", message)) (try ((mapper-func handler-fn channels) (-> (->MessagePayload (:value message) topic-entity) + (assoc :key (:key message)) (assoc :headers (:headers message)) (assoc :metadata (:metadata message)))) (finally))) diff --git a/test/ziggurat/header_transformer_test.clj b/test/ziggurat/header_transformer_test.clj index e3c192b5..c21252bf 100644 --- a/test/ziggurat/header_transformer_test.clj +++ b/test/ziggurat/header_transformer_test.clj @@ -17,8 +17,8 @@ (partition [_] partition)) transformer (create) _ (.init transformer context) - transformed-val (.transform transformer "val")] - (is (= {:value "val" :headers headers :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))) + transformed-val (.transform transformer "key" "val")] + (is (= {:key "key" :value "val" :headers headers :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))) (testing "transforms value with nil headers when not passed" (let [topic "topic" @@ -31,5 +31,5 @@ (partition [_] partition)) transformer (create) _ (.init transformer context) - transformed-val (.transform transformer "val")] - (is (= {:value "val" :headers nil :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val))))) + transformed-val (.transform transformer "key" "val")] + (is (= {:key "key" :value "val" :headers nil :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))))