diff --git a/storm-core/project.clj b/storm-core/project.clj index 0eaa6a3f0..76ac2694d 100644 --- a/storm-core/project.clj +++ b/storm-core/project.clj @@ -36,7 +36,8 @@ :target-path "target" :javac-options ["-target" "1.6" "-source" "1.6"] :profiles {:dev {:resource-paths ["src/dev"] - :dependencies [[org.mockito/mockito-all "1.9.5"]]} + :dependencies [[org.mockito/mockito-all "1.9.5"] + [org.clojure/clojure-contrib "1.2.0"]]} :release {} :lib {} } diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 150443165..7ce7664c2 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -400,6 +400,19 @@ (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) )) +(defn replace-childopts-tags-by-ids + [childopts worker-id storm-id port] + ( + let [replacement-map {"%ID%" (str port) + "%WORKER-ID%" (str worker-id) + "%STORM-ID%" (str storm-id) + "%WORKER-PORT%" (str port)}] + (if-not (nil? childopts) + (reduce (fn [string entry] + (apply clojure.string/replace string entry)) + childopts replacement-map) + nil) + )) (defmethod launch-worker :distributed [supervisor storm-id port worker-id] @@ -409,9 +422,8 @@ stormjar (supervisor-stormjar-path stormroot) storm-conf (read-supervisor-storm-conf conf storm-id) classpath (add-to-classpath (current-classpath) [stormjar]) - childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) - "%ID%" - (str port)) + childopts (replace-childopts-tags-by-ids (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) + worker-id storm-id port) logfilename (str "worker-" port ".log") command (str "java -server " childopts " -Djava.library.path=" (conf JAVA-LIBRARY-PATH) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 13e8d02be..25074aad8 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -433,7 +433,8 @@ public class Config extends HashMap { /** * The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced - * with an identifier for this worker. + * with an identifier for this worker. Also, "%WORKER-ID%", "%STORM-ID%" and "%WORKER-PORT%" are + * replaced with appropriate runtime values for this worker. */ public static final String WORKER_CHILDOPTS = "worker.childopts"; public static final Object WORKER_CHILDOPTS_SCHEMA = String.class; diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj index 5b075fe2d..6f84f9779 100644 --- a/storm-core/test/clj/backtype/storm/supervisor_test.clj +++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj @@ -1,5 +1,6 @@ (ns backtype.storm.supervisor-test (:use [clojure test]) + (:require [clojure.contrib [string :as contrib-str]]) (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter]) (:use [backtype.storm bootstrap testing]) (:use [backtype.storm.daemon common]) @@ -240,3 +241,93 @@ ;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code ) +(defn found? [sub-str input-str] ( + contrib-str/substring? sub-str (str input-str) + )) +(defn not-found? [sub-str input-str] + (complement (found? sub-str input-str))) + +(deftest test-replace-childopts-tags-by-ids-happy-path + (testing "worker-launcher replaces ids in childopts" + (let [ worker-id "w-01" + storm-id "s-01" + port 9999 + childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log" + ] + (def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port)) + (is (not-found? "%WORKER-ID%" childopts-with-ids)) + (is (found? "w-01" childopts-with-ids)) + (is (not-found? "%STORM-ID%" childopts-with-ids)) + (is (found? "s-01" childopts-with-ids)) + (is (not-found? "%WORKER-PORT%" childopts-with-ids)) + (is (found? "-9999." childopts-with-ids)) + (is (not-found? "%ID%" childopts-with-ids)) + (is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids)) + ))) + +(deftest test-replace-childopts-tags-by-ids-storm-id-alone + (testing "worker-launcher replaces ids in childopts" + (let [ worker-id "w-01" + storm-id "s-01" + port 9999 + childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%STORM-ID%.log"] + (def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port)) + (is (not-found? "%WORKER-ID%" childopts-with-ids)) + (is (not-found? "w-01" childopts-with-ids)) + (is (not-found? "%STORM-ID%" childopts-with-ids)) + (is (found? "s-01" childopts-with-ids)) + (is (not-found? "%WORKER-PORT%" childopts-with-ids)) + (is (not-found? "-9999." childopts-with-ids)) + (is (not-found? "%ID%" childopts-with-ids)) + (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)) ))) + +(deftest test-replace-childopts-tags-by-ids-no-keys + (testing "worker-launcher has no ids to replace in childopts" + (let [ worker-id "w-01" + storm-id "s-01" + port 9999 + childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"] + (def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port)) + (is (not-found? "%WORKER-ID%" childopts-with-ids)) + (is (not-found? "w-01" childopts-with-ids)) + (is (not-found? "%STORM-ID%" childopts-with-ids)) + (is (not-found? "s-01" childopts-with-ids)) + (is (not-found? "%WORKER-PORT%" childopts-with-ids)) + (is (not-found? "-9999." childopts-with-ids)) + (is (not-found? "%ID%" childopts-with-ids)) + (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)) ))) + +(deftest test-replace-childopts-tags-by-ids-nil-childopts + (testing "worker-launcher has nil childopts" + (let [ worker-id "w-01" + storm-id "s-01" + port 9999 + childopts nil] + (def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port)) + (is (not-found? "%WORKER-ID%" childopts-with-ids)) + (is (not-found? "w-01" childopts-with-ids)) + (is (not-found? "%STORM-ID%" childopts-with-ids)) + (is (not-found? "s-01" childopts-with-ids)) + (is (not-found? "%WORKER-PORT%" childopts-with-ids)) + (is (not-found? "-9999." childopts-with-ids)) + (is (not-found? "%ID%" childopts-with-ids)) + (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)) + ))) + +(deftest test-replace-childopts-tags-by-ids-nil-ids + (testing "worker-launcher has nil ids" + (let [ worker-id nil + storm-id "s-01" + port 9999 + childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log"] + (def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port)) + (is (not-found? "%WORKER-ID%" childopts-with-ids)) + (is (not-found? "w-01" childopts-with-ids)) + (is (not-found? "%STORM-ID%" childopts-with-ids)) + (is (found? "s-01" childopts-with-ids)) + (is (not-found? "%WORKER-PORT%" childopts-with-ids)) + (is (found? "-9999." childopts-with-ids)) + (is (not-found? "%ID%" childopts-with-ids)) + (is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids)) + ))) +