diff --git a/conf/defaults.yaml b/conf/defaults.yaml index ede403450..54f5405b6 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -120,5 +120,7 @@ topology.max.error.report.per.interval: 5 topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory" topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer" topology.trident.batch.emit.interval.millis: 500 +topology.update: false +topology.update.interval.secs: 10 dev.zookeeper.path: "/tmp/dev-storm-zookeeper" diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 7231b15d6..0d6413a6f 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -191,6 +191,7 @@ (if (contains? executor-stats t) {t {:time-secs (:time-secs worker-hb) :uptime (:uptime worker-hb) + :topology-version (:topology-version worker-hb) :stats (get executor-stats t)}}))) (into {})))) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index c11074eb1..9f277db35 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -200,8 +200,16 @@ [conf id] (str (worker-root conf id) "/heartbeats")) +(defn worker-version-root + [conf id] + (str (worker-root conf id) "/version")) + ;; workers heartbeat here with pid and timestamp ;; if supervisor stops receiving heartbeat, it kills and restarts the process ;; in local mode, keep a global map of ids to threads for simulating process management (defn ^LocalState worker-state [conf id] (LocalState. (worker-heartbeats-root conf id))) + +(defn ^LocalState worker-version [conf id] + (LocalState. (worker-version-root conf id))) + diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index dd8b12f85..f513a00f5 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -41,6 +41,7 @@ (waiting? [this])) (def LS-WORKER-HEARTBEAT "worker-heartbeat") +(def LS-WORKER-VERSION "worker-version") ;; LocalState constants (def LS-ID "supervisor-id") @@ -57,6 +58,12 @@ ^long transferred ^long failed]) +(defn gen-topology-version [storm-id] + (str storm-id "-v" (current-time-string))) + +(defn get-topology-id [topology-version] + (.substring topology-version 0 (.lastIndexOf topology-version "-v"))) + (defn new-executor-stats [] (ExecutorStats. 0 0 0 0 0)) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index caac9963d..12bb7da03 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -158,6 +158,11 @@ (:old-status status)) }}) +(defn nil-to-unknown [x] + (if x + x + "unknown")) + (defn topology-status [nimbus storm-id] (-> nimbus :storm-cluster-state (.storm-base storm-id nil) :status)) @@ -701,7 +706,7 @@ (.assignSlots inimbus topologies)) )) -(defn- start-storm [nimbus storm-name storm-id topology-initial-status] +(defn- start-storm [nimbus storm-name storm-id topology-initial-status topology-version] {:pre [(#{:active :inactive} topology-initial-status)]} (let [storm-cluster-state (:storm-cluster-state nimbus) conf (:conf nimbus) @@ -713,7 +718,9 @@ storm-id (StormBase. storm-name (current-time-secs) - {:type topology-initial-status} + {:type topology-initial-status + :topology-version topology-version + :update-duration-secs 1} (storm-conf TOPOLOGY-WORKERS) num-executors)))) @@ -916,6 +923,7 @@ topology) (swap! (:submitted-count nimbus) inc) (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) + topology-version (gen-topology-version storm-id) storm-conf (normalize-conf conf (-> serializedConf @@ -938,7 +946,7 @@ (.setup-heartbeats! storm-cluster-state storm-id) (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] - (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)))) + (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)) topology-version)) (mk-assignments nimbus))) (catch Throwable e (log-warn-error e "Topology submission exception. (topology name='" storm-name "')") @@ -948,6 +956,44 @@ [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology (SubmitOptions. TopologyInitialStatus/ACTIVE))) + + (^void updateTopology + [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] + (validate-topology-name! storm-name) + (check-storm-active! nimbus storm-name true) + (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) + storm-name + (from-json serializedConf) + topology) + (let [storm-id (get-storm-id (:storm-cluster-state nimbus) storm-name) + topology-version (gen-topology-version storm-id) + storm-conf (normalize-conf + conf + (-> serializedConf + from-json + (assoc STORM-ID storm-id) + (assoc TOPOLOGY-NAME storm-name)) + topology) + total-storm-conf (merge conf storm-conf) + topology (normalize-topology total-storm-conf topology) + topology (if (total-storm-conf TOPOLOGY-OPTIMIZE) + (optimize-topology topology) + topology) + storm-cluster-state (:storm-cluster-state nimbus)] + (system-topology! total-storm-conf topology) ;; this validates the structure of the topology + (log-message "Received topology update for " storm-name " with conf " storm-conf) + (locking (:submit-lock nimbus) + (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) + (let [assignment (.assignment-info storm-cluster-state storm-id nil) + storm-id->supervisors (:node->host assignment) + intervals (total-storm-conf TOPOLOGY-UPDATE-INTERVAL-SECS) + update-duration-secs (* (count storm-id->supervisors) intervals)] + (set-topology-status! nimbus storm-id + (merge (topology-status nimbus storm-id) + {:topology-version topology-version + :update-duration-secs update-duration-secs}))) + ) + )) (^void killTopology [this ^String name] (.killTopologyWithOpts this name (KillOptions.))) @@ -1077,7 +1123,8 @@ set count) (time-delta (:launch-time-secs base)) - (extract-status-str base)) + (extract-status-str base) + (nil-to-unknown (-> base :status :topology-version))) ))] (ClusterSummary. supervisor-summaries nimbus-uptime @@ -1105,7 +1152,9 @@ (-> executor first task->component) host port - (nil-to-zero (:uptime heartbeat))) + (nil-to-zero (:uptime heartbeat)) + (nil-to-unknown (:topology-version heartbeat)) + ) (.set_stats stats)) )) ] @@ -1115,6 +1164,7 @@ executor-summaries (extract-status-str base) errors + (nil-to-unknown (-> base :status :topology-version)) ) )) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 150443165..8e9e080f5 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -75,6 +75,20 @@ [id (read-worker-heartbeat conf id)])) )) +(defn read-worker-version [conf id] + (let [local-state (worker-version conf id)] + (.get local-state LS-WORKER-VERSION) + )) + +(defn read-worker-versions + "Returns map from worker id to topology-version" + [conf] + (let [ids (my-worker-ids conf)] + (into {} + (dofor [id ids] + [id (read-worker-version conf id)])) + )) + (defn matches-an-assignment? [worker-heartbeat assigned-executors] (let [local-assignment (assigned-executors (:port worker-heartbeat))] @@ -89,6 +103,8 @@ (let [conf (:conf supervisor) ^LocalState local-state (:local-state supervisor) id->heartbeat (read-worker-heartbeats conf) + id->version (read-worker-versions conf) + storm-id->topology-version (:storm-id->topology-version supervisor) approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))] (into {} @@ -99,12 +115,20 @@ :disallowed (not hb) :not-started + (and + (@(:storm-id->update-time supervisor) (:storm-id hb)) + (< (@(:storm-id->update-time supervisor) (:storm-id hb)) (current-time-secs)) + (id->version id) + (not (= (@(:storm-id->topology-version supervisor) (:storm-id hb)) (id->version id)))) + :update (> (- now (:time-secs hb)) (conf SUPERVISOR-WORKER-TIMEOUT-SECS)) :timed-out true :valid)] - (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now) + (log-debug "Worker " id " is " state " version: " (id->version id) ": " (pr-str hb) " at supervisor time-secs " now) + (if (= :update state) + (log-message "Worker " id " is " state " update-time " (@(:storm-id->update-time supervisor) (:storm-id hb)) " topology-version " (id->version id) " to " (@(:storm-id->topology-version supervisor) (:storm-id hb)))) [id [state hb]] )) ))) @@ -139,6 +163,7 @@ (defn try-cleanup-worker [conf id] (try (rmr (worker-heartbeats-root conf id)) + (rmr (worker-version-root conf id)) ;; this avoids a race condition with worker or subprocess writing pid around same time (rmpath (worker-pids-root conf id)) (rmpath (worker-root conf id)) @@ -167,6 +192,8 @@ :active (atom true) :uptime (uptime-computer) :worker-thread-pids-atom (atom {}) + :storm-id->topology-version (atom {}) + :storm-id->update-time (atom {}) :storm-cluster-state (cluster/mk-storm-cluster-state conf) :local-state (supervisor-state conf) :supervisor-id (.getSupervisorId isupervisor) @@ -230,7 +257,9 @@ (wait-for-workers-launch conf (dofor [[port assignment] reassign-executors] - (let [id (new-worker-ids port)] + (let [id (new-worker-ids port) + topology-version (@(:storm-id->topology-version supervisor) (:storm-id assignment)) + topology-version (if topology-version topology-version (:storm-id assignment))] (log-message "Launching worker with assignment " (pr-str assignment) " for this supervisor " @@ -239,11 +268,14 @@ port " with id " id + " topology-version " + topology-version ) (launch-worker supervisor (:storm-id assignment) port - id) + id + topology-version) id))) )) @@ -281,18 +313,31 @@ ;; - should this be done separately from usual monitoring? ;; should we only download when topology is assigned to this supervisor? (doseq [[storm-id master-code-dir] storm-code-map] - (when (and (not (downloaded-storm-ids storm-id)) - (assigned-storm-ids storm-id)) - (log-message "Downloading code for storm id " - storm-id - " from " - master-code-dir) - (download-storm-code conf storm-id master-code-dir) - (log-message "Finished downloading code for storm id " - storm-id - " from " - master-code-dir) - )) + (let [storm-base (.storm-base (:storm-cluster-state supervisor) storm-id nil) + topology-version (-> storm-base :status :topology-version)] + (if-not storm-base + (log-warn "storm-id " storm-id " storm-base is nil") + (if-not topology-version + (log-warn "storm-id " storm-id " " topology-version " topology-version is nil") + (do + (when (and (not (downloaded-storm-ids topology-version)) + (assigned-storm-ids storm-id)) + (log-message "Downloading code for storm id " + storm-id + " from " + master-code-dir) + (download-storm-code conf storm-id master-code-dir topology-version) + (log-message "Finished downloading code for storm id " + storm-id + " from " + master-code-dir)) + (when-not (= (@(:storm-id->topology-version supervisor) storm-id) topology-version) + (let [rand (Random. (Utils/secureRandomLong)) + wait-time (.nextInt rand (-> storm-base :status :update-duration-secs)) + update-time (+ (current-time-secs) wait-time)] + (log-message storm-id " topology-version change from " (@(:storm-id->topology-version supervisor) storm-id) " to " topology-version " wait " wait-time " secs until " update-time " to do update restart") + (swap! (:storm-id->update-time supervisor) assoc storm-id update-time)) + (swap! (:storm-id->topology-version supervisor) assoc storm-id topology-version))))))) (log-debug "Writing new assignment " (pr-str new-assignment)) @@ -309,7 +354,8 @@ ;; synchronize-supervisor doesn't try to launch workers for which the ;; resources don't exist (doseq [storm-id downloaded-storm-ids] - (when-not (assigned-storm-ids storm-id) + (when-not (or (storm-code-map storm-id) + (storm-code-map (get-topology-id storm-id))) (log-message "Removing code for storm id " storm-id) (rmr (supervisor-stormdist-root conf storm-id)) @@ -387,10 +433,10 @@ ;; distributed implementation (defmethod download-storm-code - :distributed [conf storm-id master-code-dir] + :distributed [conf storm-id master-code-dir topology-version] ;; Downloading to permanent location is atomic (let [tmproot (str (supervisor-tmp-dir conf) "/" (uuid)) - stormroot (supervisor-stormdist-root conf storm-id)] + stormroot (supervisor-stormdist-root conf topology-version)] (FileUtils/forceMkdir (File. tmproot)) (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot)) @@ -402,12 +448,12 @@ (defmethod launch-worker - :distributed [supervisor storm-id port worker-id] + :distributed [supervisor storm-id port worker-id topology-version] (let [conf (:conf supervisor) storm-home (System/getProperty "storm.home") - stormroot (supervisor-stormdist-root conf storm-id) + stormroot (supervisor-stormdist-root conf topology-version) stormjar (supervisor-stormjar-path stormroot) - storm-conf (read-supervisor-storm-conf conf storm-id) + storm-conf (read-supervisor-storm-conf conf topology-version) classpath (add-to-classpath (current-classpath) [stormjar]) childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) "%ID%" @@ -423,7 +469,7 @@ " -Dworker.port=" port " -cp " classpath " backtype.storm.daemon.worker " (java.net.URLEncoder/encode storm-id) " " (:assignment-id supervisor) - " " port " " worker-id)] + " " port " " worker-id " " topology-version)] (log-message "Launching worker with command: " command) (launch-process command :environment {"LD_LIBRARY_PATH" (conf JAVA-LIBRARY-PATH)}) )) @@ -437,9 +483,10 @@ first )) (defmethod download-storm-code - :local [conf storm-id master-code-dir] - (let [stormroot (supervisor-stormdist-root conf storm-id)] - (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot)) + :local [conf storm-id master-code-dir topology-version] + (let [stormroot (supervisor-stormdist-root conf topology-version)] + (when-not (exists-file? stormroot) + (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))) (let [classloader (.getContextClassLoader (Thread/currentThread)) resources-jar (resources-jar) url (.getResource classloader RESOURCES-SUBDIR) @@ -457,7 +504,7 @@ ))) (defmethod launch-worker - :local [supervisor storm-id port worker-id] + :local [supervisor storm-id port worker-id topology-version] (let [conf (:conf supervisor) pid (uuid) worker (worker/mk-worker conf @@ -465,7 +512,8 @@ storm-id (:assignment-id supervisor) port - worker-id)] + worker-id + topology-version)] (psim/register-process pid worker) (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid) )) diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj index a517e6f43..1889469d1 100644 --- a/storm-core/src/clj/backtype/storm/daemon/task.clj +++ b/storm-core/src/clj/backtype/storm/daemon/task.clj @@ -21,7 +21,7 @@ (:component->stream->fields worker) (:storm-id worker) (supervisor-storm-resources-path - (supervisor-stormdist-root conf (:storm-id worker))) + (supervisor-stormdist-root conf (:topology-version worker))) (worker-pids-root conf (:worker-id worker)) (int %) (:port worker) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 5182027c2..c52cecca3 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -33,6 +33,7 @@ :executor-stats stats :uptime ((:uptime worker)) :time-secs (current-time-secs) + :topology-version (:topology-version worker) }] ;; do the zookeeper heartbeat (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb) @@ -45,7 +46,8 @@ (:storm-id worker) (:executors worker) (:port worker)) - state (worker-state conf (:worker-id worker))] + state (worker-state conf (:worker-id worker)) + version (worker-version conf (:worker-id worker))] (log-debug "Doing heartbeat " (pr-str hb)) ;; do the local-file-system heartbeat. (.put state @@ -55,6 +57,12 @@ ) (.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up. ; it shouldn't take supervisor 120 seconds between listing dir and reading it + (.put version + LS-WORKER-VERSION + (:topology-version worker) + false + ) + (.cleanup version 60) )) @@ -149,10 +157,10 @@ (halt-process! 20 "Error when processing an event") ))) -(defn worker-data [conf mq-context storm-id assignment-id port worker-id] +(defn worker-data [conf mq-context storm-id assignment-id port worker-id topology-version] (let [cluster-state (cluster/mk-distributed-cluster-state conf) storm-cluster-state (cluster/mk-storm-cluster-state cluster-state) - storm-conf (read-supervisor-storm-conf conf storm-id) + storm-conf (read-supervisor-storm-conf conf topology-version) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port)) transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) @@ -162,13 +170,14 @@ (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue]))) (into {})) - topology (read-supervisor-topology conf storm-id)] + topology (read-supervisor-topology conf topology-version)] (recursive-map :conf conf :mq-context (if mq-context mq-context (TransportFactory/makeContext storm-conf)) :storm-id storm-id + :topology-version topology-version :assignment-id assignment-id :port port :worker-id worker-id @@ -326,7 +335,7 @@ ;; what about if there's inconsistency in assignments? -> but nimbus ;; should guarantee this consistency ;; TODO: consider doing worker heartbeating rather than task heartbeating to reduce the load on zookeeper -(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id] +(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id topology-version] (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id " and conf " conf) (if-not (local-mode? conf) @@ -335,7 +344,7 @@ ;; process. supervisor will register it in this case (when (= :distributed (cluster-mode conf)) (touch (worker-pid-path conf worker-id (process-pid)))) - (let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id) + (let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id topology-version) heartbeat-fn #(do-heartbeat worker) ;; do this here so that the worker process dies if this fails ;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on) @@ -433,7 +442,7 @@ :distributed [conf] (fn [] (halt-process! 1 "Worker died"))) -(defn -main [storm-id assignment-id port-str worker-id] +(defn -main [storm-id assignment-id port-str worker-id topology-version] (let [conf (read-storm-config)] (validate-distributed-mode! conf) - (mk-worker conf nil (java.net.URLDecoder/decode storm-id) assignment-id (Integer/parseInt port-str) worker-id))) + (mk-worker conf nil (java.net.URLDecoder/decode storm-id) assignment-id (Integer/parseInt port-str) worker-id topology-version))) diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index e2397c25d..4e63b7e35 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -93,10 +93,11 @@ ;; make the id clickable ;; make the table sortable (sorted-table - ["Name" "Id" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"] + ["Name" "Id" "Version" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"] (for [^TopologySummary t summs] [(topology-link (.get_id t) (.get_name t)) (escape-html (.get_id t)) + (last (.split (.get_topology_version t) "-")) (.get_status t) (pretty-uptime-sec (.get_uptime_secs t)) (.get_num_workers t) @@ -299,12 +300,21 @@ (defn bolt-summary? [topology s] (= :bolt (executor-summary-type topology s))) +(defn update-percent [version executors] + (let [newest-executors (->> executors + (filter #(= version (last (.split (.get_topology_version %) "-")))))] + (str "" (quot (* (count newest-executors) 100) + (count executors)) " %") + )) + (defn topology-summary-table [^TopologyInfo summ] (let [executors (.get_executors summ) workers (set (for [^ExecutorSummary e executors] [(.get_host e) (.get_port e)]))] - (table ["Name" "Id" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"] + (table ["Name" "Id" "Version" "Update percent" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"] [[(escape-html (.get_name summ)) (escape-html (.get_id summ)) + (last (.split (.get_topology_version summ) "-")) + (update-percent (last (.split (.get_topology_version summ) "-")) executors) (.get_status summ) (pretty-uptime-sec (.get_uptime_secs summ)) (count workers) @@ -555,7 +565,7 @@ (defn spout-executor-table [topology-id executors window include-sys?] (sorted-table - ["Id" "Uptime" "Host" "Port" "Emitted" "Transferred" + ["Id" "Version" "Uptime" "Host" "Port" "Emitted" "Transferred" "Complete latency (ms)" "Acked" "Failed"] (for [^ExecutorSummary e executors :let [stats (.get_stats e) @@ -566,6 +576,7 @@ swap-map-order (get window)))]] [(pretty-executor-info (.get_executor_info e)) + (last (.split (.get_topology_version e) "-")) (pretty-uptime-sec (.get_uptime_secs e)) (.get_host e) (worker-log-link (.get_host e) (.get_port e)) @@ -630,7 +641,7 @@ (defn bolt-executor-table [topology-id executors window include-sys?] (sorted-table - ["Id" "Uptime" "Host" "Port" "Emitted" "Transferred" "Capacity (last 10m)" + ["Id" "Version" "Uptime" "Host" "Port" "Emitted" "Transferred" "Capacity (last 10m)" "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"] (for [^ExecutorSummary e executors :let [stats (.get_stats e) @@ -641,6 +652,7 @@ swap-map-order (get window)))]] [(pretty-executor-info (.get_executor_info e)) + (last (.split (.get_topology_version e) "-")) (pretty-uptime-sec (.get_uptime_secs e)) (.get_host e) (worker-log-link (.get_host e) (.get_port e)) diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index 20636a809..c29478035 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ -195,6 +195,9 @@ (defn current-time-millis [] (Time/currentTimeMillis)) +(defn current-time-string [] + (Time/currentTimeString)) + (defn clojurify-structure [s] (prewalk (fn [x] (cond (instance? Map x) (into {} x) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 89e17d27e..07c3f037e 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -726,6 +726,19 @@ public class Config extends HashMap { public static final String TOPOLOGY_NAME="topology.name"; public static final Object TOPOLOGY_NAME_SCHEMA = String.class; + /** + * Is update topology? true/false + */ + public static final String TOPOLOGY_UPDATE = "topology.update"; + public static final Object TOPOLOGY_UPDATE_SCHEMA = Boolean.class; + + /** + * topology update interval for each worker restart + */ + public static final String TOPOLOGY_UPDATE_INTERVAL_SECS = "topology.update.interval.secs"; + public static final Object TOPOLOGY_UPDATE_INTERVAL_SECS_SCHEMA = Number.class; + + /** * The root directory in ZooKeeper for metadata about TransactionalSpouts. */ diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java index 1417c8e9a..933fda018 100644 --- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java +++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java @@ -37,7 +37,7 @@ public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) { * @throws AlreadyAliveException if a topology with this name is already running * @throws InvalidTopologyException if an invalid topology was submitted */ - public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException { + public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, NotAliveException, InvalidTopologyException { submitTopology(name, stormConf, topology, null); } @@ -53,7 +53,7 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo * @throws AlreadyAliveException if a topology with this name is already running * @throws InvalidTopologyException if an invalid topology was submitted */ - public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException { + public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, NotAliveException, InvalidTopologyException { if(!Utils.isValidConf(stormConf)) { throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable"); } @@ -61,18 +61,44 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo stormConf.putAll(Utils.readCommandLineOpts()); Map conf = Utils.readStormConfig(); conf.putAll(stormConf); + boolean topologyUpdate = false; + Object o = conf.get(Config.TOPOLOGY_UPDATE); + if (o != null) { + if (o instanceof Boolean) { + topologyUpdate = (Boolean)o; + } else if (o instanceof String) { + topologyUpdate = Boolean.valueOf((String)o); + } + } + try { String serConf = JSONValue.toJSONString(stormConf); if(localNimbus!=null) { - LOG.info("Submitting topology " + name + " in local mode"); - localNimbus.submitTopology(name, null, serConf, topology); + if (topologyUpdate) { + LOG.info("Updating topology " + name + " in local mode"); + localNimbus.updateTopology(name, null, serConf, topology); + } else { + LOG.info("Submitting topology " + name + " in local mode"); + localNimbus.submitTopology(name, null, serConf, topology); + } } else { NimbusClient client = NimbusClient.getConfiguredClient(conf); - if(topologyNameExists(conf, name)) { - throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); + if (topologyUpdate) { + if (!topologyNameExists(conf, name)) { + throw new RuntimeException("Topology with name `" + name + "` not alive on cluster"); + } + + } else { + if (topologyNameExists(conf, name)) { + throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); + } } submitJar(conf); try { + if (topologyUpdate) { + LOG.info("Updating topology " + name + " in distributed mode with conf " + serConf); + client.getClient().updateTopology(name, submittedJar, serConf, topology); + } else { LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); if(opts!=null) { client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); @@ -80,6 +106,7 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo // this is for backwards compatibility client.getClient().submitTopology(name, submittedJar, serConf, topology); } + } } catch(InvalidTopologyException e) { LOG.warn("Topology submission exception", e); throw e; @@ -90,7 +117,11 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo client.close(); } } - LOG.info("Finished submitting topology: " + name); + if (topologyUpdate) { + LOG.info("Finished Updating topology: " + name); + } else { + LOG.info("Finished submitting topology: " + name); + } } catch(TException e) { throw new RuntimeException(e); } diff --git a/storm-core/src/jvm/backtype/storm/generated/ExecutorSummary.java b/storm-core/src/jvm/backtype/storm/generated/ExecutorSummary.java index 1ab205026..31c437082 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ExecutorSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/ExecutorSummary.java @@ -29,6 +29,7 @@ public class ExecutorSummary implements org.apache.thrift7.TBase byName = new HashMap(); @@ -70,6 +73,8 @@ public static _Fields findByThriftId(int fieldId) { return PORT; case 5: // UPTIME_SECS return UPTIME_SECS; + case 6: // TOPOLOGY_VERSION + return TOPOLOGY_VERSION; case 7: // STATS return STATS; default: @@ -129,6 +134,8 @@ public String getFieldName() { new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I32))); tmpMap.put(_Fields.UPTIME_SECS, new org.apache.thrift7.meta_data.FieldMetaData("uptime_secs", org.apache.thrift7.TFieldRequirementType.REQUIRED, new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I32))); + tmpMap.put(_Fields.TOPOLOGY_VERSION, new org.apache.thrift7.meta_data.FieldMetaData("topology_version", org.apache.thrift7.TFieldRequirementType.REQUIRED, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); tmpMap.put(_Fields.STATS, new org.apache.thrift7.meta_data.FieldMetaData("stats", org.apache.thrift7.TFieldRequirementType.OPTIONAL, new org.apache.thrift7.meta_data.StructMetaData(org.apache.thrift7.protocol.TType.STRUCT, ExecutorStats.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); @@ -143,7 +150,8 @@ public ExecutorSummary( String component_id, String host, int port, - int uptime_secs) + int uptime_secs, + String topology_version) { this(); this.executor_info = executor_info; @@ -153,6 +161,7 @@ public ExecutorSummary( set_port_isSet(true); this.uptime_secs = uptime_secs; set_uptime_secs_isSet(true); + this.topology_version = topology_version; } /** @@ -172,6 +181,9 @@ public ExecutorSummary(ExecutorSummary other) { } this.port = other.port; this.uptime_secs = other.uptime_secs; + if (other.is_set_topology_version()) { + this.topology_version = other.topology_version; + } if (other.is_set_stats()) { this.stats = new ExecutorStats(other.stats); } @@ -190,6 +202,7 @@ public void clear() { this.port = 0; set_uptime_secs_isSet(false); this.uptime_secs = 0; + this.topology_version = null; this.stats = null; } @@ -306,6 +319,29 @@ public void set_uptime_secs_isSet(boolean value) { __isset_bit_vector.set(__UPTIME_SECS_ISSET_ID, value); } + public String get_topology_version() { + return this.topology_version; + } + + public void set_topology_version(String topology_version) { + this.topology_version = topology_version; + } + + public void unset_topology_version() { + this.topology_version = null; + } + + /** Returns true if field topology_version is set (has been assigned a value) and false otherwise */ + public boolean is_set_topology_version() { + return this.topology_version != null; + } + + public void set_topology_version_isSet(boolean value) { + if (!value) { + this.topology_version = null; + } + } + public ExecutorStats get_stats() { return this.stats; } @@ -371,6 +407,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case TOPOLOGY_VERSION: + if (value == null) { + unset_topology_version(); + } else { + set_topology_version((String)value); + } + break; + case STATS: if (value == null) { unset_stats(); @@ -399,6 +443,9 @@ public Object getFieldValue(_Fields field) { case UPTIME_SECS: return Integer.valueOf(get_uptime_secs()); + case TOPOLOGY_VERSION: + return get_topology_version(); + case STATS: return get_stats(); @@ -423,6 +470,8 @@ public boolean isSet(_Fields field) { return is_set_port(); case UPTIME_SECS: return is_set_uptime_secs(); + case TOPOLOGY_VERSION: + return is_set_topology_version(); case STATS: return is_set_stats(); } @@ -487,6 +536,15 @@ public boolean equals(ExecutorSummary that) { return false; } + boolean this_present_topology_version = true && this.is_set_topology_version(); + boolean that_present_topology_version = true && that.is_set_topology_version(); + if (this_present_topology_version || that_present_topology_version) { + if (!(this_present_topology_version && that_present_topology_version)) + return false; + if (!this.topology_version.equals(that.topology_version)) + return false; + } + boolean this_present_stats = true && this.is_set_stats(); boolean that_present_stats = true && that.is_set_stats(); if (this_present_stats || that_present_stats) { @@ -528,6 +586,11 @@ public int hashCode() { if (present_uptime_secs) builder.append(uptime_secs); + boolean present_topology_version = true && (is_set_topology_version()); + builder.append(present_topology_version); + if (present_topology_version) + builder.append(topology_version); + boolean present_stats = true && (is_set_stats()); builder.append(present_stats); if (present_stats) @@ -594,6 +657,16 @@ public int compareTo(ExecutorSummary other) { return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_topology_version()).compareTo(typedOther.is_set_topology_version()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_topology_version()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.topology_version, typedOther.topology_version); + if (lastComparison != 0) { + return lastComparison; + } + } lastComparison = Boolean.valueOf(is_set_stats()).compareTo(typedOther.is_set_stats()); if (lastComparison != 0) { return lastComparison; @@ -659,6 +732,13 @@ public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache. org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); } break; + case 6: // TOPOLOGY_VERSION + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.topology_version = iprot.readString(); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; case 7: // STATS if (field.type == org.apache.thrift7.protocol.TType.STRUCT) { this.stats = new ExecutorStats(); @@ -701,6 +781,11 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC); oprot.writeI32(this.uptime_secs); oprot.writeFieldEnd(); + if (this.topology_version != null) { + oprot.writeFieldBegin(TOPOLOGY_VERSION_FIELD_DESC); + oprot.writeString(this.topology_version); + oprot.writeFieldEnd(); + } if (this.stats != null) { if (is_set_stats()) { oprot.writeFieldBegin(STATS_FIELD_DESC); @@ -748,6 +833,14 @@ public String toString() { sb.append("uptime_secs:"); sb.append(this.uptime_secs); first = false; + if (!first) sb.append(", "); + sb.append("topology_version:"); + if (this.topology_version == null) { + sb.append("null"); + } else { + sb.append(this.topology_version); + } + first = false; if (is_set_stats()) { if (!first) sb.append(", "); sb.append("stats:"); @@ -784,6 +877,10 @@ public void validate() throws org.apache.thrift7.TException { throw new org.apache.thrift7.protocol.TProtocolException("Required field 'uptime_secs' is unset! Struct:" + toString()); } + if (!is_set_topology_version()) { + throw new org.apache.thrift7.protocol.TProtocolException("Required field 'topology_version' is unset! Struct:" + toString()); + } + } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { diff --git a/storm-core/src/jvm/backtype/storm/generated/Nimbus.java b/storm-core/src/jvm/backtype/storm/generated/Nimbus.java index 6a8592a84..0477705d5 100644 --- a/storm-core/src/jvm/backtype/storm/generated/Nimbus.java +++ b/storm-core/src/jvm/backtype/storm/generated/Nimbus.java @@ -29,6 +29,8 @@ public interface Iface { public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException; + public void updateTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException; + public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException; public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException; @@ -69,6 +71,8 @@ public interface AsyncIface { public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException; + public void updateTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException; + public void killTopology(String name, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException; public void killTopologyWithOpts(String name, KillOptions options, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException; @@ -182,6 +186,35 @@ public void recv_submitTopologyWithOpts() throws AlreadyAliveException, InvalidT return; } + public void updateTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException + { + send_updateTopology(name, uploadedJarLocation, jsonConf, topology); + recv_updateTopology(); + } + + public void send_updateTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws org.apache.thrift7.TException + { + updateTopology_args args = new updateTopology_args(); + args.set_name(name); + args.set_uploadedJarLocation(uploadedJarLocation); + args.set_jsonConf(jsonConf); + args.set_topology(topology); + sendBase("updateTopology", args); + } + + public void recv_updateTopology() throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException + { + updateTopology_result result = new updateTopology_result(); + receiveBase(result, "updateTopology"); + if (result.e != null) { + throw result.e; + } + if (result.ite != null) { + throw result.ite; + } + return; + } + public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException { send_killTopology(name); @@ -662,6 +695,47 @@ public void getResult() throws AlreadyAliveException, InvalidTopologyException, } } + public void updateTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException { + checkReady(); + updateTopology_call method_call = new updateTopology_call(name, uploadedJarLocation, jsonConf, topology, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class updateTopology_call extends org.apache.thrift7.async.TAsyncMethodCall { + private String name; + private String uploadedJarLocation; + private String jsonConf; + private StormTopology topology; + public updateTopology_call(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, org.apache.thrift7.async.AsyncMethodCallback resultHandler, org.apache.thrift7.async.TAsyncClient client, org.apache.thrift7.protocol.TProtocolFactory protocolFactory, org.apache.thrift7.transport.TNonblockingTransport transport) throws org.apache.thrift7.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.name = name; + this.uploadedJarLocation = uploadedJarLocation; + this.jsonConf = jsonConf; + this.topology = topology; + } + + public void write_args(org.apache.thrift7.protocol.TProtocol prot) throws org.apache.thrift7.TException { + prot.writeMessageBegin(new org.apache.thrift7.protocol.TMessage("updateTopology", org.apache.thrift7.protocol.TMessageType.CALL, 0)); + updateTopology_args args = new updateTopology_args(); + args.set_name(name); + args.set_uploadedJarLocation(uploadedJarLocation); + args.set_jsonConf(jsonConf); + args.set_topology(topology); + args.write(prot); + prot.writeMessageEnd(); + } + + public void getResult() throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException { + if (getState() != org.apache.thrift7.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + org.apache.thrift7.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift7.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift7.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + (new Client(prot)).recv_updateTopology(); + } + } + public void killTopology(String name, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException { checkReady(); killTopology_call method_call = new killTopology_call(name, resultHandler, this, ___protocolFactory, ___transport); @@ -1189,6 +1263,7 @@ protected Processor(I iface, Map Map> getProcessMap(Map> processMap) { processMap.put("submitTopology", new submitTopology()); processMap.put("submitTopologyWithOpts", new submitTopologyWithOpts()); + processMap.put("updateTopology", new updateTopology()); processMap.put("killTopology", new killTopology()); processMap.put("killTopologyWithOpts", new killTopologyWithOpts()); processMap.put("activate", new activate()); @@ -1252,6 +1327,28 @@ protected submitTopologyWithOpts_result getResult(I iface, submitTopologyWithOpt } } + private static class updateTopology extends org.apache.thrift7.ProcessFunction { + public updateTopology() { + super("updateTopology"); + } + + protected updateTopology_args getEmptyArgsInstance() { + return new updateTopology_args(); + } + + protected updateTopology_result getResult(I iface, updateTopology_args args) throws org.apache.thrift7.TException { + updateTopology_result result = new updateTopology_result(); + try { + iface.updateTopology(args.name, args.uploadedJarLocation, args.jsonConf, args.topology); + } catch (NotAliveException e) { + result.e = e; + } catch (InvalidTopologyException ite) { + result.ite = ite; + } + return result; + } + } + private static class killTopology extends org.apache.thrift7.ProcessFunction { public killTopology() { super("killTopology"); @@ -3592,6 +3689,981 @@ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException } + public static class updateTopology_args implements org.apache.thrift7.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift7.protocol.TStruct STRUCT_DESC = new org.apache.thrift7.protocol.TStruct("updateTopology_args"); + + private static final org.apache.thrift7.protocol.TField NAME_FIELD_DESC = new org.apache.thrift7.protocol.TField("name", org.apache.thrift7.protocol.TType.STRING, (short)1); + private static final org.apache.thrift7.protocol.TField UPLOADED_JAR_LOCATION_FIELD_DESC = new org.apache.thrift7.protocol.TField("uploadedJarLocation", org.apache.thrift7.protocol.TType.STRING, (short)2); + private static final org.apache.thrift7.protocol.TField JSON_CONF_FIELD_DESC = new org.apache.thrift7.protocol.TField("jsonConf", org.apache.thrift7.protocol.TType.STRING, (short)3); + private static final org.apache.thrift7.protocol.TField TOPOLOGY_FIELD_DESC = new org.apache.thrift7.protocol.TField("topology", org.apache.thrift7.protocol.TType.STRUCT, (short)4); + + private String name; // required + private String uploadedJarLocation; // required + private String jsonConf; // required + private StormTopology topology; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift7.TFieldIdEnum { + NAME((short)1, "name"), + UPLOADED_JAR_LOCATION((short)2, "uploadedJarLocation"), + JSON_CONF((short)3, "jsonConf"), + TOPOLOGY((short)4, "topology"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // NAME + return NAME; + case 2: // UPLOADED_JAR_LOCATION + return UPLOADED_JAR_LOCATION; + case 3: // JSON_CONF + return JSON_CONF; + case 4: // TOPOLOGY + return TOPOLOGY; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + + public static final Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift7.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.NAME, new org.apache.thrift7.meta_data.FieldMetaData("name", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); + tmpMap.put(_Fields.UPLOADED_JAR_LOCATION, new org.apache.thrift7.meta_data.FieldMetaData("uploadedJarLocation", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); + tmpMap.put(_Fields.JSON_CONF, new org.apache.thrift7.meta_data.FieldMetaData("jsonConf", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); + tmpMap.put(_Fields.TOPOLOGY, new org.apache.thrift7.meta_data.FieldMetaData("topology", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.StructMetaData(org.apache.thrift7.protocol.TType.STRUCT, StormTopology.class))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift7.meta_data.FieldMetaData.addStructMetaDataMap(updateTopology_args.class, metaDataMap); + } + + public updateTopology_args() { + } + + public updateTopology_args( + String name, + String uploadedJarLocation, + String jsonConf, + StormTopology topology) + { + this(); + this.name = name; + this.uploadedJarLocation = uploadedJarLocation; + this.jsonConf = jsonConf; + this.topology = topology; + } + + /** + * Performs a deep copy on other. + */ + public updateTopology_args(updateTopology_args other) { + if (other.is_set_name()) { + this.name = other.name; + } + if (other.is_set_uploadedJarLocation()) { + this.uploadedJarLocation = other.uploadedJarLocation; + } + if (other.is_set_jsonConf()) { + this.jsonConf = other.jsonConf; + } + if (other.is_set_topology()) { + this.topology = new StormTopology(other.topology); + } + } + + public updateTopology_args deepCopy() { + return new updateTopology_args(this); + } + + @Override + public void clear() { + this.name = null; + this.uploadedJarLocation = null; + this.jsonConf = null; + this.topology = null; + } + + public String get_name() { + return this.name; + } + + public void set_name(String name) { + this.name = name; + } + + public void unset_name() { + this.name = null; + } + + /** Returns true if field name is set (has been assigned a value) and false otherwise */ + public boolean is_set_name() { + return this.name != null; + } + + public void set_name_isSet(boolean value) { + if (!value) { + this.name = null; + } + } + + public String get_uploadedJarLocation() { + return this.uploadedJarLocation; + } + + public void set_uploadedJarLocation(String uploadedJarLocation) { + this.uploadedJarLocation = uploadedJarLocation; + } + + public void unset_uploadedJarLocation() { + this.uploadedJarLocation = null; + } + + /** Returns true if field uploadedJarLocation is set (has been assigned a value) and false otherwise */ + public boolean is_set_uploadedJarLocation() { + return this.uploadedJarLocation != null; + } + + public void set_uploadedJarLocation_isSet(boolean value) { + if (!value) { + this.uploadedJarLocation = null; + } + } + + public String get_jsonConf() { + return this.jsonConf; + } + + public void set_jsonConf(String jsonConf) { + this.jsonConf = jsonConf; + } + + public void unset_jsonConf() { + this.jsonConf = null; + } + + /** Returns true if field jsonConf is set (has been assigned a value) and false otherwise */ + public boolean is_set_jsonConf() { + return this.jsonConf != null; + } + + public void set_jsonConf_isSet(boolean value) { + if (!value) { + this.jsonConf = null; + } + } + + public StormTopology get_topology() { + return this.topology; + } + + public void set_topology(StormTopology topology) { + this.topology = topology; + } + + public void unset_topology() { + this.topology = null; + } + + /** Returns true if field topology is set (has been assigned a value) and false otherwise */ + public boolean is_set_topology() { + return this.topology != null; + } + + public void set_topology_isSet(boolean value) { + if (!value) { + this.topology = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case NAME: + if (value == null) { + unset_name(); + } else { + set_name((String)value); + } + break; + + case UPLOADED_JAR_LOCATION: + if (value == null) { + unset_uploadedJarLocation(); + } else { + set_uploadedJarLocation((String)value); + } + break; + + case JSON_CONF: + if (value == null) { + unset_jsonConf(); + } else { + set_jsonConf((String)value); + } + break; + + case TOPOLOGY: + if (value == null) { + unset_topology(); + } else { + set_topology((StormTopology)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case NAME: + return get_name(); + + case UPLOADED_JAR_LOCATION: + return get_uploadedJarLocation(); + + case JSON_CONF: + return get_jsonConf(); + + case TOPOLOGY: + return get_topology(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case NAME: + return is_set_name(); + case UPLOADED_JAR_LOCATION: + return is_set_uploadedJarLocation(); + case JSON_CONF: + return is_set_jsonConf(); + case TOPOLOGY: + return is_set_topology(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof updateTopology_args) + return this.equals((updateTopology_args)that); + return false; + } + + public boolean equals(updateTopology_args that) { + if (that == null) + return false; + + boolean this_present_name = true && this.is_set_name(); + boolean that_present_name = true && that.is_set_name(); + if (this_present_name || that_present_name) { + if (!(this_present_name && that_present_name)) + return false; + if (!this.name.equals(that.name)) + return false; + } + + boolean this_present_uploadedJarLocation = true && this.is_set_uploadedJarLocation(); + boolean that_present_uploadedJarLocation = true && that.is_set_uploadedJarLocation(); + if (this_present_uploadedJarLocation || that_present_uploadedJarLocation) { + if (!(this_present_uploadedJarLocation && that_present_uploadedJarLocation)) + return false; + if (!this.uploadedJarLocation.equals(that.uploadedJarLocation)) + return false; + } + + boolean this_present_jsonConf = true && this.is_set_jsonConf(); + boolean that_present_jsonConf = true && that.is_set_jsonConf(); + if (this_present_jsonConf || that_present_jsonConf) { + if (!(this_present_jsonConf && that_present_jsonConf)) + return false; + if (!this.jsonConf.equals(that.jsonConf)) + return false; + } + + boolean this_present_topology = true && this.is_set_topology(); + boolean that_present_topology = true && that.is_set_topology(); + if (this_present_topology || that_present_topology) { + if (!(this_present_topology && that_present_topology)) + return false; + if (!this.topology.equals(that.topology)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder(); + + boolean present_name = true && (is_set_name()); + builder.append(present_name); + if (present_name) + builder.append(name); + + boolean present_uploadedJarLocation = true && (is_set_uploadedJarLocation()); + builder.append(present_uploadedJarLocation); + if (present_uploadedJarLocation) + builder.append(uploadedJarLocation); + + boolean present_jsonConf = true && (is_set_jsonConf()); + builder.append(present_jsonConf); + if (present_jsonConf) + builder.append(jsonConf); + + boolean present_topology = true && (is_set_topology()); + builder.append(present_topology); + if (present_topology) + builder.append(topology); + + return builder.toHashCode(); + } + + public int compareTo(updateTopology_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + updateTopology_args typedOther = (updateTopology_args)other; + + lastComparison = Boolean.valueOf(is_set_name()).compareTo(typedOther.is_set_name()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_name()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.name, typedOther.name); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_uploadedJarLocation()).compareTo(typedOther.is_set_uploadedJarLocation()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_uploadedJarLocation()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.uploadedJarLocation, typedOther.uploadedJarLocation); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_jsonConf()).compareTo(typedOther.is_set_jsonConf()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_jsonConf()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.jsonConf, typedOther.jsonConf); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_topology()).compareTo(typedOther.is_set_topology()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_topology()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.topology, typedOther.topology); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache.thrift7.TException { + org.apache.thrift7.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift7.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // NAME + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.name = iprot.readString(); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // UPLOADED_JAR_LOCATION + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.uploadedJarLocation = iprot.readString(); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 3: // JSON_CONF + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.jsonConf = iprot.readString(); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 4: // TOPOLOGY + if (field.type == org.apache.thrift7.protocol.TType.STRUCT) { + this.topology = new StormTopology(); + this.topology.read(iprot); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + validate(); + } + + public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache.thrift7.TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (this.name != null) { + oprot.writeFieldBegin(NAME_FIELD_DESC); + oprot.writeString(this.name); + oprot.writeFieldEnd(); + } + if (this.uploadedJarLocation != null) { + oprot.writeFieldBegin(UPLOADED_JAR_LOCATION_FIELD_DESC); + oprot.writeString(this.uploadedJarLocation); + oprot.writeFieldEnd(); + } + if (this.jsonConf != null) { + oprot.writeFieldBegin(JSON_CONF_FIELD_DESC); + oprot.writeString(this.jsonConf); + oprot.writeFieldEnd(); + } + if (this.topology != null) { + oprot.writeFieldBegin(TOPOLOGY_FIELD_DESC); + this.topology.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("updateTopology_args("); + boolean first = true; + + sb.append("name:"); + if (this.name == null) { + sb.append("null"); + } else { + sb.append(this.name); + } + first = false; + if (!first) sb.append(", "); + sb.append("uploadedJarLocation:"); + if (this.uploadedJarLocation == null) { + sb.append("null"); + } else { + sb.append(this.uploadedJarLocation); + } + first = false; + if (!first) sb.append(", "); + sb.append("jsonConf:"); + if (this.jsonConf == null) { + sb.append("null"); + } else { + sb.append(this.jsonConf); + } + first = false; + if (!first) sb.append(", "); + sb.append("topology:"); + if (this.topology == null) { + sb.append("null"); + } else { + sb.append(this.topology); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift7.TException { + // check for required fields + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift7.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift7.TException te) { + throw new java.io.IOException(te); + } + } + + } + + public static class updateTopology_result implements org.apache.thrift7.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift7.protocol.TStruct STRUCT_DESC = new org.apache.thrift7.protocol.TStruct("updateTopology_result"); + + private static final org.apache.thrift7.protocol.TField E_FIELD_DESC = new org.apache.thrift7.protocol.TField("e", org.apache.thrift7.protocol.TType.STRUCT, (short)1); + private static final org.apache.thrift7.protocol.TField ITE_FIELD_DESC = new org.apache.thrift7.protocol.TField("ite", org.apache.thrift7.protocol.TType.STRUCT, (short)2); + + private NotAliveException e; // required + private InvalidTopologyException ite; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift7.TFieldIdEnum { + E((short)1, "e"), + ITE((short)2, "ite"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // E + return E; + case 2: // ITE + return ITE; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + + public static final Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift7.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.E, new org.apache.thrift7.meta_data.FieldMetaData("e", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRUCT))); + tmpMap.put(_Fields.ITE, new org.apache.thrift7.meta_data.FieldMetaData("ite", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRUCT))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift7.meta_data.FieldMetaData.addStructMetaDataMap(updateTopology_result.class, metaDataMap); + } + + public updateTopology_result() { + } + + public updateTopology_result( + NotAliveException e, + InvalidTopologyException ite) + { + this(); + this.e = e; + this.ite = ite; + } + + /** + * Performs a deep copy on other. + */ + public updateTopology_result(updateTopology_result other) { + if (other.is_set_e()) { + this.e = new NotAliveException(other.e); + } + if (other.is_set_ite()) { + this.ite = new InvalidTopologyException(other.ite); + } + } + + public updateTopology_result deepCopy() { + return new updateTopology_result(this); + } + + @Override + public void clear() { + this.e = null; + this.ite = null; + } + + public NotAliveException get_e() { + return this.e; + } + + public void set_e(NotAliveException e) { + this.e = e; + } + + public void unset_e() { + this.e = null; + } + + /** Returns true if field e is set (has been assigned a value) and false otherwise */ + public boolean is_set_e() { + return this.e != null; + } + + public void set_e_isSet(boolean value) { + if (!value) { + this.e = null; + } + } + + public InvalidTopologyException get_ite() { + return this.ite; + } + + public void set_ite(InvalidTopologyException ite) { + this.ite = ite; + } + + public void unset_ite() { + this.ite = null; + } + + /** Returns true if field ite is set (has been assigned a value) and false otherwise */ + public boolean is_set_ite() { + return this.ite != null; + } + + public void set_ite_isSet(boolean value) { + if (!value) { + this.ite = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case E: + if (value == null) { + unset_e(); + } else { + set_e((NotAliveException)value); + } + break; + + case ITE: + if (value == null) { + unset_ite(); + } else { + set_ite((InvalidTopologyException)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case E: + return get_e(); + + case ITE: + return get_ite(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case E: + return is_set_e(); + case ITE: + return is_set_ite(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof updateTopology_result) + return this.equals((updateTopology_result)that); + return false; + } + + public boolean equals(updateTopology_result that) { + if (that == null) + return false; + + boolean this_present_e = true && this.is_set_e(); + boolean that_present_e = true && that.is_set_e(); + if (this_present_e || that_present_e) { + if (!(this_present_e && that_present_e)) + return false; + if (!this.e.equals(that.e)) + return false; + } + + boolean this_present_ite = true && this.is_set_ite(); + boolean that_present_ite = true && that.is_set_ite(); + if (this_present_ite || that_present_ite) { + if (!(this_present_ite && that_present_ite)) + return false; + if (!this.ite.equals(that.ite)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder(); + + boolean present_e = true && (is_set_e()); + builder.append(present_e); + if (present_e) + builder.append(e); + + boolean present_ite = true && (is_set_ite()); + builder.append(present_ite); + if (present_ite) + builder.append(ite); + + return builder.toHashCode(); + } + + public int compareTo(updateTopology_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + updateTopology_result typedOther = (updateTopology_result)other; + + lastComparison = Boolean.valueOf(is_set_e()).compareTo(typedOther.is_set_e()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_e()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.e, typedOther.e); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_ite()).compareTo(typedOther.is_set_ite()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_ite()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.ite, typedOther.ite); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache.thrift7.TException { + org.apache.thrift7.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift7.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // E + if (field.type == org.apache.thrift7.protocol.TType.STRUCT) { + this.e = new NotAliveException(); + this.e.read(iprot); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // ITE + if (field.type == org.apache.thrift7.protocol.TType.STRUCT) { + this.ite = new InvalidTopologyException(); + this.ite.read(iprot); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + validate(); + } + + public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache.thrift7.TException { + oprot.writeStructBegin(STRUCT_DESC); + + if (this.is_set_e()) { + oprot.writeFieldBegin(E_FIELD_DESC); + this.e.write(oprot); + oprot.writeFieldEnd(); + } else if (this.is_set_ite()) { + oprot.writeFieldBegin(ITE_FIELD_DESC); + this.ite.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("updateTopology_result("); + boolean first = true; + + sb.append("e:"); + if (this.e == null) { + sb.append("null"); + } else { + sb.append(this.e); + } + first = false; + if (!first) sb.append(", "); + sb.append("ite:"); + if (this.ite == null) { + sb.append("null"); + } else { + sb.append(this.ite); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift7.TException { + // check for required fields + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift7.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift7.TException te) { + throw new java.io.IOException(te); + } + } + + } + public static class killTopology_args implements org.apache.thrift7.TBase, java.io.Serializable, Cloneable { private static final org.apache.thrift7.protocol.TStruct STRUCT_DESC = new org.apache.thrift7.protocol.TStruct("killTopology_args"); diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java b/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java index 9120b7c4d..3b224b854 100644 --- a/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java +++ b/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java @@ -30,6 +30,7 @@ public class TopologyInfo implements org.apache.thrift7.TBase executors; // required private String status; // required private Map> errors; // required + private String topology_version; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift7.TFieldIdEnum { @@ -45,7 +47,8 @@ public enum _Fields implements org.apache.thrift7.TFieldIdEnum { UPTIME_SECS((short)3, "uptime_secs"), EXECUTORS((short)4, "executors"), STATUS((short)5, "status"), - ERRORS((short)6, "errors"); + ERRORS((short)6, "errors"), + TOPOLOGY_VERSION((short)7, "topology_version"); private static final Map byName = new HashMap(); @@ -72,6 +75,8 @@ public static _Fields findByThriftId(int fieldId) { return STATUS; case 6: // ERRORS return ERRORS; + case 7: // TOPOLOGY_VERSION + return TOPOLOGY_VERSION; default: return null; } @@ -134,6 +139,8 @@ public String getFieldName() { new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING), new org.apache.thrift7.meta_data.ListMetaData(org.apache.thrift7.protocol.TType.LIST, new org.apache.thrift7.meta_data.StructMetaData(org.apache.thrift7.protocol.TType.STRUCT, ErrorInfo.class))))); + tmpMap.put(_Fields.TOPOLOGY_VERSION, new org.apache.thrift7.meta_data.FieldMetaData("topology_version", org.apache.thrift7.TFieldRequirementType.REQUIRED, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift7.meta_data.FieldMetaData.addStructMetaDataMap(TopologyInfo.class, metaDataMap); } @@ -147,7 +154,8 @@ public TopologyInfo( int uptime_secs, List executors, String status, - Map> errors) + Map> errors, + String topology_version) { this(); this.id = id; @@ -157,6 +165,7 @@ public TopologyInfo( this.executors = executors; this.status = status; this.errors = errors; + this.topology_version = topology_version; } /** @@ -200,6 +209,9 @@ public TopologyInfo(TopologyInfo other) { } this.errors = __this__errors; } + if (other.is_set_topology_version()) { + this.topology_version = other.topology_version; + } } public TopologyInfo deepCopy() { @@ -215,6 +227,7 @@ public void clear() { this.executors = null; this.status = null; this.errors = null; + this.topology_version = null; } public String get_id() { @@ -380,6 +393,29 @@ public void set_errors_isSet(boolean value) { } } + public String get_topology_version() { + return this.topology_version; + } + + public void set_topology_version(String topology_version) { + this.topology_version = topology_version; + } + + public void unset_topology_version() { + this.topology_version = null; + } + + /** Returns true if field topology_version is set (has been assigned a value) and false otherwise */ + public boolean is_set_topology_version() { + return this.topology_version != null; + } + + public void set_topology_version_isSet(boolean value) { + if (!value) { + this.topology_version = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case ID: @@ -430,6 +466,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case TOPOLOGY_VERSION: + if (value == null) { + unset_topology_version(); + } else { + set_topology_version((String)value); + } + break; + } } @@ -453,6 +497,9 @@ public Object getFieldValue(_Fields field) { case ERRORS: return get_errors(); + case TOPOLOGY_VERSION: + return get_topology_version(); + } throw new IllegalStateException(); } @@ -476,6 +523,8 @@ public boolean isSet(_Fields field) { return is_set_status(); case ERRORS: return is_set_errors(); + case TOPOLOGY_VERSION: + return is_set_topology_version(); } throw new IllegalStateException(); } @@ -547,6 +596,15 @@ public boolean equals(TopologyInfo that) { return false; } + boolean this_present_topology_version = true && this.is_set_topology_version(); + boolean that_present_topology_version = true && that.is_set_topology_version(); + if (this_present_topology_version || that_present_topology_version) { + if (!(this_present_topology_version && that_present_topology_version)) + return false; + if (!this.topology_version.equals(that.topology_version)) + return false; + } + return true; } @@ -584,6 +642,11 @@ public int hashCode() { if (present_errors) builder.append(errors); + boolean present_topology_version = true && (is_set_topology_version()); + builder.append(present_topology_version); + if (present_topology_version) + builder.append(topology_version); + return builder.toHashCode(); } @@ -655,6 +718,16 @@ public int compareTo(TopologyInfo other) { return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_topology_version()).compareTo(typedOther.is_set_topology_version()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_topology_version()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.topology_version, typedOther.topology_version); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -749,6 +822,13 @@ public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache. org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); } break; + case 7: // TOPOLOGY_VERSION + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.topology_version = iprot.readString(); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; default: org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); } @@ -812,6 +892,11 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache } oprot.writeFieldEnd(); } + if (this.topology_version != null) { + oprot.writeFieldBegin(TOPOLOGY_VERSION_FIELD_DESC); + oprot.writeString(this.topology_version); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -864,6 +949,14 @@ public String toString() { sb.append(this.errors); } first = false; + if (!first) sb.append(", "); + sb.append("topology_version:"); + if (this.topology_version == null) { + sb.append("null"); + } else { + sb.append(this.topology_version); + } + first = false; sb.append(")"); return sb.toString(); } @@ -894,6 +987,10 @@ public void validate() throws org.apache.thrift7.TException { throw new org.apache.thrift7.protocol.TProtocolException("Required field 'errors' is unset! Struct:" + toString()); } + if (!is_set_topology_version()) { + throw new org.apache.thrift7.protocol.TProtocolException("Required field 'topology_version' is unset! Struct:" + toString()); + } + } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java b/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java index 9bb16edaa..463b4c1be 100644 --- a/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java @@ -31,6 +31,7 @@ public class TopologySummary implements org.apache.thrift7.TBase byName = new HashMap(); @@ -77,6 +80,8 @@ public static _Fields findByThriftId(int fieldId) { return UPTIME_SECS; case 7: // STATUS return STATUS; + case 8: // TOPOLOGY_VERSION + return TOPOLOGY_VERSION; default: return null; } @@ -140,6 +145,8 @@ public String getFieldName() { new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I32))); tmpMap.put(_Fields.STATUS, new org.apache.thrift7.meta_data.FieldMetaData("status", org.apache.thrift7.TFieldRequirementType.REQUIRED, new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); + tmpMap.put(_Fields.TOPOLOGY_VERSION, new org.apache.thrift7.meta_data.FieldMetaData("topology_version", org.apache.thrift7.TFieldRequirementType.REQUIRED, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift7.meta_data.FieldMetaData.addStructMetaDataMap(TopologySummary.class, metaDataMap); } @@ -154,7 +161,8 @@ public TopologySummary( int num_executors, int num_workers, int uptime_secs, - String status) + String status, + String topology_version) { this(); this.id = id; @@ -168,6 +176,7 @@ public TopologySummary( this.uptime_secs = uptime_secs; set_uptime_secs_isSet(true); this.status = status; + this.topology_version = topology_version; } /** @@ -189,6 +198,9 @@ public TopologySummary(TopologySummary other) { if (other.is_set_status()) { this.status = other.status; } + if (other.is_set_topology_version()) { + this.topology_version = other.topology_version; + } } public TopologySummary deepCopy() { @@ -208,6 +220,7 @@ public void clear() { set_uptime_secs_isSet(false); this.uptime_secs = 0; this.status = null; + this.topology_version = null; } public String get_id() { @@ -367,6 +380,29 @@ public void set_status_isSet(boolean value) { } } + public String get_topology_version() { + return this.topology_version; + } + + public void set_topology_version(String topology_version) { + this.topology_version = topology_version; + } + + public void unset_topology_version() { + this.topology_version = null; + } + + /** Returns true if field topology_version is set (has been assigned a value) and false otherwise */ + public boolean is_set_topology_version() { + return this.topology_version != null; + } + + public void set_topology_version_isSet(boolean value) { + if (!value) { + this.topology_version = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case ID: @@ -425,6 +461,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case TOPOLOGY_VERSION: + if (value == null) { + unset_topology_version(); + } else { + set_topology_version((String)value); + } + break; + } } @@ -451,6 +495,9 @@ public Object getFieldValue(_Fields field) { case STATUS: return get_status(); + case TOPOLOGY_VERSION: + return get_topology_version(); + } throw new IllegalStateException(); } @@ -476,6 +523,8 @@ public boolean isSet(_Fields field) { return is_set_uptime_secs(); case STATUS: return is_set_status(); + case TOPOLOGY_VERSION: + return is_set_topology_version(); } throw new IllegalStateException(); } @@ -556,6 +605,15 @@ public boolean equals(TopologySummary that) { return false; } + boolean this_present_topology_version = true && this.is_set_topology_version(); + boolean that_present_topology_version = true && that.is_set_topology_version(); + if (this_present_topology_version || that_present_topology_version) { + if (!(this_present_topology_version && that_present_topology_version)) + return false; + if (!this.topology_version.equals(that.topology_version)) + return false; + } + return true; } @@ -598,6 +656,11 @@ public int hashCode() { if (present_status) builder.append(status); + boolean present_topology_version = true && (is_set_topology_version()); + builder.append(present_topology_version); + if (present_topology_version) + builder.append(topology_version); + return builder.toHashCode(); } @@ -679,6 +742,16 @@ public int compareTo(TopologySummary other) { return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_topology_version()).compareTo(typedOther.is_set_topology_version()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_topology_version()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.topology_version, typedOther.topology_version); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -749,6 +822,13 @@ public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache. org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); } break; + case 8: // TOPOLOGY_VERSION + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.topology_version = iprot.readString(); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; default: org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); } @@ -789,6 +869,11 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache oprot.writeString(this.status); oprot.writeFieldEnd(); } + if (this.topology_version != null) { + oprot.writeFieldBegin(TOPOLOGY_VERSION_FIELD_DESC); + oprot.writeString(this.topology_version); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -837,6 +922,14 @@ public String toString() { sb.append(this.status); } first = false; + if (!first) sb.append(", "); + sb.append("topology_version:"); + if (this.topology_version == null) { + sb.append("null"); + } else { + sb.append(this.topology_version); + } + first = false; sb.append(")"); return sb.toString(); } @@ -871,6 +964,10 @@ public void validate() throws org.apache.thrift7.TException { throw new org.apache.thrift7.protocol.TProtocolException("Required field 'status' is unset! Struct:" + toString()); } + if (!is_set_topology_version()) { + throw new org.apache.thrift7.protocol.TProtocolException("Required field 'topology_version' is unset! Struct:" + toString()); + } + } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { diff --git a/storm-core/src/jvm/backtype/storm/utils/Time.java b/storm-core/src/jvm/backtype/storm/utils/Time.java index 9550de102..39e997bcd 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Time.java +++ b/storm-core/src/jvm/backtype/storm/utils/Time.java @@ -4,12 +4,16 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Date; +import java.text.SimpleDateFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Time { public static Logger LOG = LoggerFactory.getLogger(Time.class); + + public static SimpleDateFormat DATE_yyyyMMddHHmmss = new SimpleDateFormat("yyyyMMddHHmmss"); private static AtomicBoolean simulating = new AtomicBoolean(false); //TODO: should probably use weak references here or something @@ -70,6 +74,10 @@ public static int currentTimeSecs() { return (int) (currentTimeMillis() / 1000); } + public static String currentTimeString() { + return DATE_yyyyMMddHHmmss.format(new Date(currentTimeMillis())); + } + public static void advanceTime(long ms) { if(!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode"); simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms); diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote index 4b2ff041a..c5d9c2a40 100755 --- a/storm-core/src/py/storm/Nimbus-remote +++ b/storm-core/src/py/storm/Nimbus-remote @@ -23,6 +23,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print 'Functions:' print ' void submitTopology(string name, string uploadedJarLocation, string jsonConf, StormTopology topology)' print ' void submitTopologyWithOpts(string name, string uploadedJarLocation, string jsonConf, StormTopology topology, SubmitOptions options)' + print ' void updateTopology(string name, string uploadedJarLocation, string jsonConf, StormTopology topology)' print ' void killTopology(string name)' print ' void killTopologyWithOpts(string name, KillOptions options)' print ' void activate(string name)' @@ -101,6 +102,12 @@ elif cmd == 'submitTopologyWithOpts': sys.exit(1) pp.pprint(client.submitTopologyWithOpts(args[0],args[1],args[2],eval(args[3]),eval(args[4]),)) +elif cmd == 'updateTopology': + if len(args) != 4: + print 'updateTopology requires 4 args' + sys.exit(1) + pp.pprint(client.updateTopology(args[0],args[1],args[2],eval(args[3]),)) + elif cmd == 'killTopology': if len(args) != 1: print 'killTopology requires 1 args' diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py index cd535be73..e28405e93 100644 --- a/storm-core/src/py/storm/Nimbus.py +++ b/storm-core/src/py/storm/Nimbus.py @@ -37,6 +37,16 @@ def submitTopologyWithOpts(self, name, uploadedJarLocation, jsonConf, topology, """ pass + def updateTopology(self, name, uploadedJarLocation, jsonConf, topology): + """ + Parameters: + - name + - uploadedJarLocation + - jsonConf + - topology + """ + pass + def killTopology(self, name): """ Parameters: @@ -226,6 +236,44 @@ def recv_submitTopologyWithOpts(self, ): raise result.ite return + def updateTopology(self, name, uploadedJarLocation, jsonConf, topology): + """ + Parameters: + - name + - uploadedJarLocation + - jsonConf + - topology + """ + self.send_updateTopology(name, uploadedJarLocation, jsonConf, topology) + self.recv_updateTopology() + + def send_updateTopology(self, name, uploadedJarLocation, jsonConf, topology): + self._oprot.writeMessageBegin('updateTopology', TMessageType.CALL, self._seqid) + args = updateTopology_args() + args.name = name + args.uploadedJarLocation = uploadedJarLocation + args.jsonConf = jsonConf + args.topology = topology + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_updateTopology(self, ): + (fname, mtype, rseqid) = self._iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(self._iprot) + self._iprot.readMessageEnd() + raise x + result = updateTopology_result() + result.read(self._iprot) + self._iprot.readMessageEnd() + if result.e is not None: + raise result.e + if result.ite is not None: + raise result.ite + return + def killTopology(self, name): """ Parameters: @@ -710,6 +758,7 @@ def __init__(self, handler): self._processMap = {} self._processMap["submitTopology"] = Processor.process_submitTopology self._processMap["submitTopologyWithOpts"] = Processor.process_submitTopologyWithOpts + self._processMap["updateTopology"] = Processor.process_updateTopology self._processMap["killTopology"] = Processor.process_killTopology self._processMap["killTopologyWithOpts"] = Processor.process_killTopologyWithOpts self._processMap["activate"] = Processor.process_activate @@ -774,6 +823,22 @@ def process_submitTopologyWithOpts(self, seqid, iprot, oprot): oprot.writeMessageEnd() oprot.trans.flush() + def process_updateTopology(self, seqid, iprot, oprot): + args = updateTopology_args() + args.read(iprot) + iprot.readMessageEnd() + result = updateTopology_result() + try: + self._handler.updateTopology(args.name, args.uploadedJarLocation, args.jsonConf, args.topology) + except NotAliveException, e: + result.e = e + except InvalidTopologyException, ite: + result.ite = ite + oprot.writeMessageBegin("updateTopology", TMessageType.REPLY, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + def process_killTopology(self, seqid, iprot, oprot): args = killTopology_args() args.read(iprot) @@ -1349,6 +1414,183 @@ def __eq__(self, other): def __ne__(self, other): return not (self == other) +class updateTopology_args: + """ + Attributes: + - name + - uploadedJarLocation + - jsonConf + - topology + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'name', None, None, ), # 1 + (2, TType.STRING, 'uploadedJarLocation', None, None, ), # 2 + (3, TType.STRING, 'jsonConf', None, None, ), # 3 + (4, TType.STRUCT, 'topology', (StormTopology, StormTopology.thrift_spec), None, ), # 4 + ) + + def __hash__(self): + return 0 + hash(self.name) + hash(self.uploadedJarLocation) + hash(self.jsonConf) + hash(self.topology) + + def __init__(self, name=None, uploadedJarLocation=None, jsonConf=None, topology=None,): + self.name = name + self.uploadedJarLocation = uploadedJarLocation + self.jsonConf = jsonConf + self.topology = topology + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.name = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.uploadedJarLocation = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.jsonConf = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRUCT: + self.topology = StormTopology() + self.topology.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('updateTopology_args') + if self.name is not None: + oprot.writeFieldBegin('name', TType.STRING, 1) + oprot.writeString(self.name.encode('utf-8')) + oprot.writeFieldEnd() + if self.uploadedJarLocation is not None: + oprot.writeFieldBegin('uploadedJarLocation', TType.STRING, 2) + oprot.writeString(self.uploadedJarLocation.encode('utf-8')) + oprot.writeFieldEnd() + if self.jsonConf is not None: + oprot.writeFieldBegin('jsonConf', TType.STRING, 3) + oprot.writeString(self.jsonConf.encode('utf-8')) + oprot.writeFieldEnd() + if self.topology is not None: + oprot.writeFieldBegin('topology', TType.STRUCT, 4) + self.topology.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class updateTopology_result: + """ + Attributes: + - e + - ite + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'e', (NotAliveException, NotAliveException.thrift_spec), None, ), # 1 + (2, TType.STRUCT, 'ite', (InvalidTopologyException, InvalidTopologyException.thrift_spec), None, ), # 2 + ) + + def __hash__(self): + return 0 + hash(self.e) + hash(self.ite) + + def __init__(self, e=None, ite=None,): + self.e = e + self.ite = ite + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.e = NotAliveException() + self.e.read(iprot) + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRUCT: + self.ite = InvalidTopologyException() + self.ite.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('updateTopology_result') + if self.e is not None: + oprot.writeFieldBegin('e', TType.STRUCT, 1) + self.e.write(oprot) + oprot.writeFieldEnd() + if self.ite is not None: + oprot.writeFieldBegin('ite', TType.STRUCT, 2) + self.ite.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class killTopology_args: """ Attributes: diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py index 2c0a50bd8..f5d5ee443 100644 --- a/storm-core/src/py/storm/ttypes.py +++ b/storm-core/src/py/storm/ttypes.py @@ -1485,6 +1485,7 @@ class TopologySummary: - num_workers - uptime_secs - status + - topology_version """ thrift_spec = ( @@ -1496,12 +1497,13 @@ class TopologySummary: (5, TType.I32, 'num_workers', None, None, ), # 5 (6, TType.I32, 'uptime_secs', None, None, ), # 6 (7, TType.STRING, 'status', None, None, ), # 7 + (8, TType.STRING, 'topology_version', None, None, ), # 8 ) def __hash__(self): - return 0 + hash(self.id) + hash(self.name) + hash(self.num_tasks) + hash(self.num_executors) + hash(self.num_workers) + hash(self.uptime_secs) + hash(self.status) + return 0 + hash(self.id) + hash(self.name) + hash(self.num_tasks) + hash(self.num_executors) + hash(self.num_workers) + hash(self.uptime_secs) + hash(self.status) + hash(self.topology_version) - def __init__(self, id=None, name=None, num_tasks=None, num_executors=None, num_workers=None, uptime_secs=None, status=None,): + def __init__(self, id=None, name=None, num_tasks=None, num_executors=None, num_workers=None, uptime_secs=None, status=None, topology_version=None,): self.id = id self.name = name self.num_tasks = num_tasks @@ -1509,6 +1511,7 @@ def __init__(self, id=None, name=None, num_tasks=None, num_executors=None, num_w self.num_workers = num_workers self.uptime_secs = uptime_secs self.status = status + self.topology_version = topology_version def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -1554,6 +1557,11 @@ def read(self, iprot): self.status = iprot.readString().decode('utf-8') else: iprot.skip(ftype) + elif fid == 8: + if ftype == TType.STRING: + self.topology_version = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -1592,6 +1600,10 @@ def write(self, oprot): oprot.writeFieldBegin('status', TType.STRING, 7) oprot.writeString(self.status.encode('utf-8')) oprot.writeFieldEnd() + if self.topology_version is not None: + oprot.writeFieldBegin('topology_version', TType.STRING, 8) + oprot.writeString(self.topology_version.encode('utf-8')) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -1610,6 +1622,8 @@ def validate(self): raise TProtocol.TProtocolException(message='Required field uptime_secs is unset!') if self.status is None: raise TProtocol.TProtocolException(message='Required field status is unset!') + if self.topology_version is None: + raise TProtocol.TProtocolException(message='Required field topology_version is unset!') return @@ -2612,6 +2626,7 @@ class ExecutorSummary: - host - port - uptime_secs + - topology_version - stats """ @@ -2622,19 +2637,20 @@ class ExecutorSummary: (3, TType.STRING, 'host', None, None, ), # 3 (4, TType.I32, 'port', None, None, ), # 4 (5, TType.I32, 'uptime_secs', None, None, ), # 5 - None, # 6 + (6, TType.STRING, 'topology_version', None, None, ), # 6 (7, TType.STRUCT, 'stats', (ExecutorStats, ExecutorStats.thrift_spec), None, ), # 7 ) def __hash__(self): - return 0 + hash(self.executor_info) + hash(self.component_id) + hash(self.host) + hash(self.port) + hash(self.uptime_secs) + hash(self.stats) + return 0 + hash(self.executor_info) + hash(self.component_id) + hash(self.host) + hash(self.port) + hash(self.uptime_secs) + hash(self.topology_version) + hash(self.stats) - def __init__(self, executor_info=None, component_id=None, host=None, port=None, uptime_secs=None, stats=None,): + def __init__(self, executor_info=None, component_id=None, host=None, port=None, uptime_secs=None, topology_version=None, stats=None,): self.executor_info = executor_info self.component_id = component_id self.host = host self.port = port self.uptime_secs = uptime_secs + self.topology_version = topology_version self.stats = stats def read(self, iprot): @@ -2672,6 +2688,11 @@ def read(self, iprot): self.uptime_secs = iprot.readI32(); else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.STRING: + self.topology_version = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) elif fid == 7: if ftype == TType.STRUCT: self.stats = ExecutorStats() @@ -2708,6 +2729,10 @@ def write(self, oprot): oprot.writeFieldBegin('uptime_secs', TType.I32, 5) oprot.writeI32(self.uptime_secs) oprot.writeFieldEnd() + if self.topology_version is not None: + oprot.writeFieldBegin('topology_version', TType.STRING, 6) + oprot.writeString(self.topology_version.encode('utf-8')) + oprot.writeFieldEnd() if self.stats is not None: oprot.writeFieldBegin('stats', TType.STRUCT, 7) self.stats.write(oprot) @@ -2726,6 +2751,8 @@ def validate(self): raise TProtocol.TProtocolException(message='Required field port is unset!') if self.uptime_secs is None: raise TProtocol.TProtocolException(message='Required field uptime_secs is unset!') + if self.topology_version is None: + raise TProtocol.TProtocolException(message='Required field topology_version is unset!') return @@ -2749,6 +2776,7 @@ class TopologyInfo: - executors - status - errors + - topology_version """ thrift_spec = ( @@ -2759,18 +2787,20 @@ class TopologyInfo: (4, TType.LIST, 'executors', (TType.STRUCT,(ExecutorSummary, ExecutorSummary.thrift_spec)), None, ), # 4 (5, TType.STRING, 'status', None, None, ), # 5 (6, TType.MAP, 'errors', (TType.STRING,None,TType.LIST,(TType.STRUCT,(ErrorInfo, ErrorInfo.thrift_spec))), None, ), # 6 + (7, TType.STRING, 'topology_version', None, None, ), # 7 ) def __hash__(self): - return 0 + hash(self.id) + hash(self.name) + hash(self.uptime_secs) + hash(self.executors) + hash(self.status) + hash(self.errors) + return 0 + hash(self.id) + hash(self.name) + hash(self.uptime_secs) + hash(self.executors) + hash(self.status) + hash(self.errors) + hash(self.topology_version) - def __init__(self, id=None, name=None, uptime_secs=None, executors=None, status=None, errors=None,): + def __init__(self, id=None, name=None, uptime_secs=None, executors=None, status=None, errors=None, topology_version=None,): self.id = id self.name = name self.uptime_secs = uptime_secs self.executors = executors self.status = status self.errors = errors + self.topology_version = topology_version def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -2829,6 +2859,11 @@ def read(self, iprot): iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 7: + if ftype == TType.STRING: + self.topology_version = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -2873,6 +2908,10 @@ def write(self, oprot): oprot.writeListEnd() oprot.writeMapEnd() oprot.writeFieldEnd() + if self.topology_version is not None: + oprot.writeFieldBegin('topology_version', TType.STRING, 7) + oprot.writeString(self.topology_version.encode('utf-8')) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -2889,6 +2928,8 @@ def validate(self): raise TProtocol.TProtocolException(message='Required field status is unset!') if self.errors is None: raise TProtocol.TProtocolException(message='Required field errors is unset!') + if self.topology_version is None: + raise TProtocol.TProtocolException(message='Required field topology_version is unset!') return diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index 475acd6ac..f6e13eb6c 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -116,6 +116,7 @@ struct TopologySummary { 5: required i32 num_workers; 6: required i32 uptime_secs; 7: required string status; + 8: required string topology_version; } struct SupervisorSummary { @@ -175,6 +176,7 @@ struct ExecutorSummary { 3: required string host; 4: required i32 port; 5: required i32 uptime_secs; + 6: required string topology_version; 7: optional ExecutorStats stats; } @@ -185,6 +187,7 @@ struct TopologyInfo { 4: required list executors; 5: required string status; 6: required map> errors; + 7: required string topology_version; } struct KillOptions { @@ -208,6 +211,7 @@ struct SubmitOptions { service Nimbus { void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite); void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite); + void updateTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: NotAliveException e, 2: InvalidTopologyException ite); void killTopology(1: string name) throws (1: NotAliveException e); void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e); void activate(1: string name) throws (1: NotAliveException e);