Skip to content

Commit

Permalink
Adds test
Browse files Browse the repository at this point in the history
  • Loading branch information
rootxakash committed Oct 17, 2024
1 parent faf81fe commit 3ac9b2a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 43 deletions.
19 changes: 7 additions & 12 deletions src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@
:enabled
:jaas])

(defn- not-blank?
[s]
(and (not (nil? s)) (not (str/blank? (str/trim s)))))

(defn- to-list
[s]
(if (empty? s)
Expand Down Expand Up @@ -201,20 +205,12 @@
(.setProperty p sk nv))))
p)

(defn create-jaas-properties [user-name password login-module]
(let [username-str (if user-name (format " username=\"%s\"" user-name) "")
password-str (if password (format " password=\"%s\"" password) "")
(defn create-jaas-properties [username password login-module]
(let [username-str (when (not-blank? username) (format " username=\"%s\"" username))
password-str (when (not-blank? password) (format " password=\"%s\"" password))
credentials (str username-str password-str)]
(format "%s required%s;" login-module (if (empty? credentials) "" credentials))))

(defn- add-ssl-properties
[properties ssl-config-map]
(if (and (some? (:ssl-truststore-location ssl-config-map)) (some? (:ssl-truststore-password ssl-config-map)))
(doto properties
(.put SslConfigs/SSL_TRUSTSTORE_LOCATION_CONFIG (:ssl-truststore-location ssl-config-map))
(.put SslConfigs/SSL_TRUSTSTORE_PASSWORD_CONFIG (:ssl-truststore-password ssl-config-map))))
properties)

(defn- add-jaas-properties
[properties jaas-config]
(if (some? jaas-config)
Expand Down Expand Up @@ -262,7 +258,6 @@
protocol (get ssl-config-map :protocol)]
(if (true? ssl-configs-enabled)
(as-> properties pr
(add-ssl-properties pr ssl-config-map)
(add-jaas-properties pr jaas-config)
(add-sasl-properties pr mechanism protocol)
(reduce-kv set-property-fn pr ssl-config-map))
Expand Down
94 changes: 63 additions & 31 deletions test/ziggurat/config_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
streams-config-mapping-table)
set-all-property (partial set-property config-mapping-table)
build-all-config-properties (partial build-properties set-all-property)]

(testing "all valid kafka configs"
(let [config-map {:auto-offset-reset :latest
:replication-factor 2
Expand All @@ -174,18 +175,21 @@
(is (= replication-factor "2"))
(is (= enable-idempotence "true"))
(is (= group-id "foo"))))

(testing "valid kafka consumer configs converts commit-interval-ms to auto-commit-interval-ms"
(let [config-map {:commit-interval-ms 5000}
props (build-consumer-config-properties config-map)
auto-commit-interval-ms (.getProperty props "auto.commit.interval.ms")]
(is (= auto-commit-interval-ms "5000"))))

(testing "valid kafka streams configs does not convert commit-interval-ms to auto-commit-interval-ms"
(let [config-map {:commit-interval-ms 5000}
props (build-streams-config-properties config-map)
auto-commit-interval-ms (.getProperty props "auto.commit.interval.ms" "NOT FOUND")
commit-interval-ms (.getProperty props "commit.interval.ms")]
(is (= auto-commit-interval-ms "NOT FOUND"))
(is (= commit-interval-ms "5000"))))

(testing "mapping table for backward compatibility"
(let [config-map {:auto-offset-reset-config "latest"
:changelog-topic-replication-factor 2
Expand Down Expand Up @@ -226,6 +230,7 @@
(is (= value-deserializer "value-deserializer"))
(is (= value-serializer "value-serializer"))
(is (= group-id "foo"))))

(testing "non kafka config keys should not be in Properties"
(let [config-map {:consumer-type :joins
:producer {:foo "bar"}
Expand All @@ -238,16 +243,17 @@
:poll-timeout-ms-config 10000}
props (build-all-config-properties config-map)]
(doall
(map (fn [[k _]]
(let [string-key (str/replace (name k) #"-" ".")
not-found "NOT FOUND!"
v (.getProperty props string-key not-found)]
(is (= v not-found))))
config-map))))
(map (fn [[k _]]
(let [string-key (str/replace (name k) #"-" ".")
not-found "NOT FOUND!"
v (.getProperty props string-key not-found)]
(is (= v not-found))))
config-map))))

(testing "should set ssl properties for streams if enabled is set to true"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:auto-offset-reset :latest
:group-id "foo"}
props (build-streams-config-properties streams-config-map)
Expand All @@ -259,25 +265,33 @@
(is (= group-id "foo"))
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password")))))

(testing "should set ssl properties for consumer API if enabled is set to true"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
:ssl-keystore-password "some-password"
:ssl-truststore-location "/some/truststore/location"
:ssl-truststore-password "some-truststore-password"})]
(let [streams-config-map {:max-poll-records 500
:enable-auto-commit true}
props (build-consumer-config-properties streams-config-map)
max-poll-records (.getProperty props "max.poll.records")
enable-auto-comit (.getProperty props "enable.auto.commit")
ssl-ks-location (.getProperty props "ssl.keystore.location")
ssl-ks-password (.getProperty props "ssl.keystore.password")]
ssl-ks-password (.getProperty props "ssl.keystore.password")
ssl-ts-location (.getProperty props "ssl.truststore.location")
ssl-ts-password (.getProperty props "ssl.truststore.password")]
(is (= max-poll-records "500"))
(is (= enable-auto-comit "true"))
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password")))))
(is (= ssl-ks-password "some-password"))
(is (= ssl-ts-location "/some/truststore/location"))
(is (= ssl-ts-password "some-truststore-password")))))

(testing "should set ssl properties for producer API if enabled is set to true"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:batch.size 500
:acks 1}
props (build-producer-config-properties streams-config-map)
Expand All @@ -289,6 +303,7 @@
(is (= acks "1"))
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password")))))

(testing "should not set ssl properties for streams if eenabled is set to false"
(with-redefs [ssl-config (constantly {:enabled false
:ssl-keystore-location "/some/location"
Expand All @@ -304,10 +319,11 @@
(is (= group-id "foo"))
(is (nil? ssl-ks-location))
(is (nil? ssl-ks-password)))))

(testing "ssl properties from streams config map overrides the ssl properties provided in [:ziggurat :ssl]"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:auto-offset-reset :latest
:ssl-keystore-location "/some/different/location"
:ssl-keystore-password "different-password"}
Expand All @@ -318,14 +334,15 @@
(is (= auto-offset-reset "latest"))
(is (= ssl-ks-location "/some/different/location"))
(is (= ssl-ks-password "different-password")))))

(testing "ssl properties create jaas template from the map provided in [:ziggurat :ssl :jaas]"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"
:mechanism "SCRAM-SHA-512"
:jaas {:username "myuser"
:password "mypassword"
:login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})]
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"
:mechanism "SCRAM-SHA-512"
:jaas {:username "myuser"
:password "mypassword"
:login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})]
(let [streams-config-map {:auto-offset-reset :latest}
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
Expand All @@ -336,10 +353,11 @@
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password"))
(is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "org.apache.kafka.common.security.scram.ScramLoginModule"))))))

(testing "ssl properties DO NOT create jaas template if no value is provided for key sequence [:ziggurat :ssl :jaas]"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:auto-offset-reset :latest}
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
Expand All @@ -350,6 +368,7 @@
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password"))
(is (nil? sasl-jaas-config)))))

(testing "sasl properties create jaas template from the map provided in [:ziggurat :sasl :jaas]"
(with-redefs [sasl-config (constantly {:enabled true
:protocol "SASL_PLAINTEXT"
Expand All @@ -361,11 +380,24 @@
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
sasl-jaas-config (.getProperty props "sasl.jaas.config")
sasl-protocol (.getProperty props "security.protocol")
sasl-mechanism (.getProperty props "sasl.mechanism")]
sasl-protocol (.getProperty props "security.protocol")]
(is (= auto-offset-reset "latest"))
(is (= sasl-protocol "SASL_PLAINTEXT"))
(is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "org.apache.kafka.common.security.scram.ScramLoginModule"))))))

(testing "sasl properties create jaas template from the map provided in [:ziggurat :sasl :jaas] without username password"
(with-redefs [sasl-config (constantly {:enabled true
:protocol "SASL_PLAINTEXT"
:mechanism "SCRAM-SHA-256"
:jaas {:login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})]
(let [streams-config-map {:auto-offset-reset :latest}
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
sasl-jaas-config (.getProperty props "sasl.jaas.config")
sasl-protocol (.getProperty props "security.protocol")]
(is (= auto-offset-reset "latest"))
(is (= sasl-protocol "SASL_PLAINTEXT"))
(is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "org.apache.kafka.common.security.scram.ScramLoginModule"))))))))
(is (= sasl-jaas-config (create-jaas-properties "" "" "org.apache.kafka.common.security.scram.ScramLoginModule"))))))))

(deftest test-set-property
(testing "set-property with empty (with spaces) value"
Expand Down

0 comments on commit 3ac9b2a

Please sign in to comment.