diff --git a/raft.go b/raft.go index cb2c4f7d..5a150562 100644 --- a/raft.go +++ b/raft.go @@ -1478,10 +1478,13 @@ func stepLeader(r *raft, m pb.Message) error { switch { case pr.State == tracker.StateProbe: pr.BecomeReplicate() - case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot: - // TODO(tbg): we should also enter this branch if a snapshot is - // received that is below pr.PendingSnapshot but which makes it - // possible to use the log again. + case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex(): + // Note that we don't take into account PendingSnapshot to + // enter this branch. No matter at which index a snapshot + // was actually applied, as long as this allows catching up + // the follower from the log, we will accept it. This gives + // systems more flexibility in how they implement snapshots; + // see the comments on PendingSnapshot. r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // Transition back to replicating state via probing state // (which takes the snapshot into account). If we didn't @@ -1560,10 +1563,6 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State != tracker.StateSnapshot { return nil } - // TODO(tbg): this code is very similar to the snapshot handling in - // MsgAppResp above. In fact, the code there is more correct than the - // code here and should likely be updated to match (or even better, the - // logic pulled into a newly created Progress state machine handler). 
if !m.Reject { pr.BecomeProbe() r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) diff --git a/rafttest/interaction_env_handler.go b/rafttest/interaction_env_handler.go index 30b83b8f..2ffec2b4 100644 --- a/rafttest/interaction_env_handler.go +++ b/rafttest/interaction_env_handler.go @@ -60,7 +60,7 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { // // Example: // - // deliver-msgs + // deliver-msgs type=MsgApp drop=(2,3) err = env.handleDeliverMsgs(t, d) case "process-ready": // Example: @@ -149,6 +149,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { // // forget-leader 1 err = env.handleForgetLeader(t, d) + case "send-snapshot": + // Sends a snapshot to a node. Takes the source and destination node. + // The message will be queued, but not delivered automatically. + // + // Example: send-snapshot 1 3 + err = env.handleSendSnapshot(t, d) case "propose": // Propose an entry. 
// diff --git a/rafttest/interaction_env_handler_deliver_msgs.go b/rafttest/interaction_env_handler_deliver_msgs.go index 671e0539..af8534cb 100644 --- a/rafttest/interaction_env_handler_deliver_msgs.go +++ b/rafttest/interaction_env_handler_deliver_msgs.go @@ -26,6 +26,7 @@ import ( ) func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error { + var typ raftpb.MessageType = -1 var rs []Recipient for _, arg := range d.CmdArgs { if len(arg.Vals) == 0 { @@ -50,11 +51,19 @@ func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData t.Fatalf("can't both deliver and drop msgs to %d", id) } rs = append(rs, Recipient{ID: id, Drop: true}) + case "type": + var s string + arg.Scan(t, i, &s) + v, ok := raftpb.MessageType_value[s] + if !ok { + t.Fatalf("unknown message type %s", s) + } + typ = raftpb.MessageType(v) } } } - if n := env.DeliverMsgs(rs...); n == 0 { + if n := env.DeliverMsgs(typ, rs...); n == 0 { env.Output.WriteString("no messages\n") } return nil @@ -66,14 +75,14 @@ type Recipient struct { } // DeliverMsgs goes through env.Messages and, depending on the Drop flag, -// delivers or drops messages to the specified Recipients. Returns the -// number of messages handled (i.e. delivered or dropped). A handled message -// is removed from env.Messages. -func (env *InteractionEnv) DeliverMsgs(rs ...Recipient) int { +// delivers or drops messages to the specified Recipients. Only messages of type +// typ are delivered (-1 for all types). Returns the number of messages handled +// (i.e. delivered or dropped). A handled message is removed from env.Messages. 
+func (env *InteractionEnv) DeliverMsgs(typ raftpb.MessageType, rs ...Recipient) int { var n int for _, r := range rs { var msgs []raftpb.Message - msgs, env.Messages = splitMsgs(env.Messages, r.ID, r.Drop) + msgs, env.Messages = splitMsgs(env.Messages, r.ID, typ, r.Drop) n += len(msgs) for _, msg := range msgs { if r.Drop { diff --git a/rafttest/interaction_env_handler_process_append_thread.go b/rafttest/interaction_env_handler_process_append_thread.go index 38c16f25..41363210 100644 --- a/rafttest/interaction_env_handler_process_append_thread.go +++ b/rafttest/interaction_env_handler_process_append_thread.go @@ -15,6 +15,7 @@ package rafttest import ( + "errors" "fmt" "testing" @@ -86,13 +87,11 @@ func processAppend(n *Node, st raftpb.HardState, ents []raftpb.Entry, snap raftp return err } } - if err := s.Append(ents); err != nil { - return err - } if !raft.IsEmptySnap(snap) { - if err := s.ApplySnapshot(snap); err != nil { - return err + if len(ents) > 0 { + return errors.New("can't apply snapshot and entries at the same time") } + return s.ApplySnapshot(snap) } - return nil + return s.Append(ents) } diff --git a/rafttest/interaction_env_handler_send_snapshot.go b/rafttest/interaction_env_handler_send_snapshot.go new file mode 100644 index 00000000..c91d4cc8 --- /dev/null +++ b/rafttest/interaction_env_handler_send_snapshot.go @@ -0,0 +1,50 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" + + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" +) + +func (env *InteractionEnv) handleSendSnapshot(t *testing.T, d datadriven.TestData) error { + idxs := nodeIdxs(t, d) + require.Len(t, idxs, 2) + return env.SendSnapshot(idxs[0], idxs[1]) +} + +// SendSnapshot obtains a snapshot from the node at fromIdx, wraps it in a MsgSnap (at the sender's current term) addressed to the node at toIdx, appends the message to env.Messages without delivering it, and writes a description of it to env.Output. Node IDs are the indexes plus one. It returns the error from obtaining the snapshot, if any. +func (env *InteractionEnv) SendSnapshot(fromIdx, toIdx int) error { + snap, err := env.Nodes[fromIdx].Snapshot() + if err != nil { + return err + } + from, to := uint64(fromIdx+1), uint64(toIdx+1) + msg := raftpb.Message{ + Type: raftpb.MsgSnap, + Term: env.Nodes[fromIdx].BasicStatus().Term, + From: from, + To: to, + Snapshot: &snap, + } + env.Messages = append(env.Messages, msg) + _, _ = env.Output.WriteString(raft.DescribeMessage(msg, nil)) + return nil +} diff --git a/rafttest/interaction_env_handler_stabilize.go b/rafttest/interaction_env_handler_stabilize.go index 084022ab..56ae2e1e 100644 --- a/rafttest/interaction_env_handler_stabilize.go +++ b/rafttest/interaction_env_handler_stabilize.go @@ -76,9 +76,9 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error { id := rn.Status().ID // NB: we grab the messages just to see whether to print the header. // DeliverMsgs will do it again. 
- if msgs, _ := splitMsgs(env.Messages, id, false /* drop */); len(msgs) > 0 { + if msgs, _ := splitMsgs(env.Messages, id, -1 /* typ */, false /* drop */); len(msgs) > 0 { fmt.Fprintf(env.Output, "> %d receiving messages\n", id) - env.withIndent(func() { env.DeliverMsgs(Recipient{ID: id}) }) + env.withIndent(func() { env.DeliverMsgs(-1 /* typ */, Recipient{ID: id}) }) done = false } } @@ -112,10 +112,12 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error { } } -func splitMsgs(msgs []raftpb.Message, to uint64, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) { +// splitMsgs extracts messages for the given recipient of the given type (-1 for +// all types) from msgs, and returns them along with the remainder of msgs. +func splitMsgs(msgs []raftpb.Message, to uint64, typ raftpb.MessageType, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) { // NB: this method does not reorder messages. for _, msg := range msgs { - if msg.To == to && !(drop && isLocalMsg(msg)) { + if msg.To == to && !(drop && isLocalMsg(msg)) && (typ < 0 || msg.Type == typ) { toMsgs = append(toMsgs, msg) } else { rmdr = append(rmdr, msg) diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt new file mode 100644 index 00000000..09496982 --- /dev/null +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -0,0 +1,163 @@ +# This is a variant of snapshot_succeed_via_app_resp in which the snapshot +# that is being sent is behind the PendingSnapshot index tracked by the leader. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with three nodes, but the third is disconnected from the log. +add-nodes 2 voters=(1,2,3) index=10 +---- +ok + +add-nodes 1 voters=(1,2,3) index=5 +---- +ok + +# Elect 1 as leader. We first stabilize 3 to process the vote, then stabilize 1 +# and 2 to complete the leader election. 
We don't stabilize 3 after the +# election, so that it does not receive and process any MsgApp yet. +campaign 1 +---- +ok + +process-ready 1 +---- +ok + +stabilize 3 +---- +ok + +stabilize 1 2 +---- +ok + +log-level debug +---- +ok + +# We now have a leader at index 11 (it appended an empty entry when elected). 3 +# is still at index 5, and has not received any MsgApp from the leader yet. +raft-state +---- +1: StateLeader (Voter) Term:1 Lead:1 +2: StateFollower (Voter) Term:1 Lead:1 +3: StateFollower (Voter) Term:1 Lead:0 + +status 1 +---- +1: StateReplicate match=11 next=12 +2: StateReplicate match=11 next=12 +3: StateProbe match=0 next=11 paused inactive + +raft-log 3 +---- +log is empty: first index=6, last index=5 + +# Send a manual snapshot from 1 to 3, which will be at index 11. This snapshot +# does not move 3 to StateSnapshot. +send-snapshot 1 3 +---- +1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# Propose and commit an additional entry, which makes the leader's +# last index 12, beyond the snapshot it sent at index 11. +log-level none +---- +ok + +propose 1 "foo" +---- +ok + +stabilize 1 2 +---- +ok + +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateProbe match=0 next=11 paused inactive + +# 3 now gets the first MsgApp the leader originally sent, trying to append entry +# 11 but this is rejected because the follower's log started at index 5. +deliver-msgs 3 type=MsgApp +---- +1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +DEBUG 3 [logterm: 0, index: 10] rejected MsgApp [logterm: 1, index: 10] from 1 + +# Note below that the RejectionHint is 5, which is below the first index 11 of the +# leader. Once the leader receives this, it will move 3 into StateSnapshot with +# PendingSnapshot=lastIndex=12. 
+process-ready 3 +---- +Ready MustSync=false: +Lead:1 State:StateFollower +Messages: +3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) + +# 3 receives and applies the snapshot, but doesn't respond with MsgAppResp yet. +deliver-msgs 3 +---- +1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false +INFO log [committed=5, applied=5, applying=5, unstable.offset=6, unstable.offsetInProgress=6, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1] +INFO 3 [commit: 11] restored snapshot [index: 11, term: 1] + + +# 1 sees the MsgApp rejection and asks for a snapshot at index 12 (which is 1's +# current last index). +stabilize 1 +---- +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) + DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10 + DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6] + DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# Drop the extra MsgSnap(index=12) that 1 just sent, to keep the test tidy. +deliver-msgs drop=(3) +---- +dropped: 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# 3 sends the affirmative MsgAppResp that resulted from applying the snapshot +# at index 11. 
+stabilize 3 +---- +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:11 + Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + Messages: + 3->1 MsgAppResp Term:1 Log:0/11 + +stabilize 1 +---- +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:0/11 + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/11 Commit:12 Entries:[1/12 EntryNormal "\"foo\""] + +# 3 is in StateReplicate thanks to receiving the snapshot at index 11. +# This is despite its PendingSnapshot having been 12. +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateReplicate match=11 next=13 inflight=1 diff --git a/tracker/progress.go b/tracker/progress.go index 5948fadf..4002b1ec 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -42,11 +42,27 @@ type Progress struct { // before and stops sending any replication message. State StateType - // PendingSnapshot is used in StateSnapshot. - // If there is a pending snapshot, the pendingSnapshot will be set to the - // index of the snapshot. If pendingSnapshot is set, the replication process of - // this Progress will be paused. raft will not resend snapshot until the pending one - // is reported to be failed. + // PendingSnapshot is used in StateSnapshot and tracks the last index of the + // leader at the time at which it realized a snapshot was necessary. This + // matches the index in the MsgSnap message emitted from raft. + // + // While there is a pending snapshot, replication to the follower is paused. + // The follower will transition back to StateReplicate if the leader + // receives an MsgAppResp from it that reconnects the follower to the + // leader's log (such an MsgAppResp is emitted when the follower applies a + // snapshot). 
It may be surprising that PendingSnapshot is not taken into + // account here, but consider that complex systems may delegate the sending + // of snapshots to alternative datasources (i.e. not the leader). In such + // setups, it is difficult to manufacture a snapshot at a particular index + // requested by raft and the actual index may be ahead or behind. This + // should be okay, as long as the snapshot allows replication to resume. + // + // The follower will transition to StateProbe if ReportSnapshot is called on + // the leader; if SnapshotFinish is passed then PendingSnapshot becomes the + // basis for the next attempt to append. In practice, the first mechanism is + // the one that is relevant in most cases. However, if this MsgAppResp is + // lost (fallible network) then the second mechanism ensures that in this + // case the follower does not erroneously remain in StateSnapshot. PendingSnapshot uint64 // RecentActive is true if the progress is recently active. Receiving any messages