From 5e2260fc30a0aa272a945e3072c0d35fb8fbc8d1 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 17 Nov 2023 11:48:06 +0000 Subject: [PATCH 1/2] Document leader ignoring valid MsgAppResp from follower after snapshot This adds a test that documents the following behavior: It the leader is tracking a follower as StateSnapshot with PendingSnapshot equal to, say, 100, and the follower applies a snapshot at a lower index that reconnects the follower to the leader's log, then the leader will still ignore this snapshot. Signed-off-by: Erik Grinaker Signed-off-by: Tobias Grieger --- rafttest/interaction_env_handler.go | 8 +- .../interaction_env_handler_deliver_msgs.go | 21 ++- ...ction_env_handler_process_append_thread.go | 11 +- .../interaction_env_handler_send_snapshot.go | 50 ++++++ rafttest/interaction_env_handler_stabilize.go | 10 +- .../snapshot_succeed_via_app_resp_behind.txt | 161 ++++++++++++++++++ 6 files changed, 244 insertions(+), 17 deletions(-) create mode 100644 rafttest/interaction_env_handler_send_snapshot.go create mode 100644 testdata/snapshot_succeed_via_app_resp_behind.txt diff --git a/rafttest/interaction_env_handler.go b/rafttest/interaction_env_handler.go index 30b83b8f..2ffec2b4 100644 --- a/rafttest/interaction_env_handler.go +++ b/rafttest/interaction_env_handler.go @@ -60,7 +60,7 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { // // Example: // - // deliver-msgs + // deliver-msgs type=MsgApp drop=(2,3) err = env.handleDeliverMsgs(t, d) case "process-ready": // Example: @@ -149,6 +149,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { // // forget-leader 1 err = env.handleForgetLeader(t, d) + case "send-snapshot": + // Sends a snapshot to a node. Takes the source and destination node. + // The message will be queued, but not delivered automatically. + // + // Example: send-snapshot 1 3 + env.handleSendSnapshot(t, d) case "propose": // Propose an entry. // diff --git a/rafttest/interaction_env_handler_deliver_msgs.go b/rafttest/interaction_env_handler_deliver_msgs.go index 671e0539..af8534cb 100644 --- a/rafttest/interaction_env_handler_deliver_msgs.go +++ b/rafttest/interaction_env_handler_deliver_msgs.go @@ -26,6 +26,7 @@ import ( ) func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error { + var typ raftpb.MessageType = -1 var rs []Recipient for _, arg := range d.CmdArgs { if len(arg.Vals) == 0 { @@ -50,11 +51,19 @@ func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData t.Fatalf("can't both deliver and drop msgs to %d", id) } rs = append(rs, Recipient{ID: id, Drop: true}) + case "type": + var s string + arg.Scan(t, i, &s) + v, ok := raftpb.MessageType_value[s] + if !ok { + t.Fatalf("unknown message type %s", s) + } + typ = raftpb.MessageType(v) } } } - if n := env.DeliverMsgs(rs...); n == 0 { + if n := env.DeliverMsgs(typ, rs...); n == 0 { env.Output.WriteString("no messages\n") } return nil @@ -66,14 +75,14 @@ type Recipient struct { } // DeliverMsgs goes through env.Messages and, depending on the Drop flag, -// delivers or drops messages to the specified Recipients. Returns the -// number of messages handled (i.e. delivered or dropped). A handled message -// is removed from env.Messages. -func (env *InteractionEnv) DeliverMsgs(rs ...Recipient) int { +// delivers or drops messages to the specified Recipients. Only messages of type +// typ are delivered (-1 for all types). Returns the number of messages handled +// (i.e. delivered or dropped). A handled message is removed from env.Messages. +func (env *InteractionEnv) DeliverMsgs(typ raftpb.MessageType, rs ...Recipient) int { var n int for _, r := range rs { var msgs []raftpb.Message - msgs, env.Messages = splitMsgs(env.Messages, r.ID, r.Drop) + msgs, env.Messages = splitMsgs(env.Messages, r.ID, typ, r.Drop) n += len(msgs) for _, msg := range msgs { if r.Drop { diff --git a/rafttest/interaction_env_handler_process_append_thread.go b/rafttest/interaction_env_handler_process_append_thread.go index 38c16f25..41363210 100644 --- a/rafttest/interaction_env_handler_process_append_thread.go +++ b/rafttest/interaction_env_handler_process_append_thread.go @@ -15,6 +15,7 @@ package rafttest import ( + "errors" "fmt" "testing" @@ -86,13 +87,11 @@ func processAppend(n *Node, st raftpb.HardState, ents []raftpb.Entry, snap raftp return err } } - if err := s.Append(ents); err != nil { - return err - } if !raft.IsEmptySnap(snap) { - if err := s.ApplySnapshot(snap); err != nil { - return err + if len(ents) > 0 { + return errors.New("can't apply snapshot and entries at the same time") } + return s.ApplySnapshot(snap) } - return nil + return s.Append(ents) } diff --git a/rafttest/interaction_env_handler_send_snapshot.go b/rafttest/interaction_env_handler_send_snapshot.go new file mode 100644 index 00000000..c91d4cc8 --- /dev/null +++ b/rafttest/interaction_env_handler_send_snapshot.go @@ -0,0 +1,50 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" + + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" +) + +func (env *InteractionEnv) handleSendSnapshot(t *testing.T, d datadriven.TestData) error { + idxs := nodeIdxs(t, d) + require.Len(t, idxs, 2) + return env.SendSnapshot(idxs[0], idxs[1]) +} + +// SendSnapshot sends a snapshot. +func (env *InteractionEnv) SendSnapshot(fromIdx, toIdx int) error { + snap, err := env.Nodes[fromIdx].Snapshot() + if err != nil { + return err + } + from, to := uint64(fromIdx+1), uint64(toIdx+1) + msg := raftpb.Message{ + Type: raftpb.MsgSnap, + Term: env.Nodes[fromIdx].BasicStatus().Term, + From: from, + To: to, + Snapshot: &snap, + } + env.Messages = append(env.Messages, msg) + _, _ = env.Output.WriteString(raft.DescribeMessage(msg, nil)) + return nil +} diff --git a/rafttest/interaction_env_handler_stabilize.go b/rafttest/interaction_env_handler_stabilize.go index 084022ab..56ae2e1e 100644 --- a/rafttest/interaction_env_handler_stabilize.go +++ b/rafttest/interaction_env_handler_stabilize.go @@ -76,9 +76,9 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error { id := rn.Status().ID // NB: we grab the messages just to see whether to print the header. // DeliverMsgs will do it again. - if msgs, _ := splitMsgs(env.Messages, id, false /* drop */); len(msgs) > 0 { + if msgs, _ := splitMsgs(env.Messages, id, -1 /* typ */, false /* drop */); len(msgs) > 0 { fmt.Fprintf(env.Output, "> %d receiving messages\n", id) - env.withIndent(func() { env.DeliverMsgs(Recipient{ID: id}) }) + env.withIndent(func() { env.DeliverMsgs(-1 /* typ */, Recipient{ID: id}) }) done = false } } @@ -112,10 +112,12 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error { } } -func splitMsgs(msgs []raftpb.Message, to uint64, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) { +// splitMsgs extracts messages for the given recipient of the given type (-1 for +// all types) from msgs, and returns them along with the remainder of msgs. +func splitMsgs(msgs []raftpb.Message, to uint64, typ raftpb.MessageType, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) { // NB: this method does not reorder messages. for _, msg := range msgs { - if msg.To == to && !(drop && isLocalMsg(msg)) { + if msg.To == to && !(drop && isLocalMsg(msg)) && (typ < 0 || msg.Type == typ) { toMsgs = append(toMsgs, msg) } else { rmdr = append(rmdr, msg) diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt new file mode 100644 index 00000000..f40336de --- /dev/null +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -0,0 +1,161 @@ +# This is a variant of snapshot_succeed_via_app_resp in which the snapshot +# that is being sent is behind the PendingSnapshot index tracked by the leader. +# +# In this case, the leader keeps the follower in StateSnapshot even though it +# could move it back to StateReplicate. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with three nodes, but the third is disconnected from the log. +add-nodes 2 voters=(1,2,3) index=10 +---- +ok + +add-nodes 1 voters=(1,2,3) index=5 +---- +ok + +# Elect 1 as leader. We first stabilize 3 to process the vote, then stabilize 1 +# and 2 to complete the leader election. We don't stabilize 3 after the +# election, so that it does not receive and process any MsgApp yet. +campaign 1 +---- +ok + +process-ready 1 +---- +ok + +stabilize 3 +---- +ok + +stabilize 1 2 +---- +ok + +log-level debug +---- +ok + +# We now have a leader at index 11 (it appended an empty entry when elected). 3 +# is still at index 5, and has not received any MsgApp from the leader yet. +raft-state +---- +1: StateLeader (Voter) Term:1 Lead:1 +2: StateFollower (Voter) Term:1 Lead:1 +3: StateFollower (Voter) Term:1 Lead:0 + +status 1 +---- +1: StateReplicate match=11 next=12 +2: StateReplicate match=11 next=12 +3: StateProbe match=0 next=11 paused inactive + +raft-log 3 +---- +log is empty: first index=6, last index=5 + +# Send a manual snapshot from 1 to 3, which will be at index 11. This snapshot +# does not move 3 to StateSnapshot. +send-snapshot 1 3 +---- +1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# Propose and commit an additional entry, which makes the leader's +# last index 12, beyond the snapshot it sent at index 11. +log-level none +---- +ok + +propose 1 "foo" +---- +ok + +stabilize 1 2 +---- +ok + +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateProbe match=0 next=11 paused inactive + +# 3 now gets the first MsgApp the leader originally sent, trying to append entry +# 11 but this is rejected because the follower's log started at index 5. +deliver-msgs 3 type=MsgApp +---- +1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +DEBUG 3 [logterm: 0, index: 10] rejected MsgApp [logterm: 1, index: 10] from 1 + +# Note below that the RejectionHint is 5, which is below the first index 10 of the +# leader. Once the leader receives this, it will move 3 into StateSnapshot with +# PendingSnapshot=lastIndex=12. +process-ready 3 +---- +Ready MustSync=false: +Lead:1 State:StateFollower +Messages: +3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) + +# 3 receives and applies the snapshot, but doesn't respond with MsgAppResp yet. +deliver-msgs 3 +---- +1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false +INFO log [committed=5, applied=5, applying=5, unstable.offset=6, unstable.offsetInProgress=6, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1] +INFO 3 [commit: 11] restored snapshot [index: 11, term: 1] + + +# 1 sees the MsgApp rejection and asks for a snapshot at index 12 (which is 1's +# current last index). +stabilize 1 +---- +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) + DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10 + DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6] + DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# Drop the extra MsgSnap(index=12) that 1 just sent, to keep the test tidy. +deliver-msgs drop=(3) +---- +dropped: 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# 3 sends the affirmative MsgAppResp that resulted from applying the snapshot +# at index 11. +stabilize 3 +---- +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:11 + Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + Messages: + 3->1 MsgAppResp Term:1 Log:0/11 + +stabilize 1 +---- +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:0/11 + +# 3 is still pending a snapshot, even though its match is 11 which +# the leader could catch the follower up from. +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateSnapshot match=11 next=12 paused pendingSnap=12 From d87942f571dc8f37a3b7d614bfe048e20dacd780 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 17 Nov 2023 11:51:39 +0000 Subject: [PATCH 2/2] Accept any snapshot that allows replication A leader will not take into account snapshots reported by a follower unless they match or exceed the tracked PendingSnapshot index (which is the leader's last indexat the time of requesting the snapshot). This is too inflexible: the leader should take into account any snapshot that reconnects the follower to its log. This PR makes that change. In doing so, it addresses long-standing problems that we've encountered in CockroachDB. Unless you create the snapshot immediately and locally when raft emits an MsgSnap, it's difficult/impossible to later synthesize a snapshot at the requested index. It is possible to get one above the requested index which raft always accepted, but CockroachDB delegates snapshots to followers who might be behind on applying the log, and it is awkward to have to wait for log application to send the snapshot just to satisfy an overly strict condition in raft. Additionally, CockroachDB also sends snapshots preemptively when adding a new replica since there are qualitative differences between an initial snapshot and one needed to reconnect to the log and one does not want to wait for raft to round-trip to the follower to realize that a snapshot is needed. In this case, the sent snapshot is commonly behind the PendingSnapshot since the leader transitions the follower into StateProbe when a snapshot is already in flight. Touches https://github.com/cockroachdb/cockroach/issues/84242. Touches https://github.com/cockroachdb/cockroach/issues/87553. Touches https://github.com/cockroachdb/cockroach/pull/87554. Touches https://github.com/cockroachdb/cockroach/issues/97971. Touches https://github.com/cockroachdb/cockroach/issues/114349. See also https://github.com/cockroachdb/cockroach/blob/2b91c3829270eb512c5380201c26a3d838fc567a/pkg/kv/kvserver/raft_snapshot_queue.go#L131-L143. Signed-off-by: Erik Grinaker Signed-off-by: Tobias Grieger --- raft.go | 15 +++++------ .../snapshot_succeed_via_app_resp_behind.txt | 14 +++++----- tracker/progress.go | 26 +++++++++++++++---- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/raft.go b/raft.go index cb2c4f7d..5a150562 100644 --- a/raft.go +++ b/raft.go @@ -1478,10 +1478,13 @@ func stepLeader(r *raft, m pb.Message) error { switch { case pr.State == tracker.StateProbe: pr.BecomeReplicate() - case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot: - // TODO(tbg): we should also enter this branch if a snapshot is - // received that is below pr.PendingSnapshot but which makes it - // possible to use the log again. + case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex(): + // Note that we don't take into account PendingSnapshot to + // enter this branch. No matter at which index a snapshot + // was actually applied, as long as this allows catching up + // the follower from the log, we will accept it. This gives + // systems more flexibility in how they implement snapshots; + // see the comments on PendingSnapshot. r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // Transition back to replicating state via probing state // (which takes the snapshot into account). If we didn't @@ -1560,10 +1563,6 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State != tracker.StateSnapshot { return nil } - // TODO(tbg): this code is very similar to the snapshot handling in - // MsgAppResp above. In fact, the code there is more correct than the - // code here and should likely be updated to match (or even better, the - // logic pulled into a newly created Progress state machine handler). if !m.Reject { pr.BecomeProbe() r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt index f40336de..09496982 100644 --- a/testdata/snapshot_succeed_via_app_resp_behind.txt +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -1,8 +1,5 @@ # This is a variant of snapshot_succeed_via_app_resp in which the snapshot # that is being sent is behind the PendingSnapshot index tracked by the leader. -# -# In this case, the leader keeps the follower in StateSnapshot even though it -# could move it back to StateReplicate. # Turn off output during the setup of the test. log-level none @@ -151,11 +148,16 @@ stabilize 1 ---- > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/11 + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/11 Commit:12 Entries:[1/12 EntryNormal "\"foo\""] -# 3 is still pending a snapshot, even though its match is 11 which -# the leader could catch the follower up from. +# 3 is in StateReplicate thanks to receiving the snapshot at index 11. +# This is despite its PendingSnapshot having been 12. status 1 ---- 1: StateReplicate match=12 next=13 2: StateReplicate match=12 next=13 -3: StateSnapshot match=11 next=12 paused pendingSnap=12 +3: StateReplicate match=11 next=13 inflight=1 diff --git a/tracker/progress.go b/tracker/progress.go index 5948fadf..4002b1ec 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -42,11 +42,27 @@ type Progress struct { // before and stops sending any replication message. State StateType - // PendingSnapshot is used in StateSnapshot. - // If there is a pending snapshot, the pendingSnapshot will be set to the - // index of the snapshot. If pendingSnapshot is set, the replication process of - // this Progress will be paused. raft will not resend snapshot until the pending one - // is reported to be failed. + // PendingSnapshot is used in StateSnapshot and tracks the last index of the + // leader at the time at which it realized a snapshot was necessary. This + // matches the index in the MsgSnap message emitted from raft. + // + // While there is a pending snapshot, replication to the follower is paused. + // The follower will transition back to StateReplicate if the leader + // receives an MsgAppResp from it that reconnects the follower to the + // leader's log (such an MsgAppResp is emitted when the follower applies a + // snapshot). It may be surprising that PendingSnapshot is not taken into + // account here, but consider that complex systems may delegate the sending + // of snapshots to alternative datasources (i.e. not the leader). In such + // setups, it is difficult to manufacture a snapshot at a particular index + // requested by raft and the actual index may be ahead or behind. This + // should be okay, as long as the snapshot allows replication to resume. + // + // The follower will transition to StateProbe if ReportSnapshot is called on + // the leader; if SnapshotFinish is passed then PendingSnapshot becomes the + // basis for the next attempt to append. In practice, the first mechanism is + // the one that is relevant in most cases. However, if this MsgAppResp is + // lost (fallible network) then the second mechanism ensures that in this + // case the follower does not erroneously remain in StateSnapshot. PendingSnapshot uint64 // RecentActive is true if the progress is recently active. Receiving any messages