diff --git a/raft-java-core/src/main/java/com/github/wenweihu86/raft/Peer.java b/raft-java-core/src/main/java/com/github/wenweihu86/raft/Peer.java index b56168f..8dc8a4b 100644 --- a/raft-java-core/src/main/java/com/github/wenweihu86/raft/Peer.java +++ b/raft-java-core/src/main/java/com/github/wenweihu86/raft/Peer.java @@ -11,19 +11,30 @@ * Created by wenweihu86 on 2017/5/5. */ public class Peer { + + private PeerId peerId; + private RaftMessage.Server server; + private RPCClient rpcClient; + private RaftConsensusService raftConsensusService; + private RaftConsensusServiceAsync raftConsensusServiceAsync; + // 需要发送给follower的下一个日志条目的索引值,只对leader有效 private long nextIndex; + // 已复制日志的最高索引值 private long matchIndex; + private volatile Boolean voteGranted; + private volatile boolean isCatchUp; public Peer(RaftMessage.Server server) { this.server = server; + this.peerId = new PeerId(server.getServerId()); this.rpcClient = new RPCClient(new EndPoint( server.getEndPoint().getHost(), server.getEndPoint().getPort())); @@ -40,6 +51,10 @@ public RPCClient getRpcClient() { return rpcClient; } + public PeerId getPeerId() { + return peerId; + } + public RaftConsensusService getRaftConsensusService() { return raftConsensusService; } diff --git a/raft-java-core/src/main/java/com/github/wenweihu86/raft/PeerId.java b/raft-java-core/src/main/java/com/github/wenweihu86/raft/PeerId.java new file mode 100644 index 0000000..61a2e1e --- /dev/null +++ b/raft-java-core/src/main/java/com/github/wenweihu86/raft/PeerId.java @@ -0,0 +1,13 @@ +package com.github.wenweihu86.raft; + +public class PeerId { + private Integer peerId; + + public PeerId(Integer peerId) { + this.peerId = peerId; + } + + public Integer getPeerId() { + return peerId; + } +} diff --git a/raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java b/raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java index e0ebfc8..b03fee7 100644 --- a/raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java +++ b/raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java @@ -2,10 +2,10 @@ import com.github.wenweihu86.raft.proto.RaftMessage; import com.github.wenweihu86.raft.storage.SegmentedLog; -import com.github.wenweihu86.raft.util.ConfigurationUtils; -import com.google.protobuf.ByteString; import com.github.wenweihu86.raft.storage.Snapshot; +import com.github.wenweihu86.raft.util.ConfigurationUtils; import com.github.wenweihu86.rpc.client.RPCCallback; +import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import org.apache.commons.io.FileUtils; @@ -17,7 +17,10 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Created by wenweihu86 on 2017/5/2. @@ -40,13 +43,13 @@ public enum NodeState { private RaftOptions raftOptions; private RaftMessage.Configuration configuration; - private ConcurrentMap peerMap = new ConcurrentHashMap<>(); + private ConcurrentMap peerMap = new ConcurrentHashMap<>(); private RaftMessage.Server localServer; private StateMachine stateMachine; private SegmentedLog raftLog; private Snapshot snapshot; - private NodeState state = NodeState.STATE_FOLLOWER; + private volatile NodeState state; // 服务器最后一次知道的任期号(初始化为 0,持续递增) private long currentTerm; // 在当前获得选票的候选人的Id @@ -66,6 +69,8 @@ public enum NodeState { private ScheduledFuture electionScheduledFuture; private ScheduledFuture heartbeatScheduledFuture; + private AtomicBoolean hasInit = new AtomicBoolean(false); + public RaftNode(RaftOptions raftOptions, List servers, RaftMessage.Server localServer, @@ -79,6 +84,13 @@ public RaftNode(RaftOptions raftOptions, this.localServer = localServer; this.stateMachine = stateMachine; + } + + public void init() { + + if (!hasInit.compareAndSet(false, true)) { + throw new IllegalStateException("Raft Node has init before"); + } // load log and snapshot raftLog = new SegmentedLog(raftOptions.getDataDir(), raftOptions.getMaxSegmentFileSize()); @@ -86,7 +98,9 @@ public RaftNode(RaftOptions raftOptions, snapshot.reload(); currentTerm = raftLog.getMetaData().getCurrentTerm(); + votedFor = raftLog.getMetaData().getVotedFor(); + commitIndex = Math.max(snapshot.getMetaData().getLastIncludedIndex(), commitIndex); // discard old log entries if (snapshot.getMetaData().getLastIncludedIndex() > 0 @@ -110,15 +124,16 @@ public RaftNode(RaftOptions raftOptions, } } lastAppliedIndex = commitIndex; - } - public void init() { + // init state is FOLLOWER + state = NodeState.STATE_FOLLOWER; + for (RaftMessage.Server server : configuration.getServersList()) { - if (!peerMap.containsKey(server.getServerId()) + if (!peerMap.containsKey(new PeerId(server.getServerId())) && server.getServerId() != localServer.getServerId()) { Peer peer = new Peer(server); peer.setNextIndex(raftLog.getLastLogIndex() + 1); - peerMap.put(server.getServerId(), peer); + peerMap.put(new PeerId(server.getServerId()), peer); } } @@ -159,7 +174,7 @@ public boolean replicate(byte[] data, RaftMessage.EntryType entryType) { raftLog.updateMetaData(currentTerm, null, raftLog.getFirstLogIndex()); for (RaftMessage.Server server : configuration.getServersList()) { - final Peer peer = peerMap.get(server.getServerId()); + final Peer peer = peerMap.get(new PeerId(server.getServerId())); executorService.submit(new Runnable() { @Override public void run() { @@ -259,7 +274,7 @@ public void appendEntries(Peer peer) { peer.getServer().getEndPoint().getHost(), peer.getServer().getEndPoint().getPort()); if (!ConfigurationUtils.containsServer(configuration, peer.getServer().getServerId())) { - peerMap.remove(peer.getServer().getServerId()); + peerMap.remove(new PeerId(peer.getServer().getServerId())); peer.getRpcClient().stop(); } return; @@ -404,11 +419,11 @@ public void applyConfiguration(RaftMessage.LogEntry entry) { configuration = newConfiguration; // update peerMap for (RaftMessage.Server server : newConfiguration.getServersList()) { - if (!peerMap.containsKey(server.getServerId()) + if (!peerMap.containsKey(new PeerId(server.getServerId())) && server.getServerId() != localServer.getServerId()) { Peer peer = new Peer(server); peer.setNextIndex(raftLog.getLastLogIndex() + 1); - peerMap.put(server.getServerId(), peer); + peerMap.put(new PeerId(server.getServerId()), peer); } } LOG.info("new conf is {}, leaderId={}", PRINTER.print(newConfiguration), leaderId); @@ -473,7 +488,7 @@ private void startPreVote() { if (server.getServerId() == localServer.getServerId()) { continue; } - final Peer peer = peerMap.get(server.getServerId()); + final Peer peer = peerMap.get(new PeerId(server.getServerId())); executorService.submit(new Runnable() { @Override public void run() { @@ -507,7 +522,7 @@ private void startVote() { if (server.getServerId() == localServer.getServerId()) { continue; } - final Peer peer = peerMap.get(server.getServerId()); + final Peer peer = peerMap.get(new PeerId(server.getServerId())); executorService.submit(new Runnable() { @Override public void run() { @@ -597,7 +612,7 @@ public void success(RaftMessage.VoteResponse response) { if (server.getServerId() == localServer.getServerId()) { continue; } - Peer peer1 = peerMap.get(server.getServerId()); + Peer peer1 = peerMap.get(new PeerId(server.getServerId())); if (peer1.isVoteGranted() != null && peer1.isVoteGranted() == true) { voteGrantedNum += 1; } @@ -664,7 +679,7 @@ public void success(RaftMessage.VoteResponse response) { if (server.getServerId() == localServer.getServerId()) { continue; } - Peer peer1 = peerMap.get(server.getServerId()); + Peer peer1 = peerMap.get(new PeerId(server.getServerId())); if (peer1.isVoteGranted() != null && peer1.isVoteGranted() == true) { voteGrantedNum += 1; } @@ -741,7 +756,7 @@ private void advanceCommitIndex() { int i = 0; for (RaftMessage.Server server : configuration.getServersList()) { if (server.getServerId() != localServer.getServerId()) { - Peer peer = peerMap.get(server.getServerId()); + Peer peer = peerMap.get(new PeerId(server.getServerId())); matchIndexes[i++] = peer.getMatchIndex(); } } @@ -928,6 +943,18 @@ private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest( return requestBuilder.build(); } + public void removePeer(PeerId peerId) { + peerMap.remove(peerId); + } + + public boolean containsPeer(PeerId peerId) { + return peerMap.containsKey(peerId); + } + + public void addPeer(Peer peer) { + peerMap.put(new PeerId(peer.getServer().getServerId()), peer); + } + public Lock getLock() { return lock; } @@ -992,14 +1019,6 @@ public RaftMessage.Server getLocalServer() { return localServer; } - public NodeState getState() { - return state; - } - - public ConcurrentMap getPeerMap() { - return peerMap; - } - public ExecutorService getExecutorService() { return executorService; } diff --git a/raft-java-core/src/main/java/com/github/wenweihu86/raft/service/impl/RaftClientServiceImpl.java b/raft-java-core/src/main/java/com/github/wenweihu86/raft/service/impl/RaftClientServiceImpl.java index c27efdd..864818c 100644 --- a/raft-java-core/src/main/java/com/github/wenweihu86/raft/service/impl/RaftClientServiceImpl.java +++ b/raft-java-core/src/main/java/com/github/wenweihu86/raft/service/impl/RaftClientServiceImpl.java @@ -1,6 +1,7 @@ package com.github.wenweihu86.raft.service.impl; import com.github.wenweihu86.raft.Peer; +import com.github.wenweihu86.raft.PeerId; import com.github.wenweihu86.raft.RaftNode; import com.github.wenweihu86.raft.proto.RaftMessage; import com.github.wenweihu86.raft.service.RaftClientService; @@ -99,7 +100,7 @@ public RaftMessage.AddPeersResponse addPeers(RaftMessage.AddPeersRequest request return responseBuilder.build(); } for (RaftMessage.Server server : request.getServersList()) { - if (raftNode.getPeerMap().containsKey(server.getServerId())) { + if (raftNode.containsPeer(new PeerId(server.getServerId()))) { LOG.warn("already be added/adding to configuration"); responseBuilder.setResMsg("already be added/adding to configuration"); return responseBuilder.build(); @@ -110,7 +111,7 @@ public RaftMessage.AddPeersResponse addPeers(RaftMessage.AddPeersRequest request final Peer peer = new Peer(server); peer.setNextIndex(1); requestPeers.add(peer); - raftNode.getPeerMap().putIfAbsent(server.getServerId(), peer); + raftNode.addPeer(peer); raftNode.getExecutorService().submit(new Runnable() { @Override public void run() { @@ -163,7 +164,7 @@ public void run() { try { for (Peer peer : requestPeers) { peer.getRpcClient().stop(); - raftNode.getPeerMap().remove(peer.getServer().getServerId()); + raftNode.removePeer(new PeerId(peer.getServer().getServerId())); } } finally { raftNode.getLock().unlock();