From 567f72a6a2c68b5014704fb16f3d9a8ed19e54fa Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 17 Jul 2013 16:42:19 -0400 Subject: [PATCH 01/17] add ttorrent BitTorrent client dependency to storm core --- storm-core/project.clj | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/storm-core/project.clj b/storm-core/project.clj index 9e356bc74..bf3450a62 100644 --- a/storm-core/project.clj +++ b/storm-core/project.clj @@ -1,5 +1,5 @@ (def ROOT-DIR (subs *file* 0 (- (count *file*) (count "project.clj")))) -(def VERSION (-> ROOT-DIR (str "/../VERSION") slurp (.trim))) +(def VERSION (-> ROOT-DIR (str "/../VERSION") slurp)) (defproject storm/storm-core VERSION :dependencies [[org.clojure/clojure "1.4.0"] @@ -19,7 +19,7 @@ [org.clojure/tools.logging "0.2.3"] [org.clojure/math.numeric-tower "0.0.1"] [storm/carbonite "1.5.0"] - [org.yaml/snakeyaml "1.11"] + [org.yaml/snakeyaml "1.9"] [org.apache.httpcomponents/httpclient "4.1.1"] [storm/tools.cli "0.2.2"] [com.googlecode.disruptor/disruptor "2.10.1"] @@ -27,6 +27,7 @@ [com.google.guava/guava "13.0"] [ch.qos.logback/logback-classic "1.0.6"] [org.slf4j/log4j-over-slf4j "1.6.6"] + [com.turn/ttorrent "1.2"] ] :source-paths ["src/clj"] From abe3414d0fe16a2ad5ab465be480821ca9ef3ff8 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 17 Jul 2013 16:43:54 -0400 Subject: [PATCH 02/17] Add bittorrent tracker and client code for use by nimbus and supervisor --- storm-core/src/jvm/backtype/storm/Config.java | 15 +++- .../backtype/storm/torrent/NimbusTracker.java | 76 +++++++++++++++++++ .../storm/torrent/SupervisorClient.java | 40 ++++++++++ 3 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java create mode 100644 storm-core/src/jvm/backtype/storm/torrent/SupervisorClient.java diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 4cc6e9c6b..b8a8082e1 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -192,7 +192,20 @@ public class Config extends HashMap { public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port"; public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class; - + /** + * Which port the Nimbus BitTorrent tracker should bind to. + */ + public static final String NIMBUS_BITTORRENT_PORT = "nimbus.bittorrent.port"; + public static final Object NIMBUS_BITTORRENT_PORT_SCHEMA = Number.class; + + /** + * Which network interface the BitTorrent tracker should listen on. The + * default of 0.0.0.0 will listen on all interfaces. This can be an IP + * address or a hostname. + */ + public static final String NIMBUS_BITTORRENT_BIND_ADDRESS = "nimbus.bittorrent.bind.address"; + public static final Object NIMBUS_BITTORRENT_BIND_ADDRESS_SCHEMA = String.class; + /** * This parameter is used by the storm-deploy project to configure the * jvm options for the nimbus daemon. diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java new file mode 100644 index 000000000..ee6235778 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java @@ -0,0 +1,76 @@ +package backtype.storm.torrent; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; + +import com.turn.ttorrent.client.Client; +import com.turn.ttorrent.client.SharedTorrent; +import com.turn.ttorrent.common.Torrent; +import com.turn.ttorrent.tracker.TrackedTorrent; +import com.turn.ttorrent.tracker.Tracker; + +public class NimbusTracker { + private static final Logger LOG = LoggerFactory.getLogger(NimbusTracker.class); + private Tracker tracker; + private InetAddress nimbusHost; + private HashMap clients = new HashMap(); + private String hostName; + private Integer port; + + public void stop(String topologyId){ + LOG.info("Stop seeding/tracking for topology {}", topologyId); + Client client = this.clients.remove(topologyId); + if(client != null){ + Torrent torrent = client.getTorrent(); + client.stop(); + this.tracker.remove(torrent); + } + } + + public void trackAndSeed(String fileName, String topologyId) throws IOException, NoSuchAlgorithmException, InterruptedException, URISyntaxException{ + File jarFile = new File(fileName); + File destDir = jarFile.getParentFile(); + LOG.info("Generating torrent for file: {}", jarFile.getAbsolutePath()); + + URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); + LOG.info("Creating torrent with annound URL: {}", uri); + Torrent torrent = Torrent.create(jarFile, uri, "storm-nimbus"); + File torrentFile = new File(destDir, "stormjar.torrent"); + torrent.save(new FileOutputStream(torrentFile)); + LOG.info("Saved torrent: {}" + torrentFile.getAbsolutePath()); + this.tracker.announce(new TrackedTorrent(torrent)); + LOG.info("Torrent announced to tracker."); + Client client = new Client(this.nimbusHost, new SharedTorrent(torrent, destDir, true)); + client.share(); + this.clients.put(topologyId, client); + LOG.info("Seeding torrent..."); + } + + + public NimbusTracker (Map conf) throws IOException{ + this.hostName = (String)conf.get(Config.NIMBUS_HOST); + this.port = (Integer)conf.get(Config.NIMBUS_BITTORRENT_PORT); + String bindAddress = (String)conf.get(Config.NIMBUS_BITTORRENT_BIND_ADDRESS); + + LOG.info("Starting bt tracker bound to interface '{}'", bindAddress); + this.nimbusHost = InetAddress.getByName(this.hostName); + InetSocketAddress socketAddr = new InetSocketAddress(bindAddress, port); + this.tracker = new Tracker(socketAddr); + LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl()); + this.tracker.start(); + } + +} diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorClient.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorClient.java new file mode 100644 index 000000000..adce7dc77 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/SupervisorClient.java @@ -0,0 +1,40 @@ +package backtype.storm.torrent; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.NoSuchAlgorithmException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.turn.ttorrent.client.Client; +import com.turn.ttorrent.client.SharedTorrent; + +public class SupervisorClient { + private static final Logger LOG = LoggerFactory.getLogger(SupervisorClient.class); + private String torrentPath; + + public SupervisorClient(String torrentPath){ + this.torrentPath = torrentPath; + } + + public void download() throws IOException, NoSuchAlgorithmException{ + LOG.info("Initiating BitTorrent download."); + InetAddress netAddr = InetAddress.getLocalHost(); + File torrentFile = new File(this.torrentPath); + File destDir = torrentFile.getParentFile(); + + SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); + + Client client = new Client(netAddr, st); + client.download(); + client.waitForCompletion(); + client.stop(); + + LOG.info("BitTorrent download complete."); + } + + +} From dcb609a4230190eeb7eea2e3e8380728b06020c6 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 17 Jul 2013 16:45:39 -0400 Subject: [PATCH 03/17] change nimbus and supervisor to use bittorrent for jar file distribution. --- storm-core/src/clj/backtype/storm/config.clj | 6 ++++++ storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 6 ++++-- storm-core/src/clj/backtype/storm/daemon/supervisor.clj | 4 +++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index 32f9141a9..c989a0007 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -119,6 +119,9 @@ ([conf storm-id] (str (master-stormdist-root conf) "/" storm-id))) +(defn master-stormtorrent-path [stormroot] + (str stormroot "/stormjar.torrent")) + (defn master-stormjar-path [stormroot] (str stormroot "/stormjar.jar")) @@ -150,6 +153,9 @@ ([conf storm-id] (str (supervisor-stormdist-root conf) "/" (java.net.URLEncoder/encode storm-id)))) +(defn supervisor-stormtorrent-path [stormroot] + (str stormroot "/stormjar.torrent")) + (defn supervisor-stormjar-path [stormroot] (str stormroot "/stormjar.jar")) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e58aeedd0..fca9055b0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -290,11 +290,12 @@ ;; need to somehow maintain stream/component ids inside tuples topology) -(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology] +(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology] (let [stormroot (master-stormdist-root conf storm-id)] (FileUtils/forceMkdir (File. stormroot)) (FileUtils/cleanDirectory (File. stormroot)) (setup-jar conf tmp-jar-location stormroot) + (.trackAndSeed (:bt-tracker nimbus) (master-stormjar-path stormroot) storm-id) (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) )) @@ -812,6 +813,7 @@ (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) + (.stop (:bt-tracker nimbus) id) (.teardown-heartbeats! storm-cluster-state id) (.teardown-topology-errors! storm-cluster-state id) (rmr (master-stormdist-root conf id)) @@ -928,7 +930,7 @@ ;; lock protects against multiple topologies being submitted at once and ;; cleanup thread killing topology in b/w assignment and starting the topology (locking (:submit-lock nimbus) - (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) + (setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology) (.setup-heartbeats! storm-cluster-state storm-id) (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 150443165..4a71b681d 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -1,5 +1,6 @@ (ns backtype.storm.daemon.supervisor (:import [backtype.storm.scheduler ISupervisor]) + (:import [backtype.storm.torrent SupervisorClient]) (:use [backtype.storm bootstrap]) (:use [backtype.storm.daemon common]) (:require [backtype.storm.daemon [worker :as worker]]) @@ -393,7 +394,8 @@ stormroot (supervisor-stormdist-root conf storm-id)] (FileUtils/forceMkdir (File. tmproot)) - (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot)) + (Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir) (supervisor-stormtorrent-path tmproot)) + (.download (SupervisorClient. (supervisor-stormtorrent-path tmproot))) (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot)) (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot)) (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) From f70022d8bddcf4d6d823f3a2c1bc05fe457f07ca Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 17 Jul 2013 16:46:32 -0400 Subject: [PATCH 04/17] add bittorrent configuration defaults --- conf/defaults.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index b17e445a2..84115b483 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -23,6 +23,8 @@ storm.messaging.transport: "backtype.storm.messaging.zmq" ### nimbus.* configs are for the master nimbus.host: "localhost" nimbus.thrift.port: 6627 +nimbus.bittorrent.port: 6969 +nimbus.bittorrent.bind.address: 0.0.0.0 nimbus.childopts: "-Xmx1024m" nimbus.task.timeout.secs: 30 nimbus.supervisor.timeout.secs: 60 From 539cf33d27b976444fab990b40fedddfd61814fe Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 17 Jul 2013 17:00:27 -0400 Subject: [PATCH 05/17] revert some changes that got lost by a stash apply --- storm-core/project.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storm-core/project.clj b/storm-core/project.clj index bf3450a62..737acdbe2 100644 --- a/storm-core/project.clj +++ b/storm-core/project.clj @@ -1,5 +1,5 @@ (def ROOT-DIR (subs *file* 0 (- (count *file*) (count "project.clj")))) -(def VERSION (-> ROOT-DIR (str "/../VERSION") slurp)) +(def VERSION (-> ROOT-DIR (str "/../VERSION") slurp (.trim))) (defproject storm/storm-core VERSION :dependencies [[org.clojure/clojure "1.4.0"] @@ -19,7 +19,7 @@ [org.clojure/tools.logging "0.2.3"] [org.clojure/math.numeric-tower "0.0.1"] [storm/carbonite "1.5.0"] - [org.yaml/snakeyaml "1.9"] + [org.yaml/snakeyaml "1.11"] [org.apache.httpcomponents/httpclient "4.1.1"] [storm/tools.cli "0.2.2"] [com.googlecode.disruptor/disruptor "2.10.1"] From 4a76e4cee834d6a9a52437ceecacd905da91a694 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 24 Jul 2013 13:52:10 -0400 Subject: [PATCH 06/17] add configuration values for bittorrent rate limits and seeding --- conf/defaults.yaml | 6 ++++ storm-core/src/jvm/backtype/storm/Config.java | 34 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 84115b483..882d9c085 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -25,6 +25,8 @@ nimbus.host: "localhost" nimbus.thrift.port: 6627 nimbus.bittorrent.port: 6969 nimbus.bittorrent.bind.address: 0.0.0.0 +nimbus.bittorrent.max.upload.rate: 0.0 +nimbus.bittorrent.max.download.rate: 0.0 nimbus.childopts: "-Xmx1024m" nimbus.task.timeout.secs: 30 nimbus.supervisor.timeout.secs: 60 @@ -73,6 +75,10 @@ supervisor.monitor.frequency.secs: 3 supervisor.heartbeat.frequency.secs: 5 supervisor.enable: true +supervisor.bittorrent.max.upload.rate: 0.0 +supervisor.bittorrent.max.download.rate: 0.0 +supervisor.bittorrent.seed.duration: 0 + ### worker.* configs are for task workers worker.childopts: "-Xmx768m" worker.heartbeat.frequency.secs: 1 diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index b8a8082e1..bb3c13391 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -206,6 +206,19 @@ public class Config extends HashMap { public static final String NIMBUS_BITTORRENT_BIND_ADDRESS = "nimbus.bittorrent.bind.address"; public static final Object NIMBUS_BITTORRENT_BIND_ADDRESS_SCHEMA = String.class; + /** + * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String NIMBUS_BITTORRENT_MAX_UPLOAD_RATE = "nimbus.bittorrent.max.upload.rate"; + public static final Object NIMBUS_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class; + + /** + * Max download rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE = "nimbus.bittorrent.max.download.rate"; + public static final Object NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class; + + /** * This parameter is used by the storm-deploy project to configure the * jvm options for the nimbus daemon. @@ -425,6 +438,27 @@ public class Config extends HashMap { */ public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs"; public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class; + + /** + * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE = "supervisor.bittorrent.max.upload.rate"; + public static final Object SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class; + + /** + * Max download rate for topology torrents in kB/sec. 0.0 == unlimited. + */ + public static final String SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE = "supervisor.bittorrent.max.download.rate"; + public static final Object SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class; + + /** + * Time in seconds that a supervisor should seed after completing a topology torrent download. + * A value of 0 will disable seeding (download only). A value of -1 indicates that the supervisor + * should seed indefinitely (until the topology is killed). + */ + public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = "supervisor.bittorrent.seed.duration"; + public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = Number.class; + /** * The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced From 17f1acc3d03e99a1c4c7d96d9921211fb8115373 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 24 Jul 2013 13:54:35 -0400 Subject: [PATCH 07/17] implement configurable bitorrent rate limits and seeding for nimbus/supervisor --- .../backtype/storm/torrent/BaseTracker.java | 34 +++++++++ .../backtype/storm/torrent/NimbusTracker.java | 62 +++++++++------- .../storm/torrent/SupervisorClient.java | 40 ----------- .../storm/torrent/SupervisorTracker.java | 72 +++++++++++++++++++ 4 files changed, 143 insertions(+), 65 deletions(-) create mode 100644 storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java delete mode 100644 storm-core/src/jvm/backtype/storm/torrent/SupervisorClient.java create mode 100644 storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java diff --git a/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java b/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java new file mode 100644 index 000000000..ef4b23cd9 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java @@ -0,0 +1,34 @@ +package backtype.storm.torrent; + +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.turn.ttorrent.client.Client; + +public abstract class BaseTracker { + private static final Logger LOG = LoggerFactory.getLogger(BaseTracker.class); + + protected HashMap clients = new HashMap(); + protected Double maxDownload; + protected Double maxUpload; + + protected void rebalanceRates(){ + int clientCount = this.clients.size(); + if(clientCount > 0){ + double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; + double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount; + LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); + LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); + for(Client client : this.clients.values()) { + client.setMaxDownloadRate(maxDl); + client.setMaxUploadRate(maxUl); + } + } + } + + protected static String format(double val){ + return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); + } +} diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java index ee6235778..c44fa36c5 100644 --- a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java +++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java @@ -8,6 +8,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -22,14 +23,30 @@ import com.turn.ttorrent.tracker.TrackedTorrent; import com.turn.ttorrent.tracker.Tracker; -public class NimbusTracker { +public class NimbusTracker extends BaseTracker { private static final Logger LOG = LoggerFactory.getLogger(NimbusTracker.class); private Tracker tracker; private InetAddress nimbusHost; - private HashMap clients = new HashMap(); private String hostName; private Integer port; + public NimbusTracker (Map conf) throws IOException{ + this.hostName = (String)conf.get(Config.NIMBUS_HOST); + this.port = (Integer)conf.get(Config.NIMBUS_BITTORRENT_PORT); + this.maxDownload = (Double)conf.get(Config.NIMBUS_BITTORRENT_MAX_DOWNLOAD_RATE); + this.maxUpload = (Double)conf.get(Config.NIMBUS_BITTORRENT_MAX_UPLOAD_RATE); + LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload)); + + String bindAddress = (String)conf.get(Config.NIMBUS_BITTORRENT_BIND_ADDRESS); + + LOG.info("Starting bt tracker bound to interface '{}'", bindAddress); + this.nimbusHost = InetAddress.getByName(this.hostName); + InetSocketAddress socketAddr = new InetSocketAddress(bindAddress, port); + this.tracker = new Tracker(socketAddr); + LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl()); + this.tracker.start(); + } + public void stop(String topologyId){ LOG.info("Stop seeding/tracking for topology {}", topologyId); Client client = this.clients.remove(topologyId); @@ -38,39 +55,34 @@ public void stop(String topologyId){ client.stop(); this.tracker.remove(torrent); } + rebalanceRates(); } - public void trackAndSeed(String fileName, String topologyId) throws IOException, NoSuchAlgorithmException, InterruptedException, URISyntaxException{ - File jarFile = new File(fileName); - File destDir = jarFile.getParentFile(); - LOG.info("Generating torrent for file: {}", jarFile.getAbsolutePath()); + public void trackAndSeed(String dir, String topologyId) throws IOException, NoSuchAlgorithmException, InterruptedException, URISyntaxException{ + + File destDir = new File(dir); + LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath()); URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); - LOG.info("Creating torrent with annound URL: {}", uri); - Torrent torrent = Torrent.create(jarFile, uri, "storm-nimbus"); - File torrentFile = new File(destDir, "stormjar.torrent"); + LOG.info("Creating torrent with announce URL: {}", uri); + ArrayList files = new ArrayList(); + + files.add(new File(destDir, "stormjar.jar")); + files.add(new File(destDir, "stormconf.ser")); + files.add(new File(destDir, "stormcode.ser")); + + Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus"); + File torrentFile = new File(destDir, topologyId + ".torrent"); torrent.save(new FileOutputStream(torrentFile)); LOG.info("Saved torrent: {}" + torrentFile.getAbsolutePath()); this.tracker.announce(new TrackedTorrent(torrent)); LOG.info("Torrent announced to tracker."); - Client client = new Client(this.nimbusHost, new SharedTorrent(torrent, destDir, true)); - client.share(); + Client client = new Client(this.nimbusHost, new SharedTorrent(torrent, destDir.getParentFile(), true)); this.clients.put(topologyId, client); + rebalanceRates(); + client.share(); LOG.info("Seeding torrent..."); } - - - public NimbusTracker (Map conf) throws IOException{ - this.hostName = (String)conf.get(Config.NIMBUS_HOST); - this.port = (Integer)conf.get(Config.NIMBUS_BITTORRENT_PORT); - String bindAddress = (String)conf.get(Config.NIMBUS_BITTORRENT_BIND_ADDRESS); - - LOG.info("Starting bt tracker bound to interface '{}'", bindAddress); - this.nimbusHost = InetAddress.getByName(this.hostName); - InetSocketAddress socketAddr = new InetSocketAddress(bindAddress, port); - this.tracker = new Tracker(socketAddr); - LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl()); - this.tracker.start(); - } + } diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorClient.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorClient.java deleted file mode 100644 index adce7dc77..000000000 --- a/storm-core/src/jvm/backtype/storm/torrent/SupervisorClient.java +++ /dev/null @@ -1,40 +0,0 @@ -package backtype.storm.torrent; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.security.NoSuchAlgorithmException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.turn.ttorrent.client.Client; -import com.turn.ttorrent.client.SharedTorrent; - -public class SupervisorClient { - private static final Logger LOG = LoggerFactory.getLogger(SupervisorClient.class); - private String torrentPath; - - public SupervisorClient(String torrentPath){ - this.torrentPath = torrentPath; - } - - public void download() throws IOException, NoSuchAlgorithmException{ - LOG.info("Initiating BitTorrent download."); - InetAddress netAddr = InetAddress.getLocalHost(); - File torrentFile = new File(this.torrentPath); - File destDir = torrentFile.getParentFile(); - - SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); - - Client client = new Client(netAddr, st); - client.download(); - client.waitForCompletion(); - client.stop(); - - LOG.info("BitTorrent download complete."); - } - - -} diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java new file mode 100644 index 000000000..e3f3f424c --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java @@ -0,0 +1,72 @@ +package backtype.storm.torrent; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; + +import com.turn.ttorrent.client.Client; +import com.turn.ttorrent.client.SharedTorrent; +import com.turn.ttorrent.client.Client.ClientState; +import com.turn.ttorrent.common.Torrent; +import com.turn.ttorrent.tracker.Tracker; + +public class SupervisorTracker extends BaseTracker{ + private static final Logger LOG = LoggerFactory.getLogger(SupervisorTracker.class); + + private Integer seedDuration; + + public SupervisorTracker(Map conf){ + LOG.info("Creating supervisor bt tracker."); + this.maxDownload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE); + this.maxUpload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE); + this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); + LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload)); + } + + public void stop(String topologyId){ + LOG.info("Stopping bt client for topology {}", topologyId); + Client client = this.clients.remove(topologyId); + if(client != null){ + client.stop(); + } + rebalanceRates(); + } + + + public void download(String torrentPath, String topologyId) throws IOException, NoSuchAlgorithmException{ + LOG.info("Initiating BitTorrent download."); + InetAddress netAddr = InetAddress.getLocalHost(); + File torrentFile = new File(torrentPath); + File destDir = torrentFile.getParentFile(); + LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); + LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); + SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); + + Client client = new Client(netAddr, st); + this.clients.put(topologyId, client); + rebalanceRates(); + client.share(this.seedDuration); + if(this.seedDuration == 0){ + client.waitForCompletion(); + } else { + LOG.info("Waiting for seeding to begin..."); + while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){ + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + LOG.info("BitTorrent download complete."); + } + +} From 3b56e94ec299cb08c689553cd0daedaa3de943e8 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 24 Jul 2013 13:57:45 -0400 Subject: [PATCH 08/17] modify nimbus/supervisor to exchange all topology code (.jar, .ser) via bittorrent --- storm-core/src/clj/backtype/storm/config.clj | 12 +++++----- .../src/clj/backtype/storm/daemon/nimbus.clj | 4 +++- .../clj/backtype/storm/daemon/supervisor.clj | 22 +++++++++---------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index c989a0007..4b0bee6da 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -119,8 +119,8 @@ ([conf storm-id] (str (master-stormdist-root conf) "/" storm-id))) -(defn master-stormtorrent-path [stormroot] - (str stormroot "/stormjar.torrent")) +(defn master-stormtorrent-path [stormroot storm-id] + (str stormroot "/" storm-id ".torrent")) (defn master-stormjar-path [stormroot] (str stormroot "/stormjar.jar")) @@ -153,11 +153,11 @@ ([conf storm-id] (str (supervisor-stormdist-root conf) "/" (java.net.URLEncoder/encode storm-id)))) -(defn supervisor-stormtorrent-path [stormroot] - (str stormroot "/stormjar.torrent")) +(defn supervisor-stormtorrent-path [stormroot storm-id] + (str stormroot "/" storm-id ".torrent")) -(defn supervisor-stormjar-path [stormroot] - (str stormroot "/stormjar.jar")) +(defn supervisor-stormjar-path [stormroot storm-id] + (str stormroot "/" storm-id "/stormjar.jar")) (defn supervisor-stormcode-path [stormroot] (str stormroot "/stormcode.ser")) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index fca9055b0..f2acd5054 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -9,6 +9,7 @@ (:use [backtype.storm.scheduler.DefaultScheduler]) (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) + (:import [backtype.storm.torrent NimbusTracker]) (:use [backtype.storm bootstrap util]) (:use [backtype.storm.daemon common]) (:gen-class @@ -60,6 +61,7 @@ (halt-process! 20 "Error when processing an event") )) :scheduler (mk-scheduler conf inimbus) + :bt-tracker (NimbusTracker. conf) })) (defn inbox [nimbus] @@ -295,9 +297,9 @@ (FileUtils/forceMkdir (File. stormroot)) (FileUtils/cleanDirectory (File. stormroot)) (setup-jar conf tmp-jar-location stormroot) - (.trackAndSeed (:bt-tracker nimbus) (master-stormjar-path stormroot) storm-id) (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) + (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id) )) (defn- read-storm-topology [conf storm-id] diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 4a71b681d..623204a98 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -1,6 +1,6 @@ (ns backtype.storm.daemon.supervisor (:import [backtype.storm.scheduler ISupervisor]) - (:import [backtype.storm.torrent SupervisorClient]) + (:import [backtype.storm.torrent SupervisorTracker]) (:use [backtype.storm bootstrap]) (:use [backtype.storm.daemon common]) (:require [backtype.storm.daemon [worker :as worker]]) @@ -180,6 +180,7 @@ (log-error t "Error when processing event") (halt-process! 20 "Error when processing an event") )) + :bt-tracker (SupervisorTracker. conf) }) (defn sync-processes [supervisor] @@ -218,6 +219,7 @@ ". Current supervisor time: " now ". State: " state ", Heartbeat: " (pr-str heartbeat)) + (.stop (:bt-tracker supervisor) (:storm-id heartbeat)) (shutdown-worker supervisor id) )) (doseq [id (vals new-worker-ids)] @@ -288,7 +290,7 @@ storm-id " from " master-code-dir) - (download-storm-code conf storm-id master-code-dir) + (download-storm-code conf storm-id master-code-dir supervisor) (log-message "Finished downloading code for storm id " storm-id " from " @@ -388,18 +390,14 @@ ;; distributed implementation (defmethod download-storm-code - :distributed [conf storm-id master-code-dir] + :distributed [conf storm-id master-code-dir supervisor] ;; Downloading to permanent location is atomic (let [tmproot (str (supervisor-tmp-dir conf) "/" (uuid)) stormroot (supervisor-stormdist-root conf storm-id)] - (FileUtils/forceMkdir (File. tmproot)) - - (Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir) (supervisor-stormtorrent-path tmproot)) - (.download (SupervisorClient. (supervisor-stormtorrent-path tmproot))) - (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot)) - (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot)) - (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) - (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) + (FileUtils/forceMkdir (File. (supervisor-stormdist-root conf))) + (Utils/downloadFromMaster conf (master-stormtorrent-path master-code-dir storm-id) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id)) + (.download (:bt-tracker supervisor) (supervisor-stormtorrent-path (supervisor-stormdist-root conf) storm-id) storm-id) + (extract-dir-from-jar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) RESOURCES-SUBDIR (supervisor-stormdist-root conf)) )) @@ -408,7 +406,7 @@ (let [conf (:conf supervisor) storm-home (System/getProperty "storm.home") stormroot (supervisor-stormdist-root conf storm-id) - stormjar (supervisor-stormjar-path stormroot) + stormjar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) storm-conf (read-supervisor-storm-conf conf storm-id) classpath (add-to-classpath (current-classpath) [stormjar]) childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) From bf5de3f994f325fbe5b2283c7e750da35dce5a49 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Sat, 14 Sep 2013 15:35:40 -0400 Subject: [PATCH 09/17] fix local mode supervisor --- storm-core/src/clj/backtype/storm/daemon/supervisor.clj | 2 +- .../src/jvm/backtype/storm/torrent/NimbusTracker.java | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 623204a98..f17edc915 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -437,7 +437,7 @@ first )) (defmethod download-storm-code - :local [conf storm-id master-code-dir] + :local [conf storm-id master-code-dir supervisor] (let [stormroot (supervisor-stormdist-root conf storm-id)] (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot)) (let [classloader (.getContextClassLoader (Thread/currentThread)) diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java index c44fa36c5..6231db08e 100644 --- a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java +++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java @@ -66,8 +66,12 @@ public void trackAndSeed(String dir, String topologyId) throws IOException, NoSu URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); LOG.info("Creating torrent with announce URL: {}", uri); ArrayList files = new ArrayList(); - - files.add(new File(destDir, "stormjar.jar")); + + File jar = new File(destDir, "stormjar.jar"); + // no storm jar in local mode + if(jar.exists()){ + files.add(jar); + } files.add(new File(destDir, "stormconf.ser")); files.add(new File(destDir, "stormcode.ser")); From dec77b7db73160085f0912952d932b34fd1542d8 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Sat, 14 Sep 2013 16:11:35 -0400 Subject: [PATCH 10/17] only start bittorrent tracker in distributed mode. --- .../src/clj/backtype/storm/daemon/supervisor.clj | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index f17edc915..0f1f97e99 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -10,6 +10,7 @@ (bootstrap) (defmulti download-storm-code cluster-mode) +(defmulti mk-bt-tracker cluster-mode) (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor)))) ;; used as part of a map from port to this @@ -180,7 +181,7 @@ (log-error t "Error when processing event") (halt-process! 20 "Error when processing an event") )) - :bt-tracker (SupervisorTracker. conf) + :bt-tracker (mk-bt-tracker conf) }) (defn sync-processes [supervisor] @@ -219,7 +220,9 @@ ". Current supervisor time: " now ". State: " state ", Heartbeat: " (pr-str heartbeat)) - (.stop (:bt-tracker supervisor) (:storm-id heartbeat)) + (if (:bt-tracker supervisor) + (.stop (:bt-tracker supervisor) (:storm-id heartbeat)) + ) (shutdown-worker supervisor id) )) (doseq [id (vals new-worker-ids)] @@ -400,6 +403,10 @@ (extract-dir-from-jar (supervisor-stormjar-path (supervisor-stormdist-root conf) storm-id) RESOURCES-SUBDIR (supervisor-stormdist-root conf)) )) +(defmethod mk-bt-tracker + :distributed [conf] + (SupervisorTracker. conf) + ) (defmethod launch-worker :distributed [supervisor storm-id port worker-id] @@ -456,6 +463,11 @@ )) ))) +(defmethod mk-bt-tracker + :local [conf] + nil + ) + (defmethod launch-worker :local [supervisor storm-id port worker-id] (let [conf (:conf supervisor) From efd84a682d9ea5b713a66246429afed512e4da6d Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Sat, 14 Sep 2013 16:22:55 -0400 Subject: [PATCH 11/17] only start Nimbus bittorrent tracker in distributed mode. --- .../src/clj/backtype/storm/daemon/nimbus.clj | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index f2acd5054..2ed749a47 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -44,6 +44,8 @@ scheduler )) +(defmulti mk-bt-tracker cluster-mode) + (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf @@ -61,7 +63,7 @@ (halt-process! 20 "Error when processing an event") )) :scheduler (mk-scheduler conf inimbus) - :bt-tracker (NimbusTracker. conf) + :bt-tracker (mk-bt-tracker conf) })) (defn inbox [nimbus] @@ -299,7 +301,7 @@ (setup-jar conf tmp-jar-location stormroot) (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) - (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id) + (if (:bt-tracker nimbus) (.trackAndSeed (:bt-tracker nimbus) stormroot storm-id)) )) (defn- read-storm-topology [conf storm-id] @@ -815,7 +817,7 @@ (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) - (.stop (:bt-tracker nimbus) id) + (if (:bt-tracker nimbus) (.stop (:bt-tracker nimbus) id)) (.teardown-heartbeats! storm-cluster-state id) (.teardown-topology-errors! storm-cluster-state id) (rmr (master-stormdist-root conf id)) @@ -1155,12 +1157,21 @@ (FileUtils/copyFile src-file (File. (master-stormjar-path stormroot))) )) +(defmethod mk-bt-tracker :local [conf] + (NimbusTracker. conf) + ) + + ;; local implementation (defmethod setup-jar :local [conf & args] nil ) +(defmethod mk-bt-tracker :local [conf] + nil + ) + (defn -launch [nimbus] (launch-server! (read-storm-config) nimbus)) From 2209da90861f9779dd4d621b5d6e11ba46493599 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Thu, 26 Sep 2013 10:35:35 -0400 Subject: [PATCH 12/17] address issues identified in #629 review. --- storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 2 +- .../src/jvm/backtype/storm/torrent/NimbusTracker.java | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 2ed749a47..ce5fc30d0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -1157,7 +1157,7 @@ (FileUtils/copyFile src-file (File. (master-stormjar-path stormroot))) )) -(defmethod mk-bt-tracker :local [conf] +(defmethod mk-bt-tracker :distributed [conf] (NimbusTracker. conf) ) diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java index 6231db08e..17313617d 100644 --- a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java +++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java @@ -66,12 +66,7 @@ public void trackAndSeed(String dir, String topologyId) throws IOException, NoSu URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); LOG.info("Creating torrent with announce URL: {}", uri); ArrayList files = new ArrayList(); - - File jar = new File(destDir, "stormjar.jar"); - // no storm jar in local mode - if(jar.exists()){ - files.add(jar); - } + files.add(new File(destDir, "stormjar.jar")); files.add(new File(destDir, "stormconf.ser")); files.add(new File(destDir, "stormcode.ser")); From 3b8059cf840c832a65bae8c27f447b2e82908e9f Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Thu, 26 Sep 2013 17:10:46 -0400 Subject: [PATCH 13/17] synchronize rebalanceRates() method and correct indentation. --- .../backtype/storm/torrent/BaseTracker.java | 2 +- .../storm/torrent/SupervisorTracker.java | 45 +++++++++---------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java b/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java index ef4b23cd9..f62149ce0 100644 --- a/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java +++ b/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java @@ -14,7 +14,7 @@ public abstract class BaseTracker { protected Double maxDownload; protected Double maxUpload; - protected void rebalanceRates(){ + protected synchronized void rebalanceRates(){ int clientCount = this.clients.size(); if(clientCount > 0){ double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java index e3f3f424c..de6e0fd6b 100644 --- a/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java +++ b/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java @@ -41,32 +41,31 @@ public void stop(String topologyId){ rebalanceRates(); } - public void download(String torrentPath, String topologyId) throws IOException, NoSuchAlgorithmException{ - LOG.info("Initiating BitTorrent download."); - InetAddress netAddr = InetAddress.getLocalHost(); - File torrentFile = new File(torrentPath); - File destDir = torrentFile.getParentFile(); - LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); - LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); - SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); - - Client client = new Client(netAddr, st); - this.clients.put(topologyId, client); - rebalanceRates(); - client.share(this.seedDuration); - if(this.seedDuration == 0){ - client.waitForCompletion(); - } else { - LOG.info("Waiting for seeding to begin..."); - while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){ - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } + LOG.info("Initiating BitTorrent download."); + InetAddress netAddr = InetAddress.getLocalHost(); + File torrentFile = new File(torrentPath); + File destDir = torrentFile.getParentFile(); + LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); + LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); + SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); + + Client client = new Client(netAddr, st); + this.clients.put(topologyId, client); + rebalanceRates(); + client.share(this.seedDuration); + if(this.seedDuration == 0){ + client.waitForCompletion(); + } else { + LOG.info("Waiting for seeding to begin..."); + while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){ + try { + Thread.sleep(10); + } catch (InterruptedException e) { } } - LOG.info("BitTorrent download complete."); + } + LOG.info("BitTorrent download complete."); } } From 24ae037cda883b97351d19627578a7896d519801 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Fri, 27 Sep 2013 14:28:45 -0400 Subject: [PATCH 14/17] Rename BaseTracker and SupervisorTracker to BasePeer and SupervisorPeer --- .../clj/backtype/storm/daemon/supervisor.clj | 4 +- .../backtype/storm/torrent/BaseTracker.java | 34 --------- .../backtype/storm/torrent/NimbusTracker.java | 2 +- .../storm/torrent/SupervisorTracker.java | 71 ------------------- 4 files changed, 3 insertions(+), 108 deletions(-) delete mode 100644 storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java delete mode 100644 storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 0f1f97e99..210562f6b 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -1,6 +1,6 @@ (ns backtype.storm.daemon.supervisor (:import [backtype.storm.scheduler ISupervisor]) - (:import [backtype.storm.torrent SupervisorTracker]) + (:import [backtype.storm.torrent SupervisorPeer]) (:use [backtype.storm bootstrap]) (:use [backtype.storm.daemon common]) (:require [backtype.storm.daemon [worker :as worker]]) @@ -405,7 +405,7 @@ (defmethod mk-bt-tracker :distributed [conf] - (SupervisorTracker. conf) + (SupervisorPeer. conf) ) (defmethod launch-worker diff --git a/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java b/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java deleted file mode 100644 index f62149ce0..000000000 --- a/storm-core/src/jvm/backtype/storm/torrent/BaseTracker.java +++ /dev/null @@ -1,34 +0,0 @@ -package backtype.storm.torrent; - -import java.util.HashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.turn.ttorrent.client.Client; - -public abstract class BaseTracker { - private static final Logger LOG = LoggerFactory.getLogger(BaseTracker.class); - - protected HashMap clients = new HashMap(); - protected Double maxDownload; - protected Double maxUpload; - - protected synchronized void rebalanceRates(){ - int clientCount = this.clients.size(); - if(clientCount > 0){ - double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; - double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount; - LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); - LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); - for(Client client : this.clients.values()) { - client.setMaxDownloadRate(maxDl); - client.setMaxUploadRate(maxUl); - } - } - } - - protected static String format(double val){ - return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); - } -} diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java index 17313617d..826cb3cfa 100644 --- a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java +++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java @@ -23,7 +23,7 @@ import com.turn.ttorrent.tracker.TrackedTorrent; import com.turn.ttorrent.tracker.Tracker; -public class NimbusTracker extends BaseTracker { +public class NimbusTracker extends BasePeer { private static final Logger LOG = LoggerFactory.getLogger(NimbusTracker.class); private Tracker tracker; private InetAddress nimbusHost; diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java deleted file mode 100644 index de6e0fd6b..000000000 --- a/storm-core/src/jvm/backtype/storm/torrent/SupervisorTracker.java +++ /dev/null @@ -1,71 +0,0 @@ -package backtype.storm.torrent; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.security.NoSuchAlgorithmException; -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; - -import com.turn.ttorrent.client.Client; -import com.turn.ttorrent.client.SharedTorrent; -import com.turn.ttorrent.client.Client.ClientState; -import com.turn.ttorrent.common.Torrent; -import com.turn.ttorrent.tracker.Tracker; - -public class SupervisorTracker extends BaseTracker{ - private static final Logger LOG = LoggerFactory.getLogger(SupervisorTracker.class); - - private Integer seedDuration; - - public SupervisorTracker(Map conf){ - LOG.info("Creating supervisor bt tracker."); - this.maxDownload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE); - this.maxUpload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE); - this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); - LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload)); - } - - public void stop(String topologyId){ - LOG.info("Stopping bt client for topology {}", topologyId); - Client client = this.clients.remove(topologyId); - if(client != null){ - client.stop(); - } - rebalanceRates(); - } - - public void download(String torrentPath, String topologyId) throws IOException, NoSuchAlgorithmException{ - LOG.info("Initiating BitTorrent download."); - InetAddress netAddr = InetAddress.getLocalHost(); - File torrentFile = new File(torrentPath); - File destDir = torrentFile.getParentFile(); - LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); - LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); - SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); - - Client client = new Client(netAddr, st); - this.clients.put(topologyId, client); - rebalanceRates(); - client.share(this.seedDuration); - if(this.seedDuration == 0){ - client.waitForCompletion(); - } else { - LOG.info("Waiting for seeding to begin..."); - while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){ - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - } - } - LOG.info("BitTorrent download complete."); - } - -} From 755bebf0b98b9f5cb689b54e9c766b923c662733 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Fri, 27 Sep 2013 14:30:35 -0400 Subject: [PATCH 15/17] organize imports --- storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java | 1 - 1 file changed, 1 deletion(-) diff --git a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java index 826cb3cfa..0283dea28 100644 --- a/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java +++ b/storm-core/src/jvm/backtype/storm/torrent/NimbusTracker.java @@ -9,7 +9,6 @@ import java.net.URISyntaxException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; From f784ef3ae596a221cfd25b45c70c38e1737367b6 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Fri, 27 Sep 2013 14:32:53 -0400 Subject: [PATCH 16/17] Rename BaseTracker, SupervisorTracker --> BasePeer, SupervisorPeer --- .../jvm/backtype/storm/torrent/BasePeer.java | 34 ++++++++++ .../storm/torrent/SupervisorPeer.java | 67 +++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 storm-core/src/jvm/backtype/storm/torrent/BasePeer.java create mode 100644 storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java diff --git a/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java b/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java new file mode 100644 index 000000000..9b9391281 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/BasePeer.java @@ -0,0 +1,34 @@ +package backtype.storm.torrent; + +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.turn.ttorrent.client.Client; + +public abstract class BasePeer { + private static final Logger LOG = LoggerFactory.getLogger(BasePeer.class); + + protected HashMap clients = new HashMap(); + protected Double maxDownload; + protected Double maxUpload; + + protected synchronized void rebalanceRates(){ + int clientCount = this.clients.size(); + if(clientCount > 0){ + double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; + double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount; + LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); + LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); + for(Client client : this.clients.values()) { + client.setMaxDownloadRate(maxDl); + client.setMaxUploadRate(maxUl); + } + } + } + + protected static String format(double val){ + return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); + } +} diff --git a/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java b/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java new file mode 100644 index 000000000..6c5820ac5 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java @@ -0,0 +1,67 @@ +package backtype.storm.torrent; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; + +import com.turn.ttorrent.client.Client; +import com.turn.ttorrent.client.Client.ClientState; +import com.turn.ttorrent.client.SharedTorrent; + +public class SupervisorPeer extends BasePeer{ + private static final Logger LOG = LoggerFactory.getLogger(SupervisorPeer.class); + + private Integer seedDuration; + + public SupervisorPeer(Map conf){ + LOG.info("Creating supervisor bt tracker."); + this.maxDownload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE); + this.maxUpload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE); + this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); + LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload)); + } + + public void stop(String topologyId){ + LOG.info("Stopping bt client for topology {}", topologyId); + Client client = this.clients.remove(topologyId); + if(client != null){ + client.stop(); + } + rebalanceRates(); + } + + public void download(String torrentPath, String topologyId) throws IOException, NoSuchAlgorithmException{ + LOG.info("Initiating BitTorrent download."); + InetAddress netAddr = InetAddress.getLocalHost(); + File torrentFile = new File(torrentPath); + File destDir = torrentFile.getParentFile(); + LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); + LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); + SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); + + Client client = new Client(netAddr, st); + this.clients.put(topologyId, client); + rebalanceRates(); + client.share(this.seedDuration); + if(this.seedDuration == 0){ + client.waitForCompletion(); + } else { + LOG.info("Waiting for seeding to begin..."); + while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){ + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + LOG.info("BitTorrent download complete."); + } + +} From ff79d6f857faf1002e0577295dd2c73e8ce2a482 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Tue, 8 Oct 2013 10:51:36 -0400 Subject: [PATCH 17/17] exclude log4j dependencies from ttorrent dependency --- storm-core/project.clj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storm-core/project.clj b/storm-core/project.clj index 737acdbe2..d51a047e6 100644 --- a/storm-core/project.clj +++ b/storm-core/project.clj @@ -27,7 +27,8 @@ [com.google.guava/guava "13.0"] [ch.qos.logback/logback-classic "1.0.6"] [org.slf4j/log4j-over-slf4j "1.6.6"] - [com.turn/ttorrent "1.2"] + [com.turn/ttorrent "1.2" + :exclusions[org.slf4j/slf4j-log4j12]] ] :source-paths ["src/clj"]