diff --git a/raft.go b/raft.go index 86916626..9e1e6e08 100644 --- a/raft.go +++ b/raft.go @@ -283,6 +283,22 @@ type Config struct { // This behavior will become unconditional in the future. See: // https://github.com/etcd-io/raft/issues/83 StepDownOnRemoval bool + + // ResumeReplicateBelowPendingSnapshot allows the leader to resume replication + // to a follower (moving it to StateReplicate) if the follower is currently in + // StateSnapshot and we receive a MsgAppResp from it below the PendingSnapshot + // index but within the leader's log. In other words, the follower may apply a + // snapshot below the leader's PendingSnapshot, and the leader will resume + // replication as long as it connects the follower to the leader's log. + // + // Consider that complex systems may delegate the sending of snapshots to + // alternative datasources (i.e. not the leader). In such setups, it is + // difficult to manufacture a snapshot at a particular index requested by raft + // and the actual index may be ahead or behind. This should be okay, as long + // as the snapshot allows replication to resume. + // + // TODO(erikgrinaker): Consider making this the default behavior. + ResumeReplicateBelowPendingSnapshot bool } func (c *Config) validate() error { @@ -413,9 +429,10 @@ type raft struct { // randomizedElectionTimeout is a random number between // [electiontimeout, 2 * electiontimeout - 1]. It gets reset // when raft changes its state to follower or candidate. - randomizedElectionTimeout int - disableProposalForwarding bool - stepDownOnRemoval bool + randomizedElectionTimeout int + disableProposalForwarding bool + stepDownOnRemoval bool + resumeReplicateBelowPendingSnapshot bool tick func() step stepFunc @@ -440,22 +457,23 @@ func newRaft(c *Config) *raft { } r := &raft{ - id: c.ID, - lead: None, - isLearner: false, - raftLog: raftlog, - maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), - maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), - prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), - electionTimeout: c.ElectionTick, - heartbeatTimeout: c.HeartbeatTick, - logger: c.Logger, - checkQuorum: c.CheckQuorum, - preVote: c.PreVote, - readOnly: newReadOnly(c.ReadOnlyOption), - disableProposalForwarding: c.DisableProposalForwarding, - disableConfChangeValidation: c.DisableConfChangeValidation, - stepDownOnRemoval: c.StepDownOnRemoval, + id: c.ID, + lead: None, + isLearner: false, + raftLog: raftlog, + maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), + maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), + electionTimeout: c.ElectionTick, + heartbeatTimeout: c.HeartbeatTick, + logger: c.Logger, + checkQuorum: c.CheckQuorum, + preVote: c.PreVote, + readOnly: newReadOnly(c.ReadOnlyOption), + disableProposalForwarding: c.DisableProposalForwarding, + disableConfChangeValidation: c.DisableConfChangeValidation, + stepDownOnRemoval: c.StepDownOnRemoval, + resumeReplicateBelowPendingSnapshot: c.ResumeReplicateBelowPendingSnapshot, } cfg, prs, err := confchange.Restore(confchange.Changer{ @@ -1478,10 +1496,8 @@ func stepLeader(r *raft, m pb.Message) error { switch { case pr.State == tracker.StateProbe: pr.BecomeReplicate() - case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot: - // TODO(tbg): we should also enter this branch if a snapshot is - // received that is below pr.PendingSnapshot but which makes it - // possible to use the log again. + case pr.State == tracker.StateSnapshot && (pr.Match >= pr.PendingSnapshot || + r.resumeReplicateBelowPendingSnapshot && pr.Match >= max(0, r.raftLog.firstIndex()-1)): r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // Transition back to replicating state via probing state // (which takes the snapshot into account). If we didn't @@ -1560,10 +1576,6 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State != tracker.StateSnapshot { return nil } - // TODO(tbg): this code is very similar to the snapshot handling in - // MsgAppResp above. In fact, the code there is more correct than the - // code here and should likely be updated to match (or even better, the - // logic pulled into a newly created Progress state machine handler). if !m.Reject { pr.BecomeProbe() r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt index b833e057..c7cfea6f 100644 --- a/testdata/snapshot_succeed_via_app_resp_behind.txt +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -1,8 +1,7 @@ # This is a variant of snapshot_succeed_via_app_resp in which the snapshot # that is being sent is behind the PendingSnapshot index tracked by the leader. -# -# In this case, the leader keeps the follower in StateSnapshot even though it -# could move it back to StateReplicate. +# The test verifies that the leader will move the follower back to StateReplicate +# regardless, since it is able to catch up the follower from the snapshot's index. # Turn off output during the setup of the test. log-level none @@ -123,11 +122,16 @@ stabilize 1 ---- > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/10 + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=10 next=11 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/10 Commit:12 Entries:[1/11 EntryNormal "", 1/12 EntryNormal "\"foo\""] -# 3 is still pending a snapshot, even though its match is 10 which -# the leader could catch the follower up from. +# 3 is in StateReplicate thanks to receiving the snapshot at index 10. +# This is despite its PendingSnapshot having been 12. status 1 ---- 1: StateReplicate match=12 next=13 2: StateReplicate match=12 next=13 -3: StateSnapshot match=10 next=11 paused pendingSnap=12 +3: StateReplicate match=10 next=13 inflight=1 diff --git a/tracker/progress.go b/tracker/progress.go index 5948fadf..92216f22 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -42,11 +42,23 @@ type Progress struct { // before and stops sending any replication message. State StateType - // PendingSnapshot is used in StateSnapshot. - // If there is a pending snapshot, the pendingSnapshot will be set to the - // index of the snapshot. If pendingSnapshot is set, the replication process of - // this Progress will be paused. raft will not resend snapshot until the pending one - // is reported to be failed. + // PendingSnapshot is used in StateSnapshot and tracks the last index of the + // leader at the time at which it realized a snapshot was necessary. This + // matches the index in the MsgSnap message emitted from raft. + // + // While there is a pending snapshot, replication to the follower is paused. + // The follower will transition back to StateReplicate if the leader + // receives an MsgAppResp from it at or above PendingSnapshot, or if + // ResumeReplicateBelowPendingSnapshot is enabled, one that reconnects the follower to the + // leader's log (such an MsgAppResp is emitted when the follower applies a + // snapshot). + // + // The follower will transition to StateProbe if ReportSnapshot is called on + // the leader; if SnapshotFinish is passed then PendingSnapshot becomes the + // basis for the next attempt to append. In practice, the first mechanism is + // the one that is relevant in most cases. However, if this MsgAppResp is + // lost (fallible network) then the second mechanism ensures that in this + // case the follower does not erroneously remain in StateSnapshot. PendingSnapshot uint64 // RecentActive is true if the progress is recently active. Receiving any messages