-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace jar distribution strategy with bittorrent #629
base: master
Are you sure you want to change the base?
Changes from 13 commits
567f72a
abe3414
dcb609a
f70022d
539cf33
4a76e4c
17f1acc
3b56e94
bf5de3f
dec77b7
efd84a6
2209da9
3b8059c
24ae037
755bebf
f784ef3
ff79d6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
(:use [backtype.storm.scheduler.DefaultScheduler]) | ||
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails | ||
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) | ||
(:import [backtype.storm.torrent NimbusTracker]) | ||
(:use [backtype.storm bootstrap util]) | ||
(:use [backtype.storm.daemon common]) | ||
(:gen-class | ||
|
@@ -43,6 +44,8 @@ | |
scheduler | ||
)) | ||
|
||
(defmulti mk-bt-tracker cluster-mode) | ||
|
||
(defn nimbus-data [conf inimbus] | ||
(let [forced-scheduler (.getForcedScheduler inimbus)] | ||
{:conf conf | ||
|
@@ -60,6 +63,7 @@ | |
(halt-process! 20 "Error when processing an event") | ||
)) | ||
:scheduler (mk-scheduler conf inimbus) | ||
:bt-tracker (mk-bt-tracker conf) | ||
})) | ||
|
||
(defn inbox [nimbus] | ||
|
@@ -290,13 +294,14 @@ | |
;; need to somehow maintain stream/component ids inside tuples | ||
topology) | ||
|
||
(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology] | ||
(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology] | ||
(let [stormroot (master-stormdist-root conf storm-id)] | ||
(FileUtils/forceMkdir (File. stormroot)) | ||
(FileUtils/cleanDirectory (File. stormroot)) | ||
(setup-jar conf tmp-jar-location stormroot) | ||
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) | ||
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) | ||
(if (:bt-tracker nimbus) (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id)) | ||
)) | ||
|
||
(defn- read-storm-topology [conf storm-id] | ||
|
@@ -812,6 +817,7 @@ | |
(when-not (empty? to-cleanup-ids) | ||
(doseq [id to-cleanup-ids] | ||
(log-message "Cleaning up " id) | ||
(if (:bt-tracker nimbus) (.stop (:bt-tracker nimbus) id)) | ||
(.teardown-heartbeats! storm-cluster-state id) | ||
(.teardown-topology-errors! storm-cluster-state id) | ||
(rmr (master-stormdist-root conf id)) | ||
|
@@ -928,7 +934,7 @@ | |
;; lock protects against multiple topologies being submitted at once and | ||
;; cleanup thread killing topology in b/w assignment and starting the topology | ||
(locking (:submit-lock nimbus) | ||
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) | ||
(setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology) | ||
(.setup-heartbeats! storm-cluster-state storm-id) | ||
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive | ||
TopologyInitialStatus/ACTIVE :active}] | ||
|
@@ -1151,12 +1157,21 @@ | |
(FileUtils/copyFile src-file (File. (master-stormjar-path stormroot))) | ||
)) | ||
|
||
(defmethod mk-bt-tracker :distributed [conf] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we require the use of bit-torrent distribution? It will be better if we could make it optional. For simple topology, one may just prefer not to use BT at all. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it is required. There was some discussion around making the distribution mechanism pluggable, but Nathan argued against it, as making it pluggable would increase complexity (#629 (comment)). |
||
(NimbusTracker. conf) | ||
) | ||
|
||
|
||
;; local implementation | ||
|
||
;; Local-mode setup-jar: accepts conf plus any further args, does nothing, and returns nil. | ||
(defmethod setup-jar :local [conf & args] | ||
nil | ||
) | ||
|
||
;; Local-mode mk-bt-tracker: no tracker is created, so the :bt-tracker entry is nil. | ||
;; The call sites visible in this diff guard with (if (:bt-tracker nimbus) ...) before using it. | ||
(defmethod mk-bt-tracker :local [conf] | ||
nil | ||
) | ||
|
||
;; Reads the storm config via read-storm-config and passes it, with the given nimbus, to launch-server!. | ||
(defn -launch [nimbus] | ||
(launch-server! (read-storm-config) nimbus)) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
(ns backtype.storm.daemon.supervisor | ||
(:import [backtype.storm.scheduler ISupervisor]) | ||
(:import [backtype.storm.torrent SupervisorTracker]) | ||
(:use [backtype.storm bootstrap]) | ||
(:use [backtype.storm.daemon common]) | ||
(:require [backtype.storm.daemon [worker :as worker]]) | ||
|
@@ -9,6 +10,7 @@ | |
(bootstrap) | ||
|
||
(defmulti download-storm-code cluster-mode) | ||
(defmulti mk-bt-tracker cluster-mode) | ||
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor)))) | ||
|
||
;; used as part of a map from port to this | ||
|
@@ -179,6 +181,7 @@ | |
(log-error t "Error when processing event") | ||
(halt-process! 20 "Error when processing an event") | ||
)) | ||
:bt-tracker (mk-bt-tracker conf) | ||
}) | ||
|
||
(defn sync-processes [supervisor] | ||
|
@@ -217,6 +220,9 @@ | |
". Current supervisor time: " now | ||
". State: " state | ||
", Heartbeat: " (pr-str heartbeat)) | ||
(if (:bt-tracker supervisor) | ||
(.stop (:bt-tracker supervisor) (:storm-id heartbeat)) | ||
) | ||
(shutdown-worker supervisor id) | ||
)) | ||
(doseq [id (vals new-worker-ids)] | ||
|
@@ -287,7 +293,7 @@ | |
storm-id | ||
" from " | ||
master-code-dir) | ||
(download-storm-code conf storm-id master-code-dir) | ||
(download-storm-code conf storm-id master-code-dir supervisor) | ||
(log-message "Finished downloading code for storm id " | ||
storm-id | ||
" from " | ||
|
@@ -387,26 +393,27 @@ | |
;; distributed implementation | ||
|
||
(defmethod download-storm-code | ||
:distributed [conf storm-id master-code-dir] | ||
:distributed [conf storm-id master-code-dir supervisor] | ||
;; Downloading to permanent location is atomic | ||
(let [tmproot (str (supervisor-tmp-dir conf) "/" (uuid)) | ||
stormroot (supervisor-stormdist-root conf storm-id)] | ||
(FileUtils/forceMkdir (File. tmproot)) | ||
|
||
(Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot)) | ||
(Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot)) | ||
(Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot)) | ||
(extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) | ||
(FileUtils/moveDirectory (File. tmproot) (File. stormroot)) | ||
(FileUtils/forceMkdir (File. (supervisor-stormdist-root conf))) | ||
(Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir storm-id) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we still use Utils/downloadFromMaster? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To download the .torrent file. Nimbus will generate the .torrent file, then supervisors need to download it. |
||
(.download (:bt-tracker supervisor) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id) storm-id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this code break if BT tracker is not configured (ex. local mode)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No it shouldn't. That's the |
||
(extract-dir-from-jar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) RESOURCES-SUBDIR (supervisor-stormdist-root conf)) | ||
)) | ||
|
||
;; Distributed-mode mk-bt-tracker for the supervisor: constructs a SupervisorTracker from conf. | ||
(defmethod mk-bt-tracker | ||
:distributed [conf] | ||
(SupervisorTracker. conf) | ||
) | ||
|
||
(defmethod launch-worker | ||
:distributed [supervisor storm-id port worker-id] | ||
(let [conf (:conf supervisor) | ||
storm-home (System/getProperty "storm.home") | ||
stormroot (supervisor-stormdist-root conf storm-id) | ||
stormjar (supervisor-stormjar-path stormroot) | ||
stormjar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) | ||
storm-conf (read-supervisor-storm-conf conf storm-id) | ||
classpath (add-to-classpath (current-classpath) [stormjar]) | ||
childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) | ||
|
@@ -437,7 +444,7 @@ | |
first )) | ||
|
||
(defmethod download-storm-code | ||
:local [conf storm-id master-code-dir] | ||
:local [conf storm-id master-code-dir supervisor] | ||
(let [stormroot (supervisor-stormdist-root conf storm-id)] | ||
(FileUtils/copyDirectory (File. master-code-dir) (File. stormroot)) | ||
(let [classloader (.getContextClassLoader (Thread/currentThread)) | ||
|
@@ -456,6 +463,11 @@ | |
)) | ||
))) | ||
|
||
;; Local-mode mk-bt-tracker for the supervisor: returns nil, i.e. no tracker object is created. | ||
(defmethod mk-bt-tracker | ||
:local [conf] | ||
nil | ||
) | ||
|
||
(defmethod launch-worker | ||
:local [supervisor storm-id port worker-id] | ||
(let [conf (:conf supervisor) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package backtype.storm.torrent; | ||
|
||
import java.util.HashMap; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import com.turn.ttorrent.client.Client; | ||
|
||
public abstract class BaseTracker { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure that you want to call it BaseTracker. NimbusTracker seems to make sense, but SupervisorTracker does not. From the BT point of view, we only have one tracker, located on the Nimbus box. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'll admit that the "Tracker" name is a bit overloaded here, and I'm not tied to any naming convention. I re-used the "Tracker" name because both the SupervisorTracker and NimbusTracker classes have to "keep track of" BT clients. I'm perfectly open to renaming those classes. |
||
private static final Logger LOG = LoggerFactory.getLogger(BaseTracker.class); | ||
|
||
protected HashMap<String, Client> clients = new HashMap<String, Client>(); | ||
protected Double maxDownload; | ||
protected Double maxUpload; | ||
|
||
// Divides the configured download/upload caps evenly across every client in `clients` | ||
// and applies the resulting per-client rates. Synchronized on this tracker instance. | ||
// A cap <= 0.0 is passed through undivided; format() renders such values as "UNLIMITED". | ||
protected synchronized void rebalanceRates(){ | ||
int clientCount = this.clients.size(); | ||
// Skip entirely when no clients are registered (also avoids dividing by zero). | ||
if(clientCount > 0){ | ||
// NOTE(review): maxDownload/maxUpload are boxed Doubles with no initializer in this class; | ||
// unboxing them here would throw NullPointerException if they were never assigned -- confirm subclasses set both. | ||
double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; | ||
double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount; | ||
LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); | ||
LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); | ||
// Apply the same per-client limits to every tracked client. | ||
for(Client client : this.clients.values()) { | ||
client.setMaxDownloadRate(maxDl); | ||
client.setMaxUploadRate(maxUl); | ||
} | ||
} | ||
} | ||
|
||
// Renders a rate for log output: values <= 0.0 become "UNLIMITED"; any other value is formatted with two decimal places. | ||
protected static String format(double val){ | ||
return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0.0 means no limit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, after reading the comments in `Config.java`, I know that `0.0` means `unlimited`. Do we want to give users a more practical default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seemed to me that throttling would be enabled for special cases -- big cluster, large topology, etc. I would imagine for most cases we would not want to throttle.