diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index 7fed9b2a..1160540d 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -172,6 +172,10 @@ :enabled :jaas]) +(defn- not-blank? + [s] + (and (not (nil? s)) (not (str/blank? (str/trim s))))) + (defn- to-list [s] (if (empty? s) @@ -201,20 +205,12 @@ (.setProperty p sk nv)))) p) -(defn create-jaas-properties [user-name password login-module] - (let [username-str (if user-name (format " username=\"%s\"" user-name) "") - password-str (if password (format " password=\"%s\"" password) "") +(defn create-jaas-properties [username password login-module] + (let [username-str (when (not-blank? username) (format " username=\"%s\"" username)) + password-str (when (not-blank? password) (format " password=\"%s\"" password)) credentials (str username-str password-str)] (format "%s required%s;" login-module (if (empty? credentials) "" credentials)))) -(defn- add-ssl-properties - [properties ssl-config-map] - (if (and (some? (:ssl-truststore-location ssl-config-map)) (some? (:ssl-truststore-password ssl-config-map))) - (doto properties - (.put SslConfigs/SSL_TRUSTSTORE_LOCATION_CONFIG (:ssl-truststore-location ssl-config-map)) - (.put SslConfigs/SSL_TRUSTSTORE_PASSWORD_CONFIG (:ssl-truststore-password ssl-config-map)))) - properties) - (defn- add-jaas-properties [properties jaas-config] (if (some? jaas-config) @@ -262,7 +258,6 @@ protocol (get ssl-config-map :protocol)] (if (true? ssl-configs-enabled) (as-> properties pr - (add-ssl-properties pr ssl-config-map) (add-jaas-properties pr jaas-config) (add-sasl-properties pr mechanism protocol) (reduce-kv set-property-fn pr ssl-config-map)) diff --git a/test/ziggurat/config_test.clj b/test/ziggurat/config_test.clj index 6085ddc1..0955c799 100644 --- a/test/ziggurat/config_test.clj +++ b/test/ziggurat/config_test.clj @@ -160,6 +160,7 @@ streams-config-mapping-table) set-all-property (partial set-property config-mapping-table) build-all-config-properties (partial build-properties set-all-property)] + (testing "all valid kafka configs" (let [config-map {:auto-offset-reset :latest :replication-factor 2 @@ -174,11 +175,13 @@ (is (= replication-factor "2")) (is (= enable-idempotence "true")) (is (= group-id "foo")))) + (testing "valid kafka consumer configs converts commit-interval-ms to auto-commit-interval-ms" (let [config-map {:commit-interval-ms 5000} props (build-consumer-config-properties config-map) auto-commit-interval-ms (.getProperty props "auto.commit.interval.ms")] (is (= auto-commit-interval-ms "5000")))) + (testing "valid kafka streams configs does not convert commit-interval-ms to auto-commit-interval-ms" (let [config-map {:commit-interval-ms 5000} props (build-streams-config-properties config-map) @@ -186,6 +189,7 @@ commit-interval-ms (.getProperty props "commit.interval.ms")] (is (= auto-commit-interval-ms "NOT FOUND")) (is (= commit-interval-ms "5000")))) + (testing "mapping table for backward compatibility" (let [config-map {:auto-offset-reset-config "latest" :changelog-topic-replication-factor 2 @@ -226,6 +230,7 @@ (is (= value-deserializer "value-deserializer")) (is (= value-serializer "value-serializer")) (is (= group-id "foo")))) + (testing "non kafka config keys should not be in Properties" (let [config-map {:consumer-type :joins :producer {:foo "bar"} @@ -238,16 +243,17 @@ :poll-timeout-ms-config 10000} props (build-all-config-properties config-map)] (doall - (map (fn [[k _]] - (let [string-key (str/replace (name k) #"-" ".") - not-found "NOT FOUND!" - v (.getProperty props string-key not-found)] - (is (= v not-found)))) - config-map)))) + (map (fn [[k _]] + (let [string-key (str/replace (name k) #"-" ".") + not-found "NOT FOUND!" + v (.getProperty props string-key not-found)] + (is (= v not-found)))) + config-map)))) + (testing "should set ssl properties for streams if enabled is set to true" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] (let [streams-config-map {:auto-offset-reset :latest :group-id "foo"} props (build-streams-config-properties streams-config-map) @@ -259,25 +265,33 @@ (is (= group-id "foo")) (is (= ssl-ks-location "/some/location")) (is (= ssl-ks-password "some-password"))))) + (testing "should set ssl properties for consumer API if enabled is set to true" (with-redefs [ssl-config (constantly {:enabled true :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] + :ssl-keystore-password "some-password" + :ssl-truststore-location "/some/truststore/location" + :ssl-truststore-password "some-truststore-password"})] (let [streams-config-map {:max-poll-records 500 :enable-auto-commit true} props (build-consumer-config-properties streams-config-map) max-poll-records (.getProperty props "max.poll.records") enable-auto-comit (.getProperty props "enable.auto.commit") ssl-ks-location (.getProperty props "ssl.keystore.location") - ssl-ks-password (.getProperty props "ssl.keystore.password")] + ssl-ks-password (.getProperty props "ssl.keystore.password") + ssl-ts-location (.getProperty props "ssl.truststore.location") + ssl-ts-password (.getProperty props "ssl.truststore.password")] (is (= max-poll-records "500")) (is (= enable-auto-comit "true")) (is (= ssl-ks-location "/some/location")) - (is (= ssl-ks-password "some-password"))))) + (is (= ssl-ks-password "some-password")) + (is (= ssl-ts-location "/some/truststore/location")) + (is (= ssl-ts-password "some-truststore-password"))))) + (testing "should set ssl properties for producer API if enabled is set to true" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] (let [streams-config-map {:batch.size 500 :acks 1} props (build-producer-config-properties streams-config-map) @@ -289,6 +303,7 @@ (is (= acks "1")) (is (= ssl-ks-location "/some/location")) (is (= ssl-ks-password "some-password"))))) + (testing "should not set ssl properties for streams if eenabled is set to false" (with-redefs [ssl-config (constantly {:enabled false :ssl-keystore-location "/some/location" @@ -304,10 +319,11 @@ (is (= group-id "foo")) (is (nil? ssl-ks-location)) (is (nil? ssl-ks-password))))) + (testing "ssl properties from streams config map overrides the ssl properties provided in [:ziggurat :ssl]" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] (let [streams-config-map {:auto-offset-reset :latest :ssl-keystore-location "/some/different/location" :ssl-keystore-password "different-password"} @@ -318,14 +334,15 @@ (is (= auto-offset-reset "latest")) (is (= ssl-ks-location "/some/different/location")) (is (= ssl-ks-password "different-password"))))) + (testing "ssl properties create jaas template from the map provided in [:ziggurat :ssl :jaas]" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password" - :mechanism "SCRAM-SHA-512" - :jaas {:username "myuser" - :password "mypassword" - :login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})] + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password" + :mechanism "SCRAM-SHA-512" + :jaas {:username "myuser" + :password "mypassword" + :login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})] (let [streams-config-map {:auto-offset-reset :latest} props (build-streams-config-properties streams-config-map) auto-offset-reset (.getProperty props "auto.offset.reset") @@ -336,10 +353,11 @@ (is (= ssl-ks-location "/some/location")) (is (= ssl-ks-password "some-password")) (is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "org.apache.kafka.common.security.scram.ScramLoginModule")))))) + (testing "ssl properties DO NOT create jaas template if no value is provided for key sequence [:ziggurat :ssl :jaas]" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] (let [streams-config-map {:auto-offset-reset :latest} props (build-streams-config-properties streams-config-map) auto-offset-reset (.getProperty props "auto.offset.reset") @@ -350,6 +368,7 @@ (is (= ssl-ks-location "/some/location")) (is (= ssl-ks-password "some-password")) (is (nil? sasl-jaas-config))))) + (testing "sasl properties create jaas template from the map provided in [:ziggurat :sasl :jaas]" (with-redefs [sasl-config (constantly {:enabled true :protocol "SASL_PLAINTEXT" @@ -361,11 +380,24 @@ props (build-streams-config-properties streams-config-map) auto-offset-reset (.getProperty props "auto.offset.reset") sasl-jaas-config (.getProperty props "sasl.jaas.config") - sasl-protocol (.getProperty props "security.protocol") - sasl-mechanism (.getProperty props "sasl.mechanism")] + sasl-protocol (.getProperty props "security.protocol")] + (is (= auto-offset-reset "latest")) + (is (= sasl-protocol "SASL_PLAINTEXT")) + (is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "org.apache.kafka.common.security.scram.ScramLoginModule")))))) + + (testing "sasl properties create jaas template from the map provided in [:ziggurat :sasl :jaas] without username password" + (with-redefs [sasl-config (constantly {:enabled true + :protocol "SASL_PLAINTEXT" + :mechanism "SCRAM-SHA-256" + :jaas {:login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})] + (let [streams-config-map {:auto-offset-reset :latest} + props (build-streams-config-properties streams-config-map) + auto-offset-reset (.getProperty props "auto.offset.reset") + sasl-jaas-config (.getProperty props "sasl.jaas.config") + sasl-protocol (.getProperty props "security.protocol")] (is (= auto-offset-reset "latest")) (is (= sasl-protocol "SASL_PLAINTEXT")) - (is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "org.apache.kafka.common.security.scram.ScramLoginModule")))))))) + (is (= sasl-jaas-config (create-jaas-properties "" "" "org.apache.kafka.common.security.scram.ScramLoginModule")))))))) (deftest test-set-property (testing "set-property with empty (with spaces) value"