diff --git a/conf/defaults.yaml b/conf/defaults.yaml index b17e445a2..882d9c085 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -23,6 +23,10 @@ storm.messaging.transport: "backtype.storm.messaging.zmq" ### nimbus.* configs are for the master nimbus.host: "localhost" nimbus.thrift.port: 6627 +nimbus.bittorrent.port: 6969 +nimbus.bittorrent.bind.address: 0.0.0.0 +nimbus.bittorrent.max.upload.rate: 0.0 +nimbus.bittorrent.max.download.rate: 0.0 nimbus.childopts: "-Xmx1024m" nimbus.task.timeout.secs: 30 nimbus.supervisor.timeout.secs: 60 @@ -71,6 +75,10 @@ supervisor.monitor.frequency.secs: 3 supervisor.heartbeat.frequency.secs: 5 supervisor.enable: true +supervisor.bittorrent.max.upload.rate: 0.0 +supervisor.bittorrent.max.download.rate: 0.0 +supervisor.bittorrent.seed.duration: 0 + ### worker.* configs are for task workers worker.childopts: "-Xmx768m" worker.heartbeat.frequency.secs: 1 diff --git a/storm-core/project.clj b/storm-core/project.clj index 9e356bc74..d51a047e6 100644 --- a/storm-core/project.clj +++ b/storm-core/project.clj @@ -27,6 +27,8 @@ [com.google.guava/guava "13.0"] [ch.qos.logback/logback-classic "1.0.6"] [org.slf4j/log4j-over-slf4j "1.6.6"] + [com.turn/ttorrent "1.2" + :exclusions[org.slf4j/slf4j-log4j12]] ] :source-paths ["src/clj"] diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index 32f9141a9..4b0bee6da 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -119,6 +119,9 @@ ([conf storm-id] (str (master-stormdist-root conf) "/" storm-id))) +(defn master-stormtorrent-path [stormroot storm-id] + (str stormroot "/" storm-id ".torrent")) + (defn master-stormjar-path [stormroot] (str stormroot "/stormjar.jar")) @@ -150,8 +153,11 @@ ([conf storm-id] (str (supervisor-stormdist-root conf) "/" (java.net.URLEncoder/encode storm-id)))) -(defn supervisor-stormjar-path [stormroot] - (str stormroot "/stormjar.jar")) +(defn supervisor-stormtorrent-path [stormroot storm-id] + (str stormroot "/" storm-id ".torrent")) + +(defn supervisor-stormjar-path [stormroot storm-id] + (str stormroot "/" storm-id "/stormjar.jar")) (defn supervisor-stormcode-path [stormroot] (str stormroot "/stormcode.ser")) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e58aeedd0..ce5fc30d0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -9,6 +9,7 @@ (:use [backtype.storm.scheduler.DefaultScheduler]) (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) + (:import [backtype.storm.torrent NimbusTracker]) (:use [backtype.storm bootstrap util]) (:use [backtype.storm.daemon common]) (:gen-class @@ -43,6 +44,8 @@ scheduler )) +(defmulti mk-bt-tracker cluster-mode) + (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf @@ -60,6 +63,7 @@ (halt-process! 20 "Error when processing an event") )) :scheduler (mk-scheduler conf inimbus) + :bt-tracker (mk-bt-tracker conf) })) (defn inbox [nimbus] @@ -290,13 +294,14 @@ ;; need to somehow maintain stream/component ids inside tuples topology) -(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology] +(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology] (let [stormroot (master-stormdist-root conf storm-id)] (FileUtils/forceMkdir (File. stormroot)) (FileUtils/cleanDirectory (File. stormroot)) (setup-jar conf tmp-jar-location stormroot) (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) + (if (:bt-tracker nimbus) (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id)) )) (defn- read-storm-topology [conf storm-id] @@ -812,6 +817,7 @@ (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) + (if (:bt-tracker nimbus) (.stop (:bt-tracker nimbus) id)) (.teardown-heartbeats! storm-cluster-state id) (.teardown-topology-errors! storm-cluster-state id) (rmr (master-stormdist-root conf id)) @@ -928,7 +934,7 @@ ;; lock protects against multiple topologies being submitted at once and ;; cleanup thread killing topology in b/w assignment and starting the topology (locking (:submit-lock nimbus) - (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) + (setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology) (.setup-heartbeats! storm-cluster-state storm-id) (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] @@ -1151,12 +1157,21 @@ (FileUtils/copyFile src-file (File. (master-stormjar-path stormroot))) )) +(defmethod mk-bt-tracker :distributed [conf] + (NimbusTracker. conf) + ) + + ;; local implementation (defmethod setup-jar :local [conf & args] nil ) +(defmethod mk-bt-tracker :local [conf] + nil + ) + (defn -launch [nimbus] (launch-server! (read-storm-config) nimbus)) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 150443165..210562f6b 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -1,5 +1,6 @@ (ns backtype.storm.daemon.supervisor (:import [backtype.storm.scheduler ISupervisor]) + (:import [backtype.storm.torrent SupervisorPeer]) (:use [backtype.storm bootstrap]) (:use [backtype.storm.daemon common]) (:require [backtype.storm.daemon [worker :as worker]]) @@ -9,6 +10,7 @@ (bootstrap) (defmulti download-storm-code cluster-mode) +(defmulti mk-bt-tracker cluster-mode) (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor)))) ;; used as part of a map from port to this @@ -179,6 +181,7 @@ (log-error t "Error when processing event") (halt-process! 20 "Error when processing an event") )) + :bt-tracker (mk-bt-tracker conf) }) (defn sync-processes [supervisor] @@ -217,6 +220,9 @@ ". Current supervisor time: " now ". State: " state ", Heartbeat: " (pr-str heartbeat)) + (if (:bt-tracker supervisor) + (.stop (:bt-tracker supervisor) (:storm-id heartbeat)) + ) (shutdown-worker supervisor id) )) (doseq [id (vals new-worker-ids)] @@ -287,7 +293,7 @@ storm-id " from " master-code-dir) - (download-storm-code conf storm-id master-code-dir) + (download-storm-code conf storm-id master-code-dir supervisor) (log-message "Finished downloading code for storm id " storm-id " from " @@ -387,26 +393,27 @@ ;; distributed implementation (defmethod download-storm-code - :distributed [conf storm-id master-code-dir] + :distributed [conf storm-id master-code-dir supervisor] ;; Downloading to permanent location is atomic (let [tmproot (str (supervisor-tmp-dir conf) "/" (uuid)) stormroot (supervisor-stormdist-root conf storm-id)] - (FileUtils/forceMkdir (File. tmproot)) - - (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot)) - (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot)) - (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot)) - (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) - (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) + (FileUtils/forceMkdir (File. (supervisor-stormdist-root conf))) + (Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir storm-id) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id)) + (.download (:bt-tracker supervisor) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id) storm-id) + (extract-dir-from-jar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) RESOURCES-SUBDIR (supervisor-stormdist-root conf)) )) +(defmethod mk-bt-tracker + :distributed [conf] + (SupervisorPeer. conf) + ) (defmethod launch-worker :distributed [supervisor storm-id port worker-id] (let [conf (:conf supervisor) storm-home (System/getProperty "storm.home") stormroot (supervisor-stormdist-root conf storm-id) - stormjar (supervisor-stormjar-path stormroot) + stormjar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) storm-conf (read-supervisor-storm-conf conf storm-id) classpath (add-to-classpath (current-classpath) [stormjar]) childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) @@ -437,7 +444,7 @@ first )) (defmethod download-storm-code - :local [conf storm-id master-code-dir] + :local [conf storm-id master-code-dir supervisor] (let [stormroot (supervisor-stormdist-root conf storm-id)] (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot)) (let [classloader (.getContextClassLoader (Thread/currentThread)) @@ -456,6 +463,11 @@ )) ))) +(defmethod mk-bt-tracker + :local [conf] + nil + ) + (defmethod launch-worker :local [supervisor storm-id port worker-id] (let [conf (:conf supervisor) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 4cc6e9c6b..bb3c13391 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -192,7 +192,33 @@ public class Config extends HashMap { public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port"; public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class; + /** + * Which port the Nimbus BitTorrent tracker should bind to. + */ + public static final String NIMBUS_BITTORRENT_PORT = "nimbus.bittorrent.port"; + public static final Object NIMBUS_BITTORRENT_PORT_SCHEMA = Number.class; + + /** + * Which network interface the BitTorrent tracker should listen on. The + * default of 0.0.0.0 will listen on all interfaces. This can be an IP + * address or a hostname. + */ + public static final String NIMBUS_BITTORRENT_BIND_ADDRESS = "nimbus.bittorrent.bind.address"; + public static final Object NIMBUS_BITTORRENT_BIND_ADDRESS_SCHEMA = String.class; + + /** + * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String NIMBUS_BITTORRENT_MAX_UPLOAD_RATE = "nimbus.bittorrent.max.upload.rate"; + public static final Object NIMBUS_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class; + + /** + * Max download rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE = "nimbus.bittorrent.max.download.rate"; + public static final Object NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class; + /** * This parameter is used by the storm-deploy project to configure the * jvm options for the nimbus daemon. @@ -412,6 +438,27 @@ public class Config extends HashMap { */ public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs"; public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class; + + /** + * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE = "supervisor.bittorrent.max.upload.rate"; + public static final Object SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class; + + /** + * Max download rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE = "supervisor.bittorrent.max.download.rate"; + public static final Object SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class; + + /** + * Time in seconds that a supervisor should seed after completing a topology torrent download. + * A value of 0 will disable seeding (download only). A value of -1 indicates that the supervisor + * should seed indefinitely (until the topology is killed). + */ + public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = "supervisor.bittorrent.seed.duration"; + public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = Number.class; + /** * The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced diff --git a/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java b/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java new file mode 100644 index 000000000..9b9391281 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java @@ -0,0 +1,34 @@ +package backtype.storm.torrent; + +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.turn.ttorrent.client.Client; + +public abstract class BasePeer { + private static final Logger LOG = LoggerFactory.getLogger(BasePeer.class); + + protected HashMap clients = new HashMap(); + protected Double maxDownload; + protected Double maxUpload; + + protected synchronized void rebalanceRates(){ + int clientCount = this.clients.size(); + if(clientCount > 0){ + double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; + double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount; + LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); + LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); + for(Client client : this.clients.values()) { + client.setMaxDownloadRate(maxDl); + client.setMaxUploadRate(maxUl); + } + } + } + + protected static String format(double val){ + return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); + } +} diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java new file mode 100644 index 000000000..0283dea28 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java @@ -0,0 +1,86 @@ +package backtype.storm.torrent; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; + +import com.turn.ttorrent.client.Client; +import com.turn.ttorrent.client.SharedTorrent; +import com.turn.ttorrent.common.Torrent; +import com.turn.ttorrent.tracker.TrackedTorrent; +import com.turn.ttorrent.tracker.Tracker; + +public class NimbusTracker extends BasePeer { + private static final Logger LOG = LoggerFactory.getLogger(NimbusTracker.class); + private Tracker tracker; + private InetAddress nimbusHost; + private String hostName; + private Integer port; + + public NimbusTracker (Map conf) throws IOException{ + this.hostName = (String)conf.get(Config.NIMBUS_HOST); + this.port = (Integer)conf.get(Config.NIMBUS_BITTORRENT_PORT); + this.maxDownload = (Double)conf.get(Config.NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE); + this.maxUpload = (Double)conf.get(Config.NIMBUS_BITTORRENT_MAX_UPLOAD_RATE); + LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload)); + + String bindAddress = (String)conf.get(Config.NIMBUS_BITTORRENT_BIND_ADDRESS); + + LOG.info("Starting bt tracker bound to interface '{}'", bindAddress); + this.nimbusHost = InetAddress.getByName(this.hostName); + InetSocketAddress socketAddr = new InetSocketAddress(bindAddress, port); + this.tracker = new Tracker(socketAddr); + LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl()); + this.tracker.start(); + } + + public void stop(String topologyId){ + LOG.info("Stop seeding/tracking for topology {}", topologyId); + Client client = this.clients.remove(topologyId); + if(client != null){ + Torrent torrent = client.getTorrent(); + client.stop(); + this.tracker.remove(torrent); + } + rebalanceRates(); + } + + public void trackAndSeed(String dir, String topologyId) throws IOException, NoSuchAlgorithmException, InterruptedException, URISyntaxException{ + + File destDir = new File(dir); + LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath()); + + URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); + LOG.info("Creating torrent with announce URL: {}", uri); + ArrayList files = new ArrayList(); + files.add(new File(destDir, "stormjar.jar")); + files.add(new File(destDir, "stormconf.ser")); + files.add(new File(destDir, "stormcode.ser")); + + Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus"); + File torrentFile = new File(destDir, topologyId + ".torrent"); + torrent.save(new FileOutputStream(torrentFile)); + LOG.info("Saved torrent: {}" + torrentFile.getAbsolutePath()); + this.tracker.announce(new TrackedTorrent(torrent)); + LOG.info("Torrent announced to tracker."); + Client client = new Client(this.nimbusHost, new SharedTorrent(torrent, destDir.getParentFile(), true)); + this.clients.put(topologyId, client); + rebalanceRates(); + client.share(); + LOG.info("Seeding torrent..."); + } + + +} diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java new file mode 100644 index 000000000..6c5820ac5 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java @@ -0,0 +1,67 @@ +package backtype.storm.torrent; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; + +import com.turn.ttorrent.client.Client; +import com.turn.ttorrent.client.Client.ClientState; +import com.turn.ttorrent.client.SharedTorrent; + +public class SupervisorPeer extends BasePeer{ + private static final Logger LOG = LoggerFactory.getLogger(SupervisorPeer.class); + + private Integer seedDuration; + + public SupervisorPeer(Map conf){ + LOG.info("Creating supervisor bt tracker."); + this.maxDownload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE); + this.maxUpload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE); + this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); + LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload)); + } + + public void stop(String topologyId){ + LOG.info("Stopping bt client for topology {}", topologyId); + Client client = this.clients.remove(topologyId); + if(client != null){ + client.stop(); + } + rebalanceRates(); + } + + public void download(String torrentPath, String topologyId) throws IOException, NoSuchAlgorithmException{ + LOG.info("Initiating BitTorrent download."); + InetAddress netAddr = InetAddress.getLocalHost(); + File torrentFile = new File(torrentPath); + File destDir = torrentFile.getParentFile(); + LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); + LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); + SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); + + Client client = new Client(netAddr, st); + this.clients.put(topologyId, client); + rebalanceRates(); + client.share(this.seedDuration); + if(this.seedDuration == 0){ + client.waitForCompletion(); + } else { + LOG.info("Waiting for seeding to begin..."); + while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){ + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + LOG.info("BitTorrent download complete."); + } + +}