Skip to content

Commit

Permalink
Document leader ignoring valid MsgAppResp from follower after snapshot
Browse files Browse the repository at this point in the history
This adds a test that documents the following behavior:

It the leader is tracking a follower as StateSnapshot with PendingSnapshot equal
to, say, 100, and the follower applies a snapshot at a lower index that
reconnects the follower to the leader's log, then the leader will still ignore
this snapshot.

Signed-off-by: Erik Grinaker <[email protected]>
Signed-off-by: Tobias Grieger <[email protected]>
  • Loading branch information
tbg authored and erikgrinaker committed Nov 17, 2023
1 parent 0511a18 commit 27f6ebc
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 14 deletions.
8 changes: 7 additions & 1 deletion rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
//
// Example:
//
// deliver-msgs <idx>
// deliver-msgs <idx> type=MsgApp drop=3
err = env.handleDeliverMsgs(t, d)
case "process-ready":
// Example:
Expand Down Expand Up @@ -149,6 +149,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
//
// forget-leader 1
err = env.handleForgetLeader(t, d)
case "send-snapshot":
// Sends a snapshot to a node. Takes the source and destination node.
// The message will be queued, but not delivered automatically.
//
// Example: send-snapshot 1 3
env.handleSendSnapshot(t, d)
case "propose":
// Propose an entry.
//
Expand Down
21 changes: 15 additions & 6 deletions rafttest/interaction_env_handler_deliver_msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error {
var typ raftpb.MessageType = -1
var rs []Recipient
for _, arg := range d.CmdArgs {
if len(arg.Vals) == 0 {
Expand All @@ -50,11 +51,19 @@ func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData
t.Fatalf("can't both deliver and drop msgs to %d", id)
}
rs = append(rs, Recipient{ID: id, Drop: true})
case "type":
var s string
arg.Scan(t, i, &s)
v, ok := raftpb.MessageType_value[s]
if !ok {
t.Fatalf("unknown message type %s", s)
}
typ = raftpb.MessageType(v)
}
}
}

if n := env.DeliverMsgs(rs...); n == 0 {
if n := env.DeliverMsgs(typ, rs...); n == 0 {
env.Output.WriteString("no messages\n")
}
return nil
Expand All @@ -66,14 +75,14 @@ type Recipient struct {
}

// DeliverMsgs goes through env.Messages and, depending on the Drop flag,
// delivers or drops messages to the specified Recipients. Returns the
// number of messages handled (i.e. delivered or dropped). A handled message
// is removed from env.Messages.
func (env *InteractionEnv) DeliverMsgs(rs ...Recipient) int {
// delivers or drops messages to the specified Recipients. Only messages of type
// typ are delivered (-1 for all types). Returns the number of messages handled
// (i.e. delivered or dropped). A handled message is removed from env.Messages.
func (env *InteractionEnv) DeliverMsgs(typ raftpb.MessageType, rs ...Recipient) int {
var n int
for _, r := range rs {
var msgs []raftpb.Message
msgs, env.Messages = splitMsgs(env.Messages, r.ID, r.Drop)
msgs, env.Messages = splitMsgs(env.Messages, r.ID, typ, r.Drop)
n += len(msgs)
for _, msg := range msgs {
if r.Drop {
Expand Down
9 changes: 6 additions & 3 deletions rafttest/interaction_env_handler_process_append_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package rafttest

import (
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -86,13 +87,15 @@ func processAppend(n *Node, st raftpb.HardState, ents []raftpb.Entry, snap raftp
return err
}
}
if err := s.Append(ents); err != nil {
return err
}
if !raft.IsEmptySnap(snap) {
if len(ents) > 0 {
return errors.New("can't apply snapshot and entries at the same time")
}
if err := s.ApplySnapshot(snap); err != nil {
return err
}
} else if err := s.Append(ents); err != nil {
return err
}
return nil
}
49 changes: 49 additions & 0 deletions rafttest/interaction_env_handler_send_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafttest

import (
"github.com/stretchr/testify/require"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
"testing"

"github.com/cockroachdb/datadriven"
)

func (env *InteractionEnv) handleSendSnapshot(t *testing.T, d datadriven.TestData) error {
idxs := nodeIdxs(t, d)
require.Len(t, idxs, 2)
return env.SendSnapshot(t, idxs[0], idxs[1])
}

// SendSnapshot sends a snapshot.
func (env *InteractionEnv) SendSnapshot(t *testing.T, fromIdx, toIdx int) error {
snap, err := env.Nodes[fromIdx].Snapshot()
if err != nil {
return err
}
from, to := uint64(fromIdx+1), uint64(toIdx+1)
msg := raftpb.Message{
Type: raftpb.MsgSnap,
Term: env.Nodes[fromIdx].BasicStatus().Term,
From: from,
To: to,
Snapshot: &snap,
}
env.Messages = append(env.Messages, msg)
_, _ = env.Output.WriteString(raft.DescribeMessage(msg, nil))
return nil
}
10 changes: 6 additions & 4 deletions rafttest/interaction_env_handler_stabilize.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error {
id := rn.Status().ID
// NB: we grab the messages just to see whether to print the header.
// DeliverMsgs will do it again.
if msgs, _ := splitMsgs(env.Messages, id, false /* drop */); len(msgs) > 0 {
if msgs, _ := splitMsgs(env.Messages, id, -1 /* typ */, false /* drop */); len(msgs) > 0 {
fmt.Fprintf(env.Output, "> %d receiving messages\n", id)
env.withIndent(func() { env.DeliverMsgs(Recipient{ID: id}) })
env.withIndent(func() { env.DeliverMsgs(-1 /* typ */, Recipient{ID: id}) })
done = false
}
}
Expand Down Expand Up @@ -112,10 +112,12 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error {
}
}

func splitMsgs(msgs []raftpb.Message, to uint64, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
// splitMsgs extracts messages for the given recipient of the given type (-1 for
// all types) from msgs, and returns them along with the remainder of msgs.
func splitMsgs(msgs []raftpb.Message, to uint64, typ raftpb.MessageType, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
// NB: this method does not reorder messages.
for _, msg := range msgs {
if msg.To == to && !(drop && isLocalMsg(msg)) {
if msg.To == to && !(drop && isLocalMsg(msg)) && (typ < 0 || msg.Type == typ) {
toMsgs = append(toMsgs, msg)
} else {
rmdr = append(rmdr, msg)
Expand Down
133 changes: 133 additions & 0 deletions testdata/snapshot_succeed_via_app_resp_behind.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# This is a variant of snapshot_succeed_via_app_resp in which the snapshot
# that is being sent is behind the PendingSnapshot index tracked by the leader.
#
# In this case, the leader keeps the follower in StateSnapshot even though it
# could move it back to StateReplicate.

# Turn off output during the setup of the test.
log-level none
----
ok

# Start with three nodes, but the third is disconnected from the log.
add-nodes 2 voters=(1,2,3) index=10
----
ok

add-nodes 1 voters=(1,2,3) index=5
----
ok

campaign 1
----
ok

process-ready 1
----
ok

stabilize 2 3
----
ok

# Now we have a leader, but it hasn't appended the empty entry yet.
# Initiate a snapshot from 1 to 3, which will be at the initial index, 10.
send-snapshot 1 3
----
ok

# 1 and 2 commit the empty entry.
stabilize 1 2
----
ok

# They also commit an additional entry. The leader's last index
# is then strictly non-adjacent to index 10 (of the snapshot).
propose 1 "foo"
----
ok

stabilize 1 2
----
ok

log-level debug
----
ok

status 1
----
1: StateReplicate match=12 next=13
2: StateReplicate match=12 next=13
3: StateProbe match=0 next=11 paused inactive

# n3 gets the first MsgApp the leader originally sent, trying to append entry
# 11 but this is rejected because the follower's log would start at index 5.
deliver-msgs 3 type=MsgApp
----
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
DEBUG 3 [logterm: 0, index: 10] rejected MsgApp [logterm: 1, index: 10] from 1

# Note that the RejectionHint is 5, which is below the first index 10 of the
# leader. Once the leader receives this, it will move 3 into StateSnapshot
# with PendingSnapshot=lastIndex=12.
process-ready 3
----
Ready MustSync=false:
Lead:1 State:StateFollower
Messages:
3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5)

# 3 receives the snapshot, but doesn't handle it yet.
deliver-msgs 3
----
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:10 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=5, applied=5, applying=5, unstable.offset=6, unstable.offsetInProgress=6, len(unstable.Entries)=0] starts to restore snapshot [index: 10, term: 1]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 [commit: 10, lastindex: 10, lastterm: 1] restored snapshot [index: 10, term: 1]
INFO 3 [commit: 10] restored snapshot [index: 10, term: 1]


# 1 sees the rejection and asks for a snapshot at index 12 (which is 1's current
# last index).
stabilize 1
----
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5)
DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6]
DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# Drop the extra MsgSnap(index=12) that 1 just sent, to keep the test tidy.
deliver-msgs drop=(3)
----
dropped: 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# 3 sends the affirmative MsgAppResp that resulted from applying the snapshot
# at index 10.
stabilize 3
----
> 3 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:10
Snapshot Index:10 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
Messages:
3->1 MsgAppResp Term:1 Log:0/10

stabilize 1
----
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/10

# 3 is still pending a snapshot, even though its match is 10 which
# the leader could catch the follower up from.
status 1
----
1: StateReplicate match=12 next=13
2: StateReplicate match=12 next=13
3: StateSnapshot match=10 next=11 paused pendingSnap=12

0 comments on commit 27f6ebc

Please sign in to comment.