Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix io bugs #3

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 80 additions & 31 deletions src/piotr_yuxuan/slava.clj
Original file line number Diff line number Diff line change
@@ -1,51 +1,90 @@
(ns piotr-yuxuan.slava
"FIXME add cljdoc"
(:require [piotr-yuxuan.slava.encode :refer [encode]]
(:require [piotr-yuxuan.slava.config :as config]
[piotr-yuxuan.slava.decode :refer [decode]]
[piotr-yuxuan.slava.config :as config])
(:import (io.confluent.kafka.schemaregistry.avro AvroSchema)
[piotr-yuxuan.slava.encode :refer [encode]])
(:import (clojure.lang Atom Obj)
(io.confluent.kafka.schemaregistry.avro AvroSchema)
(io.confluent.kafka.schemaregistry.client SchemaRegistryClient)
(io.confluent.kafka.serializers KafkaAvroSerializer KafkaAvroSerializerConfig KafkaAvroDeserializer)
(io.confluent.kafka.serializers.subject.strategy SubjectNameStrategy)
(java.nio ByteBuffer)
(java.util Map)
(org.apache.avro Schema)
(org.apache.avro.generic GenericContainer)
(org.apache.kafka.common.serialization Serializer Deserializer Serdes Serde)
(clojure.lang Atom)
(org.apache.avro.generic GenericContainer)))
(java.util.concurrent ConcurrentHashMap)))

(defn subject-name
(defn ^String subject-name
"FIXME add cljdoc"
[{:keys [key? ^SubjectNameStrategy subject-name-strategy]} ^String topic]
(.subjectName subject-name-strategy topic key? nil))

(defn resolve-subject-name
(defn ^String resolve-subject-name
"FIXME add cljdoc"
[config ^String topic m]
(if (contains? (meta m) :piotr-yuxuan.slava/subject-name)
(get (meta m) :piotr-yuxuan.slava/subject-name)
(if (contains? (meta m) ::subject-name)
(get (meta m) ::subject-name)
(subject-name config topic)))

(defn cached-schema!
  "Return the raw Avro schema registered under `schema-id`.

  Asks `inner-client` for the schema with that id and unwraps the
  resulting `AvroSchema` with `.rawSchema`. Nothing is cached by this
  function itself; presumably the name refers to caching done inside
  the client -- TODO confirm."
  [^SchemaRegistryClient inner-client schema-id]
  (let [^AvroSchema avro-schema (.getSchemaById inner-client schema-id)]
    (.rawSchema avro-schema)))

(defn default-subject-name->id
  "Look up the schema id of `subject-name`, filling the cache on a miss.

  `subject-name->id` is an atom holding a map from subject name to
  schema id. When the subject is already a key of that map, its value
  is returned as is (even when it is nil) and `inner-client` is not
  used. Otherwise the id of the latest schema metadata of the subject
  is fetched through `inner-client`, stored in the atom, and returned."
  [subject-name->id inner-client subject-name]
  ;; `find` yields the map entry, so a key mapped to nil still counts as a hit.
  (if-let [hit (find @subject-name->id subject-name)]
    (val hit)
    (let [fetched-id (-> inner-client
                         (.getLatestSchemaMetadata subject-name)
                         .getId)]
      (swap! subject-name->id assoc subject-name fetched-id)
      fetched-id)))

(defn subject-name->id
  "Return the schema id of `subject-name` using the lookup found in `config`.

  `config` may contain, under `:subject-name->id`:
  - `:ref`     an atom holding a map from subject name to schema id;
  - `:through` a function called with that atom and the subject name,
               whose result is returned.
  When no `:through` is configured, `default-subject-name->id` is used
  with the atom, `inner-client`, and the subject name.

  The previous body discarded the result of `through`, called the
  three-argument default with only two arguments, and passed the
  registry lookup as the fallback argument of `get`. That argument is
  evaluated eagerly, so the registry was queried on every call.

  NOTE(review): this file defines `subject-name->id` a second time
  further down with a different parameter list; the later definition
  replaces this one when the namespace loads -- confirm which is meant."
  [config inner-client subject-name]
  (let [cache-ref (get-in config [:subject-name->id :ref])]
    (if-let [through (get-in config [:subject-name->id :through])]
      ;; User-supplied lookup: same two arguments as before, result now returned.
      (through cache-ref subject-name)
      ;; Default read-through lookup; only reaches the registry on a cache miss.
      (default-subject-name->id cache-ref inner-client subject-name))))

(defn schema-id!
"FIXME add cljdoc"
;; FIXME on every serialization. Should be cached.
[^SchemaRegistryClient inner-client ^String subject-name]
(.getId (.getLatestSchemaMetadata inner-client subject-name)))
[config topic ^Map m]
(let [subject-name->id (get-in config [:subject-name->id :through] subject-name->id)]
(subject-name->id (resolve-subject-name config topic m))))

(defn resolve-schema-id
"FIXME add cljdoc"
[inner-client ^Map m ^String subject-name]
(if (contains? (meta m) :piotr-yuxuan.slava/schema-id)
(get (meta m) :piotr-yuxuan.slava/schema-id)
(schema-id! inner-client subject-name)))
[config topic ^Map m]
(if (contains? (meta m) ::schema-id)
(get (meta m) ::schema-id)
(schema-id! config topic m)))

(defn resolve-schema
(defn ^Schema resolve-schema
"FIXME add cljdoc"
^Schema [^SchemaRegistryClient inner-client ^Map m schema-id]
(cond (contains? (meta m) :piotr-yuxuan.slava/writer-schema) (get (meta m) :piotr-yuxuan.slava/writer-schema)
(contains? (meta m) :piotr-yuxuan.slava/reader-schema) (get (meta m) :piotr-yuxuan.slava/reader-schema)
:else (.rawSchema ^AvroSchema (.getSchemaById inner-client schema-id))))
[config ^SchemaRegistryClient inner-client topic ^Map m]
(cond (contains? (meta m) ::schema) (get (meta m) ::schema) ; User-defined, takes precedence.
(contains? (meta m) ::writer-schema) (get (meta m) ::writer-schema)
(contains? (meta m) ::reader-schema) (get (meta m) ::reader-schema)
:else (cached-schema! inner-client (resolve-schema-id config topic m))))

(defn subject-name->id
  "Build the subject-name to schema-id lookup described by `value`.

  Reads `:subject-name->id` from `value`. Anything other than
  `:default` is returned untouched. For `:default`, returns a map with:
  - `:ref`     a fresh atom holding a map from subject name to schema id;
  - `:through` a function of one subject name that returns the cached
               id when the subject is known, and otherwise asks
               `inner-client` for the latest schema metadata of the
               subject, caches its id in `:ref`, and returns it."
  [inner-client value]
  (let [found (get value :subject-name->id)]
    (if (= :default found)
      (let [subject-name->id (atom {})]
        {:ref subject-name->id
         :through (fn stub-through [subject-name]
                    ;; The fallback of `get` is an ordinary argument and is
                    ;; evaluated eagerly, which queried the registry on every
                    ;; call. Branch explicitly so it is only hit on a miss.
                    (if-let [hit (find @subject-name->id subject-name)]
                      (val hit)
                      (let [retrieved-id (.getId (.getLatestSchemaMetadata inner-client subject-name))]
                        (swap! subject-name->id assoc subject-name retrieved-id)
                        retrieved-id)))})
      found)))

(defn subject-name-strategy
(defn ^SubjectNameStrategy subject-name-strategy
"FIXME add cljdoc"
[inner-config key?]
(let [inner-config-obj (KafkaAvroSerializerConfig. inner-config)]
Expand All @@ -55,11 +94,12 @@

(defn configure!
"FIXME add cljdoc"
[{:keys [config inner]} value key?]
[{:keys [config inner inner-client]} value key?]
(let [inner-config (->> value
(remove (comp config/slava-key? key))
(into {}))]
(reset! config (assoc value
:subject-name->id (subject-name->id inner-client value)
:key? key?
:subject-name-strategy (subject-name-strategy inner-config key?)))
;; Reflection warning: either a KafkaAvroSerializer or a KafkaAvroDeserializer.
Expand All @@ -71,13 +111,20 @@
Serializer
(configure [this value key?] (configure! this value key?))
(serialize [_ topic m]
(->> (resolve-subject-name @config topic m)
(resolve-schema-id inner-client m)
(resolve-schema inner-client m)
(->> (resolve-schema config inner-client topic m)
(encode @config m)
(.serialize inner topic)))
(close [_] (.close inner)))

(def int-size
  "Number of bytes used by an int on the JVM, which is always 4."
  4)

(defn schema-id
  "Extract the schema id, as known in the schema registry, from `data`.

  `data` is the byte array handed to the Confluent Avro deserializer.
  In the Confluent wire format it starts with one magic byte, followed
  by the schema id as a 4-byte big-endian int, then the Avro payload.
  The id is therefore read after skipping the first byte; reading from
  offset 0 would mix the magic byte into the id."
  [^bytes data]
  (let [magic-byte-size 1]
    ;; `wrap` with an offset positions the buffer right after the magic byte,
    ;; and ByteBuffer reads big-endian by default.
    (.getInt (ByteBuffer/wrap data magic-byte-size int-size))))

(defrecord ClojureDeserializer [^Atom config
^KafkaAvroDeserializer inner
^SchemaRegistryClient inner-client]
Expand All @@ -86,29 +133,31 @@
(deserialize [_ topic data]
(let [^GenericContainer generic-container (.deserialize inner topic data)
reader-schema (.getSchema generic-container)
m (decode @config generic-container reader-schema)
subject-name (resolve-subject-name @config topic m)]
m (decode @config generic-container reader-schema)]
(vary-meta m assoc
:piotr-yuxuan.slava/reader-schema reader-schema
:piotr-yuxuan.slava/subject-name subject-name
:piotr-yuxuan.slava/schema-id (resolve-schema-id inner-client m subject-name))))
::reader-schema reader-schema
::subject-name (subject-name @config topic)
::schema-id (schema-id data))))
(close [_] (.close inner)))

(defn ^ClojureSerializer serializer
  "Build a `ClojureSerializer` backed by `inner-client`.

  With one argument, the serializer wraps a new `KafkaAvroSerializer`
  created from `inner-client` and starts with a config atom holding
  nil. With three arguments, that serializer is additionally configured
  with `config` and `key?` before being returned."
  ([inner-client]
   (let [config-ref (atom nil)
         inner (KafkaAvroSerializer. inner-client)]
     (ClojureSerializer. config-ref inner inner-client)))
  ([inner-client config key?]
   (let [instance (serializer inner-client)]
     (.configure instance config key?)
     instance)))

(defn ^ClojureDeserializer deserializer
  "Build a `ClojureDeserializer` backed by `inner-client`.

  With one argument, the deserializer wraps a new
  `KafkaAvroDeserializer` created from `inner-client` and starts with a
  config atom holding nil. With three arguments, that deserializer is
  additionally configured with `config` and `key?` before being
  returned."
  ([inner-client]
   (let [config-ref (atom nil)
         inner (KafkaAvroDeserializer. inner-client)]
     (ClojureDeserializer. config-ref inner inner-client)))
  ([inner-client config key?]
   (let [instance (deserializer inner-client)]
     (.configure instance config key?)
     instance)))

(defn ^Serde clojure-serde
"FIXME add cljdoc"
([inner-client]
(Serdes/serdeFrom (serializer inner-client)
(deserializer inner-client)))
Expand Down
3 changes: 2 additions & 1 deletion src/piotr_yuxuan/slava/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
(merge
{:record-key-fn (constantly nil)
:clojure-types clojure-types
:generic-concrete-types generic-concrete-types}
:generic-concrete-types generic-concrete-types
:subject-name->id :default}
avro-decoders
avro-encoders))

Expand Down
23 changes: 3 additions & 20 deletions test/piotr_yuxuan/slava_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,6 @@
(.name "field") .type .intType .noDefault
^GenericData$Record .endRecord))

(deftest schema-id!-test
  ;; Registers two schemas under the same subject, the older one first,
  ;; then checks that `slava/schema-id!` returns the id of the second.
  (let [subject "subject-name"
        older-version 2
        older-id 2
        latest-version 3
        latest-id 3
        inner-client (MockSchemaRegistryClient.)]
    (.register inner-client subject (AvroSchema. previous-schema) older-version older-id)
    (.register inner-client subject (AvroSchema. schema) latest-version latest-id)
    (testing "get latest schema version"
      (is (= (slava/schema-id! inner-client subject)
             latest-id)))))

;; Arbitrary version number below 100, drawn once when this namespace is loaded.
(def ^Integer version-id
(rand-int 100))

Expand Down Expand Up @@ -131,10 +113,11 @@

(deftest resolve-schema-test
(is (= schema (slava/resolve-schema
config/default
(doto (MockSchemaRegistryClient.)
(.register "subject-name" (AvroSchema. schema) version-id schema-id))
{}
schema-id)))
topic
{})))
(is (= writer-schema
(slava/resolve-schema (MockSchemaRegistryClient.)
(with-meta {} {:piotr-yuxuan.slava/writer-schema writer-schema
Expand Down