STORM-166: Nimbus HA solution based on ZooKeeper #61
Changes from all commits
4f12373
4a4176f
91f3764
c7f47d7
2cb3616
a4b3b3f
4812f28
912cf59
68db8d0
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
(:use [backtype.storm bootstrap util]) | ||
(:use [backtype.storm.config :only [validate-configs-with-schemas]]) | ||
(:use [backtype.storm.daemon common]) | ||
(:use [backtype.storm.nimbus leadership]) | ||
(:gen-class | ||
:methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]])) | ||
|
||
|
@@ -892,10 +893,47 @@ | |
) | ||
) | ||
|
||
(defn- sync-storm-code-from-leader [nimbus] | ||
What happens if the standby nimbus has not downloaded everything when it becomes the master? The current code relies on the topology being scheduled before the standby downloads anything. But there is a window between when the topology is submitted successfully and when it is scheduled; if nimbus goes down for whatever reason during that window, the topology will be killed. This is probably OK to start out with, but it would be nice to fix.
This case seems quite similar to a nimbus hard-drive error in the current version: in both cases supervisors cannot download the topology code from nimbus. Hopefully Storm users will agree that re-submitting the killed topology is acceptable when this happens.
I can see that; it just would be nice to prevent it if at all possible. Perhaps when we move to BitTorrent for downloading, that will take care of this too.
||
(let [conf (:conf nimbus) | ||
storm-cluster-state (:storm-cluster-state nimbus) | ||
storm-ids (.assignments storm-cluster-state nil) | ||
storm-code-map (->> (dofor [sid storm-ids] {sid (.assignment-info storm-cluster-state sid nil)}) | ||
(apply merge) | ||
(filter-val not-nil?) | ||
(map-val :master-code-dir) | ||
) | ||
downloaded-storm-ids (set (map #(java.net.URLDecoder/decode %) (read-dir-contents (master-stormdist-root conf)))) | ||
tmproot (str (master-tmp-dir conf) file-path-separator (uuid))] | ||
(doseq [[storm-id master-code-dir] storm-code-map] | ||
(when (not (downloaded-storm-ids storm-id)) | ||
(log-message "Downloading code for storm id " storm-id " from " master-code-dir) | ||
|
||
(FileUtils/forceMkdir (File. tmproot)) | ||
(Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (master-stormjar-path tmproot)) | ||
(Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (master-stormcode-path tmproot)) | ||
(Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (master-stormconf-path tmproot)) | ||
(FileUtils/moveDirectory (File. tmproot) (File. (master-stormdist-root conf storm-id))) | ||
|
||
(log-message "Finished downloading code for storm id " storm-id " from " master-code-dir) | ||
) | ||
) | ||
) | ||
) | ||
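Note on the function above: it downloads the three files into a temporary directory and only then moves that directory under the stormdist root, so a partially downloaded topology should never appear as "downloaded". A minimal Java sketch of that stage-then-move pattern follows. It is not the PR's code: the class name `StagedCodeSync`, the `stage` method, and the in-memory `files` map (standing in for the three `Utils/downloadFromMaster` calls) are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Hypothetical helper, not part of Storm: stages files in a temp directory,
// then publishes the whole directory with a single move.
final class StagedCodeSync {

    /**
     * Writes every entry of {@code files} into a fresh staging directory under
     * {@code tmpRoot}, then moves that directory to {@code distRoot/stormId}.
     * Returns false, without writing anything, if the target already exists.
     */
    static boolean stage(Path tmpRoot, Path distRoot, String stormId,
                         Map<String, byte[]> files) {
        Path target = distRoot.resolve(stormId);
        if (Files.exists(target)) {
            return false; // code for this storm id is already present
        }
        try {
            Files.createDirectories(tmpRoot);
            Files.createDirectories(distRoot);
            Path staging = Files.createTempDirectory(tmpRoot, "sync-");
            for (Map.Entry<String, byte[]> e : files.entrySet()) {
                // Stand-in for fetching one file from the leader.
                Files.write(staging.resolve(e.getKey()), e.getValue());
            }
            // ATOMIC_MOVE requires both roots to be on the same file store;
            // otherwise the JDK throws AtomicMoveNotSupportedException.
            Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch assumes the temporary root and the distribution root share a file system, which is what makes the final move a rename rather than a copy.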
|
||
(defserverfn service-handler [conf inimbus] | ||
(.prepare inimbus conf (master-inimbus-dir conf)) | ||
(log-message "Starting Nimbus with conf " conf) | ||
(let [nimbus (nimbus-data conf inimbus)] | ||
(let [nimbus (nimbus-data conf inimbus) | ||
nimbus-leadership (nimbus-leadership conf)] | ||
Because nimbus-leadership opens a new connection to ZK, is there ever a possibility that the nimbus-leadership connection will be lost (networking glitch) while the other ZK connection is not? This could result in two nimbus instances both running at the same time.
On this point I referred to the InterProcessMutex source code, and I believe that when a networking glitch causes mutex.acquire() to lose its ZK connection, mutex.acquire() will throw an IOException, which finally causes this nimbus to shut down. Is there any other possibility that may result in two or more nimbus instances?
My concern isn't in the acquire, it is after the acquire. The current code opens two sockets to ZK: one is used for the mutex, the other for general interaction with ZK. Suppose the active nimbus, which has already passed mutex.acquire(), has a networking glitch that drops just the mutex connection. I don't see how this will cause the currently active nimbus to get an IOException and shut down.
||
;; Schedule synchronize storm code from leader | ||
(schedule-recurring (:timer nimbus) | ||
10 | ||
10 | ||
(fn [] | ||
(sync-storm-code-from-leader nimbus) | ||
)) | ||
;; Compete to be nimbus leader | ||
(acquire-leadership nimbus-leadership) | ||
(.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf) | ||
(cleanup-corrupt-topologies! nimbus) | ||
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))] | ||
|
@@ -1141,6 +1179,7 @@ | |
(.disconnect (:storm-cluster-state nimbus)) | ||
(.cleanup (:downloaders nimbus)) | ||
(.cleanup (:uploaders nimbus)) | ||
(.close nimbus-leadership) | ||
(log-message "Shut down master") | ||
) | ||
DaemonCommon | ||
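Note on the review thread about losing the mutex connection after `acquire()` has returned: one way to address it is to have the active nimbus watch the state of the connection that holds the lock and step down when that connection is lost. Curator reports connection state changes through its `ConnectionStateListener` interface. The sketch below deliberately avoids Curator types so that it stays self-contained: `LeadershipGuard`, its `ConnState` enum, and the `onLeadershipLost` callback are hypothetical names, and wiring the guard to a real Curator listener is left as an assumption.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard, not part of the PR: tracks whether this process still
// believes it is the leader and fires a callback once when that stops being true.
final class LeadershipGuard {

    /** Stand-in for a client library's connection states (an assumption). */
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private final AtomicBoolean leader = new AtomicBoolean(false);
    private final Runnable onLeadershipLost;

    LeadershipGuard(Runnable onLeadershipLost) {
        this.onLeadershipLost = onLeadershipLost;
    }

    /** Call right after the leadership lock has been acquired. */
    void markAcquired() {
        leader.set(true);
    }

    boolean isLeader() {
        return leader.get();
    }

    /** Call from the state listener of the connection that holds the lock. */
    void onStateChange(ConnState state) {
        // compareAndSet ensures the callback runs at most once per acquisition.
        if (state == ConnState.LOST && leader.compareAndSet(true, false)) {
            onLeadershipLost.run(); // e.g. shut this nimbus down
        }
    }
}
```

Whether to step down on a suspended connection or only on a lost one is a design choice; this sketch reacts to the lost state only.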
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
;; Licensed to the Apache Software Foundation (ASF) under one | ||
;; or more contributor license agreements. See the NOTICE file | ||
;; distributed with this work for additional information | ||
;; regarding copyright ownership. The ASF licenses this file | ||
;; to you under the Apache License, Version 2.0 (the | ||
;; "License"); you may not use this file except in compliance | ||
;; with the License. You may obtain a copy of the License at | ||
;; | ||
;; http://www.apache.org/licenses/LICENSE-2.0 | ||
;; | ||
;; Unless required by applicable law or agreed to in writing, software | ||
;; distributed under the License is distributed on an "AS IS" BASIS, | ||
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
;; See the License for the specific language governing permissions and | ||
;; limitations under the License. | ||
(ns backtype.storm.nimbus.leadership | ||
This needs an Apache license header.
I will fix this.
||
(:import [backtype.storm.nimbus NimbusLeadership]) | ||
(:use [backtype.storm log])) | ||
|
||
(defn nimbus-leadership [conf] | ||
(NimbusLeadership. conf)) | ||
|
||
(defn get-nimbus-leader-address [conf] | ||
(.getNimbusLeaderAddress (nimbus-leadership conf))) | ||
|
||
(defn get-nimbus-hosts [conf] | ||
(.getNimbusHosts (nimbus-leadership conf))) | ||
We never call close on the nimbus-leadership. I believe that we are leaking a connection to ZK every time this and get-nimbus-leader-address are called.
The connection to ZK is closed when nimbus shuts down. Since the "nimbus-leadership" function is only called in "service-handler" in nimbus.clj to acquire leadership when nimbus launches, the relevant code is placed at the end of "service-handler" as "(.close nimbus-leadership)".
I see it now; you are right.
||
|
||
(defn acquire-leadership [nimbus-leadership] | ||
(when-let [nimbus-leader-address (.getNimbusLeaderAddress nimbus-leadership)] | ||
(log-message "Current Nimbus Leader: " nimbus-leader-address)) | ||
(log-message "acquiring nimbus leadership...") | ||
(.acquireLeaderShip nimbus-leadership) | ||
(log-message "acquired nimbus leadership!")) |
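Note on the connection-lifetime discussion above: the thread turns on where the ZK connection gets closed. One way to make that visible at the call site is to scope each lookup with `AutoCloseable` and try-with-resources. The sketch below is illustrative only: `LeaderLookupSession` is a hypothetical stand-in that records events rather than talking to ZooKeeper, and the returned address is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a short-lived leadership lookup; it records what
// happens to it so the open/query/close ordering can be inspected.
final class LeaderLookupSession implements AutoCloseable {
    private final List<String> events;
    private boolean open = true;

    LeaderLookupSession(List<String> events) {
        this.events = events;
        events.add("open"); // where a real client would connect
    }

    String leaderAddress() {
        if (!open) {
            throw new IllegalStateException("session already closed");
        }
        events.add("query");
        return "nimbus1.example.com:6627"; // placeholder value
    }

    @Override
    public void close() {
        if (open) { // idempotent: a second close is a no-op
            open = false;
            events.add("close"); // where a real client would disconnect
        }
    }

    /** Runs one scoped lookup and returns the recorded event order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        try (LeaderLookupSession session = new LeaderLookupSession(events)) {
            session.leaderAddress();
        }
        return events;
    }
}
```

With this shape the caller owns the lifetime, so a reader does not need to look inside the getter to learn that the connection is released.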
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package backtype.storm.nimbus; | ||
|
||
import java.io.UnsupportedEncodingException; | ||
import java.net.InetAddress; | ||
import java.net.InetSocketAddress; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import org.apache.curator.framework.CuratorFramework; | ||
import org.apache.curator.framework.recipes.locks.InterProcessMutex; | ||
import org.apache.curator.utils.ZKPaths; | ||
|
||
import backtype.storm.Config; | ||
import backtype.storm.utils.Utils; | ||
|
||
@SuppressWarnings("rawtypes") | ||
public class NimbusLeadership { | ||
|
||
private static final String STORM_NIMBUS_LEADERSHIP_PATH = "/nimbus/leadership"; | ||
|
||
private Map conf; | ||
private CuratorFramework curator; | ||
private InterProcessMutex mutex; | ||
private boolean isLeader = false; | ||
|
||
public NimbusLeadership(final Map conf) { | ||
this.conf = conf; | ||
} | ||
|
||
public void acquireLeaderShip() throws Exception { | ||
String nimbusHostName = InetAddress.getLocalHost().getCanonicalHostName(); | ||
Object nimbusPort = conf.get(Config.NIMBUS_THRIFT_PORT); | ||
String nodeId = nimbusHostName + ":" + nimbusPort.toString(); | ||
initCurator(); | ||
initLeadershipMutex(nodeId); | ||
mutex.acquire(); | ||
isLeader = true; | ||
} | ||
|
||
public InetSocketAddress getNimbusLeaderAddress() throws Exception { | ||
InetSocketAddress leaderAddress = null; | ||
initCurator(); | ||
initLeadershipMutex(null); | ||
Collection<String> nimbusNodesPath = mutex.getParticipantNodes(); | ||
if (nimbusNodesPath.size() > 0) { | ||
leaderAddress = parseAddress(nimbusNodesPath.iterator().next()); | ||
} | ||
close(); | ||
return leaderAddress; | ||
} | ||
|
||
public List<InetSocketAddress> getNimbusHosts() throws Exception { | ||
List<InetSocketAddress> nimbusAddressList = new ArrayList<InetSocketAddress>(); | ||
initCurator(); | ||
initLeadershipMutex(null); | ||
Collection<String> nimbusNodesPath = mutex.getParticipantNodes(); | ||
for (String nimbusNodePath : nimbusNodesPath) { | ||
nimbusAddressList.add(parseAddress(nimbusNodePath)); | ||
} | ||
close(); | ||
return nimbusAddressList; | ||
} | ||
|
||
public void close() { | ||
if (isLeader) { | ||
try { | ||
mutex.release(); | ||
} catch (Exception e) { | ||
throw new RuntimeException("Exception while releasing mutex", e); | ||
} | ||
} | ||
curator.close(); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private void initCurator() throws Exception { | ||
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); | ||
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); | ||
this.curator = Utils.newCuratorStarted(conf, servers, port); | ||
} | ||
|
||
private void initLeadershipMutex(final String nodeId) throws Exception { | ||
String path = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT) + STORM_NIMBUS_LEADERSHIP_PATH; | ||
ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), path); | ||
mutex = new InterProcessMutex(curator, path) { | ||
@Override | ||
protected byte[] getLockNodeBytes() { | ||
try { | ||
return nodeId == null ? null : nodeId.getBytes("UTF-8"); | ||
} catch (UnsupportedEncodingException e) { | ||
throw new RuntimeException("UTF-8 isn't supported", e); | ||
} | ||
} | ||
}; | ||
} | ||
|
||
private InetSocketAddress parseAddress(String nimbusNodePath) throws Exception { | ||
String nimbusNodeData = new String(curator.getData().forPath(nimbusNodePath), "UTF-8"); | ||
String[] split = nimbusNodeData.split(":"); | ||
return new InetSocketAddress(split[0], Integer.parseInt(split[1])); | ||
} | ||
} |
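Note on `parseAddress` and `getLockNodeBytes` above: the lock node payload is a `host:port` string, and decoding splits on `":"` without checking the result, so an empty or malformed payload would surface as an unrelated exception. A more defensive encode/decode pair could look like the sketch below. `NodeIdCodec` is a hypothetical name, and the choice to split on the last colon and to return an unresolved address is an assumption of this sketch rather than something the PR does.

```java
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for the "host:port" payload stored in a lock node.
final class NodeIdCodec {

    static byte[] encode(String host, int port) {
        // StandardCharsets avoids the checked UnsupportedEncodingException.
        return (host + ":" + port).getBytes(StandardCharsets.UTF_8);
    }

    static InetSocketAddress decode(byte[] data) {
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("lock node carries no address payload");
        }
        String text = new String(data, StandardCharsets.UTF_8);
        // Split on the last colon so a host containing colons keeps its full text.
        int sep = text.lastIndexOf(':');
        if (sep <= 0 || sep == text.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + text);
        }
        int port = Integer.parseInt(text.substring(sep + 1));
        // createUnresolved skips the DNS lookup; callers resolve when connecting.
        return InetSocketAddress.createUnresolved(text.substring(0, sep), port);
    }
}
```

For example, `NodeIdCodec.decode(NodeIdCodec.encode("nimbus1.example.com", 6627))` yields an address whose `getHostString()` is `nimbus1.example.com` and whose `getPort()` is `6627`.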
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,19 +24,23 @@ | |
import org.apache.thrift.transport.TTransportException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import backtype.storm.nimbus.NimbusLeadership; | ||
|
||
public class NimbusClient extends ThriftClient { | ||
private Nimbus.Client _client; | ||
private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class); | ||
|
||
public static NimbusClient getConfiguredClient(Map conf) { | ||
try { | ||
String nimbusHost = (String) conf.get(Config.NIMBUS_HOST); | ||
NimbusLeadership nimbusLeadership = new NimbusLeadership(conf); | ||
Leadership is never closed here either.
The connection to ZK here is closed in the next line, "nimbusLeadership.getNimbusLeaderAddress()". The "getNimbusLeaderAddress()" function closes the CuratorFramework at the end by calling "close();".
I see that now. I missed the calls to close() in getNimbusLeaderAddress() and getNimbusHosts(). That seems counterintuitive to me, but it works.
||
String nimbusHost = nimbusLeadership.getNimbusLeaderAddress().getHostName(); | ||
int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)); | ||
return new NimbusClient(conf, nimbusHost, nimbusPort); | ||
} catch (TTransportException ex) { | ||
throw new RuntimeException(ex); | ||
} | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
public NimbusClient(Map conf, String host, int port) throws TTransportException { | ||
|
If nimbus.host is not used any more, we should either deprecate it or just remove it.
The nimbus.host related config should be removed throughout the source code. I will fix this.