Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace jar distribution strategy with bittorrent #629

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ storm.messaging.transport: "backtype.storm.messaging.zmq"
### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.bittorrent.port: 6969
nimbus.bittorrent.bind.address: 0.0.0.0
nimbus.bittorrent.max.upload.rate: 0.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.0 means no limit?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, after read the comments in Config.java, I know that 0.0 means unlimited, do we want to give user a more practical default?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seemed to me that throttling would be enabled for special cases -- big cluster, large topology, etc. I would imagine for most cases we would not want to throttle.

nimbus.bittorrent.max.download.rate: 0.0
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
Expand Down Expand Up @@ -71,6 +75,10 @@ supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true

supervisor.bittorrent.max.upload.rate: 0.0
supervisor.bittorrent.max.download.rate: 0.0
supervisor.bittorrent.seed.duration: 0

### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1
Expand Down
1 change: 1 addition & 0 deletions storm-core/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
[com.google.guava/guava "13.0"]
[ch.qos.logback/logback-classic "1.0.6"]
[org.slf4j/log4j-over-slf4j "1.6.6"]
[com.turn/ttorrent "1.2"]
]

:source-paths ["src/clj"]
Expand Down
10 changes: 8 additions & 2 deletions storm-core/src/clj/backtype/storm/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@
([conf storm-id]
(str (master-stormdist-root conf) "/" storm-id)))

(defn master-stormtorrent-path [stormroot storm-id]
(str stormroot "/" storm-id ".torrent"))

(defn master-stormjar-path [stormroot]
(str stormroot "/stormjar.jar"))

Expand Down Expand Up @@ -150,8 +153,11 @@
([conf storm-id]
(str (supervisor-stormdist-root conf) "/" (java.net.URLEncoder/encode storm-id))))

(defn supervisor-stormjar-path [stormroot]
(str stormroot "/stormjar.jar"))
(defn supervisor-stormtorrent-path [stormroot storm-id]
(str stormroot "/" storm-id ".torrent"))

(defn supervisor-stormjar-path [stormroot storm-id]
(str stormroot "/" storm-id "/stormjar.jar"))

(defn supervisor-stormcode-path [stormroot]
(str stormroot "/stormcode.ser"))
Expand Down
19 changes: 17 additions & 2 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
(:import [backtype.storm.torrent NimbusTracker])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.daemon common])
(:gen-class
Expand Down Expand Up @@ -43,6 +44,8 @@
scheduler
))

(defmulti mk-bt-tracker cluster-mode)

(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
Expand All @@ -60,6 +63,7 @@
(halt-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
:bt-tracker (mk-bt-tracker conf)
}))

(defn inbox [nimbus]
Expand Down Expand Up @@ -290,13 +294,14 @@
;; need to somehow maintain stream/component ids inside tuples
topology)

(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology]
(let [stormroot (master-stormdist-root conf storm-id)]
(FileUtils/forceMkdir (File. stormroot))
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
(if (:bt-tracker nimbus) (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id))
))

(defn- read-storm-topology [conf storm-id]
Expand Down Expand Up @@ -812,6 +817,7 @@
(when-not (empty? to-cleanup-ids)
(doseq [id to-cleanup-ids]
(log-message "Cleaning up " id)
(if (:bt-tracker nimbus) (.stop (:bt-tracker nimbus) id))
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-topology-errors! storm-cluster-state id)
(rmr (master-stormdist-root conf id))
Expand Down Expand Up @@ -928,7 +934,7 @@
;; lock protects against multiple topologies being submitted at once and
;; cleanup thread killing topology in b/w assignment and starting the topology
(locking (:submit-lock nimbus)
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
(setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology)
(.setup-heartbeats! storm-cluster-state storm-id)
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
TopologyInitialStatus/ACTIVE :active}]
Expand Down Expand Up @@ -1151,12 +1157,21 @@
(FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
))

(defmethod mk-bt-tracker :distributed [conf]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we require the use of bit-torrent distribution? It will be better if we could make it optional. For simple topology, one may just prefer not to use BT at all.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is required. There was some discussion around making the distribution mechanism pluggable, but Nathan argued against it, as making it pluggable would increase complexity (#629 (comment)).

(NimbusTracker. conf)
)


;; local implementation

(defmethod setup-jar :local [conf & args]
nil
)

(defmethod mk-bt-tracker :local [conf]
nil
)

(defn -launch [nimbus]
(launch-server! (read-storm-config) nimbus))

Expand Down
34 changes: 23 additions & 11 deletions storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns backtype.storm.daemon.supervisor
(:import [backtype.storm.scheduler ISupervisor])
(:import [backtype.storm.torrent SupervisorTracker])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]])
Expand All @@ -9,6 +10,7 @@
(bootstrap)

(defmulti download-storm-code cluster-mode)
(defmulti mk-bt-tracker cluster-mode)
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))

;; used as part of a map from port to this
Expand Down Expand Up @@ -179,6 +181,7 @@
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
))
:bt-tracker (mk-bt-tracker conf)
})

(defn sync-processes [supervisor]
Expand Down Expand Up @@ -217,6 +220,9 @@
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))
(if (:bt-tracker supervisor)
(.stop (:bt-tracker supervisor) (:storm-id heartbeat))
)
(shutdown-worker supervisor id)
))
(doseq [id (vals new-worker-ids)]
Expand Down Expand Up @@ -287,7 +293,7 @@
storm-id
" from "
master-code-dir)
(download-storm-code conf storm-id master-code-dir)
(download-storm-code conf storm-id master-code-dir supervisor)
(log-message "Finished downloading code for storm id "
storm-id
" from "
Expand Down Expand Up @@ -387,26 +393,27 @@
;; distributed implementation

(defmethod download-storm-code
:distributed [conf storm-id master-code-dir]
:distributed [conf storm-id master-code-dir supervisor]
;; Downloading to permanent location is atomic
(let [tmproot (str (supervisor-tmp-dir conf) "/" (uuid))
stormroot (supervisor-stormdist-root conf storm-id)]
(FileUtils/forceMkdir (File. tmproot))

(Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
(Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
(Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
(extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
(FileUtils/forceMkdir (File. (supervisor-stormdist-root conf)))
(Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir storm-id) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we still use Utils/downloadFromMaster?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To download the .torrent file.

Nimbus will generate the .torrent file, then supervisors need to download it.

(.download (:bt-tracker supervisor) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id) storm-id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this code break if BT tracker is not configured (ex. local mode)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it shouldn't. That's the :distributed implementation of download-storm-code. The :local implementation (below) does not use the BT tracker.

(extract-dir-from-jar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) RESOURCES-SUBDIR (supervisor-stormdist-root conf))
))

(defmethod mk-bt-tracker
:distributed [conf]
(SupervisorTracker. conf)
)

(defmethod launch-worker
:distributed [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
storm-home (System/getProperty "storm.home")
stormroot (supervisor-stormdist-root conf storm-id)
stormjar (supervisor-stormjar-path stormroot)
stormjar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id)
storm-conf (read-supervisor-storm-conf conf storm-id)
classpath (add-to-classpath (current-classpath) [stormjar])
childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
Expand Down Expand Up @@ -437,7 +444,7 @@
first ))

(defmethod download-storm-code
:local [conf storm-id master-code-dir]
:local [conf storm-id master-code-dir supervisor]
(let [stormroot (supervisor-stormdist-root conf storm-id)]
(FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
(let [classloader (.getContextClassLoader (Thread/currentThread))
Expand All @@ -456,6 +463,11 @@
))
)))

(defmethod mk-bt-tracker
:local [conf]
nil
)

(defmethod launch-worker
:local [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
Expand Down
47 changes: 47 additions & 0 deletions storm-core/src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,33 @@ public class Config extends HashMap<String, Object> {
public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class;

/**
* Which port the Nimbus BitTorrent tracker should bind to.
*/
public static final String NIMBUS_BITTORRENT_PORT = "nimbus.bittorrent.port";
public static final Object NIMBUS_BITTORRENT_PORT_SCHEMA = Number.class;

/**
* Which network interface the BitTorrent tracker should listen on. The
* default of 0.0.0.0 will listen on all interfaces. This can be an IP
* address or a hostname.
*/
public static final String NIMBUS_BITTORRENT_BIND_ADDRESS = "nimbus.bittorrent.bind.address";
public static final Object NIMBUS_BITTORRENT_BIND_ADDRESS_SCHEMA = String.class;

/**
* Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
*/
public static final String NIMBUS_BITTORRENT_MAX_UPLOAD_RATE = "nimbus.bittorrent.max.upload.rate";
public static final Object NIMBUS_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class;

/**
* Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
*/
public static final String NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE = "nimbus.bittorrent.max.download.rate";
public static final Object NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class;


/**
* This parameter is used by the storm-deploy project to configure the
* jvm options for the nimbus daemon.
Expand Down Expand Up @@ -412,6 +438,27 @@ public class Config extends HashMap<String, Object> {
*/
public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;

/**
* Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
*/
public static final String SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE = "supervisor.bittorrent.max.upload.rate";
public static final Object SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class;

/**
* Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
*/
public static final String SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE = "supervisor.bittorrent.max.download.rate";
public static final Object SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class;

/**
* Time in seconds that a supervisor should seed after completing a topology torrent download.
* A value of 0 will disable seeding (download only). A value of -1 indicates that the supervisor
* should seed indefinitely (until the topology is killed).
*/
public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = "supervisor.bittorrent.seed.duration";
public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = Number.class;


/**
* The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
Expand Down
34 changes: 34 additions & 0 deletions storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package backtype.storm.torrent;

import java.util.HashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.turn.ttorrent.client.Client;

public abstract class BaseTracker {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that you want to call it as BaseTracker. NimbusTracker seems to make sense, but SupervisorTracker is not. From BT point of view, we only have 1 tracker located on Nimbus box.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll admit that the "Tracker" name is a bit overloaded here, and I'm not tied to any naming convention.

I re-used the "Tracker" name because both the SupervisorTracker and NimbusTracker classes have to "keep track of" BT clients.

I'm perfectly open to renaming those classes.

private static final Logger LOG = LoggerFactory.getLogger(BaseTracker.class);

protected HashMap<String, Client> clients = new HashMap<String, Client>();
protected Double maxDownload;
protected Double maxUpload;

protected synchronized void rebalanceRates(){
int clientCount = this.clients.size();
if(clientCount > 0){
double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
for(Client client : this.clients.values()) {
client.setMaxDownloadRate(maxDl);
client.setMaxUploadRate(maxUl);
}
}
}

protected static String format(double val){
return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
}
}
Loading