Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept any snapshot that allows replication #110

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,10 +1478,13 @@ func stepLeader(r *raft, m pb.Message) error {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
// TODO(tbg): we should also enter this branch if a snapshot is
// received that is below pr.PendingSnapshot but which makes it
// possible to use the log again.
case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logically, it can also be (maybe should?) pr.Next >= r.raftLog.firstIndex().

Next is the next index to send. Indices between Match+1 and Next-1 are in-flight (by invariant which we haven't formalized yet), so don't necessarily need to be present in the log.

The MaybeUpdate() call 4 lines above should set Next to be at least Match+1, so this should be safe.

Feel free to leave this for #124.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do it for #124. Semantically, Match+1 seems better since that's where the leader's and follower's logs match, but they're equivalent here and I don't have a particularly strong opinion either way.

// Note that we don't take into account PendingSnapshot to
// enter this branch. No matter at which index a snapshot
// was actually applied, as long as this allows catching up
// the follower from the log, we will accept it. This gives
// systems more flexibility in how they implement snapshots;
// see the comments on PendingSnapshot.
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
Expand Down Expand Up @@ -1560,10 +1563,6 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State != tracker.StateSnapshot {
return nil
}
// TODO(tbg): this code is very similar to the snapshot handling in
// MsgAppResp above. In fact, the code there is more correct than the
// code here and should likely be updated to match (or even better, the
// logic pulled into a newly created Progress state machine handler).
Comment on lines -1563 to -1566
Copy link
Contributor

@pav-kv pav-kv Nov 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This TODO wasn't true, because the code is quite different (we don't check Match/Next indices here whatsoever, or communicate the Index of the snapshot that succeeded - we should!), but I think something still needs to be done here. We should be able to transition from StateSnapshot straight to StateReplicate if the conditions allow for it (see other comments: the condition is that the follower's Match is re-connected to the leader's log). And we should share the same transition code here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't check Match/Next indices here whatsoever, or communicate the Index of the snapshot that succeeded - we should!

Yeah, I think any better logic here is predicated on passing in the snapshot index. See cockroachdb/cockroach#87583.

We should be able to transition from StateSnapshot straight to StateReplicate if the conditions allow for it (see other comments: the condition is that the follower's Match is re-connected to the leader's log).

Without the snapshot index being passed in, I don't think we can know the Match until we receive an MsgAppResp, so moving to StateProbe seems like the right thing to do until that happens.

if !m.Reject {
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
Expand Down
8 changes: 7 additions & 1 deletion rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
//
// Example:
//
// deliver-msgs <idx>
// deliver-msgs <idx> type=MsgApp drop=(2,3)
err = env.handleDeliverMsgs(t, d)
case "process-ready":
// Example:
Expand Down Expand Up @@ -149,6 +149,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
//
// forget-leader 1
err = env.handleForgetLeader(t, d)
case "send-snapshot":
// Sends a snapshot to a node. Takes the source and destination node.
// The message will be queued, but not delivered automatically.
//
// Example: send-snapshot 1 3
env.handleSendSnapshot(t, d)
case "propose":
// Propose an entry.
//
Expand Down
21 changes: 15 additions & 6 deletions rafttest/interaction_env_handler_deliver_msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error {
var typ raftpb.MessageType = -1
var rs []Recipient
for _, arg := range d.CmdArgs {
if len(arg.Vals) == 0 {
Expand All @@ -50,11 +51,19 @@ func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData
t.Fatalf("can't both deliver and drop msgs to %d", id)
}
rs = append(rs, Recipient{ID: id, Drop: true})
case "type":
var s string
arg.Scan(t, i, &s)
v, ok := raftpb.MessageType_value[s]
if !ok {
t.Fatalf("unknown message type %s", s)
}
typ = raftpb.MessageType(v)
}
}
}

if n := env.DeliverMsgs(rs...); n == 0 {
if n := env.DeliverMsgs(typ, rs...); n == 0 {
env.Output.WriteString("no messages\n")
}
return nil
Expand All @@ -66,14 +75,14 @@ type Recipient struct {
}

// DeliverMsgs goes through env.Messages and, depending on the Drop flag,
// delivers or drops messages to the specified Recipients. Returns the
// number of messages handled (i.e. delivered or dropped). A handled message
// is removed from env.Messages.
func (env *InteractionEnv) DeliverMsgs(rs ...Recipient) int {
// delivers or drops messages to the specified Recipients. Only messages of type
// typ are delivered (-1 for all types). Returns the number of messages handled
// (i.e. delivered or dropped). A handled message is removed from env.Messages.
func (env *InteractionEnv) DeliverMsgs(typ raftpb.MessageType, rs ...Recipient) int {
var n int
for _, r := range rs {
var msgs []raftpb.Message
msgs, env.Messages = splitMsgs(env.Messages, r.ID, r.Drop)
msgs, env.Messages = splitMsgs(env.Messages, r.ID, typ, r.Drop)
n += len(msgs)
for _, msg := range msgs {
if r.Drop {
Expand Down
11 changes: 5 additions & 6 deletions rafttest/interaction_env_handler_process_append_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package rafttest

import (
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -86,13 +87,11 @@ func processAppend(n *Node, st raftpb.HardState, ents []raftpb.Entry, snap raftp
return err
}
}
if err := s.Append(ents); err != nil {
return err
}
if !raft.IsEmptySnap(snap) {
if err := s.ApplySnapshot(snap); err != nil {
return err
if len(ents) > 0 {
return errors.New("can't apply snapshot and entries at the same time")
}
return s.ApplySnapshot(snap)
}
return nil
return s.Append(ents)
}
50 changes: 50 additions & 0 deletions rafttest/interaction_env_handler_send_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafttest

import (
"testing"

"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

func (env *InteractionEnv) handleSendSnapshot(t *testing.T, d datadriven.TestData) error {
idxs := nodeIdxs(t, d)
require.Len(t, idxs, 2)
return env.SendSnapshot(idxs[0], idxs[1])
}

// SendSnapshot sends a snapshot.
func (env *InteractionEnv) SendSnapshot(fromIdx, toIdx int) error {
snap, err := env.Nodes[fromIdx].Snapshot()
if err != nil {
return err
}
from, to := uint64(fromIdx+1), uint64(toIdx+1)
msg := raftpb.Message{
Type: raftpb.MsgSnap,
Term: env.Nodes[fromIdx].BasicStatus().Term,
From: from,
To: to,
Snapshot: &snap,
}
env.Messages = append(env.Messages, msg)
erikgrinaker marked this conversation as resolved.
Show resolved Hide resolved
_, _ = env.Output.WriteString(raft.DescribeMessage(msg, nil))
return nil
}
10 changes: 6 additions & 4 deletions rafttest/interaction_env_handler_stabilize.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error {
id := rn.Status().ID
// NB: we grab the messages just to see whether to print the header.
// DeliverMsgs will do it again.
if msgs, _ := splitMsgs(env.Messages, id, false /* drop */); len(msgs) > 0 {
if msgs, _ := splitMsgs(env.Messages, id, -1 /* typ */, false /* drop */); len(msgs) > 0 {
fmt.Fprintf(env.Output, "> %d receiving messages\n", id)
env.withIndent(func() { env.DeliverMsgs(Recipient{ID: id}) })
env.withIndent(func() { env.DeliverMsgs(-1 /* typ */, Recipient{ID: id}) })
done = false
}
}
Expand Down Expand Up @@ -112,10 +112,12 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error {
}
}

func splitMsgs(msgs []raftpb.Message, to uint64, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
// splitMsgs extracts messages for the given recipient of the given type (-1 for
// all types) from msgs, and returns them along with the remainder of msgs.
func splitMsgs(msgs []raftpb.Message, to uint64, typ raftpb.MessageType, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
// NB: this method does not reorder messages.
for _, msg := range msgs {
if msg.To == to && !(drop && isLocalMsg(msg)) {
if msg.To == to && !(drop && isLocalMsg(msg)) && (typ < 0 || msg.Type == typ) {
toMsgs = append(toMsgs, msg)
} else {
rmdr = append(rmdr, msg)
Expand Down
163 changes: 163 additions & 0 deletions testdata/snapshot_succeed_via_app_resp_behind.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# This is a variant of snapshot_succeed_via_app_resp in which the snapshot
# that is being sent is behind the PendingSnapshot index tracked by the leader.

# Turn off output during the setup of the test.
log-level none
----
ok

# Start with three nodes, but the third is disconnected from the log.
add-nodes 2 voters=(1,2,3) index=10
----
ok

add-nodes 1 voters=(1,2,3) index=5
----
ok

# Elect 1 as leader. We first stabilize 3 to process the vote, then stabilize 1
# and 2 to complete the leader election. We don't stabilize 3 after the
# election, so that it does not receive and process any MsgApp yet.
campaign 1
----
ok

process-ready 1
----
ok

stabilize 3
----
ok

stabilize 1 2
----
ok

log-level debug
----
ok

# We now have a leader at index 11 (it appended an empty entry when elected). 3
# is still at index 5, and has not received any MsgApp from the leader yet.
raft-state
----
1: StateLeader (Voter) Term:1 Lead:1
2: StateFollower (Voter) Term:1 Lead:1
3: StateFollower (Voter) Term:1 Lead:0

status 1
----
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateProbe match=0 next=11 paused inactive

raft-log 3
----
log is empty: first index=6, last index=5

# Send a manual snapshot from 1 to 3, which will be at index 11. This snapshot
# does not move 3 to StateSnapshot.
send-snapshot 1 3
----
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# Propose and commit an additional entry, which makes the leader's
# last index 12, beyond the snapshot it sent at index 11.
log-level none
----
ok

propose 1 "foo"
----
ok

stabilize 1 2
----
ok

log-level debug
----
ok

status 1
----
1: StateReplicate match=12 next=13
2: StateReplicate match=12 next=13
3: StateProbe match=0 next=11 paused inactive

# 3 now gets the first MsgApp the leader originally sent, trying to append entry
# 11 but this is rejected because the follower's log started at index 5.
deliver-msgs 3 type=MsgApp
----
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
DEBUG 3 [logterm: 0, index: 10] rejected MsgApp [logterm: 1, index: 10] from 1

# Note below that the RejectionHint is 5, which is below the first index 10 of the
# leader. Once the leader receives this, it will move 3 into StateSnapshot with
# PendingSnapshot=lastIndex=12.
process-ready 3
----
Ready MustSync=false:
Lead:1 State:StateFollower
Messages:
3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5)

# 3 receives and applies the snapshot, but doesn't respond with MsgAppResp yet.
deliver-msgs 3
----
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=5, applied=5, applying=5, unstable.offset=6, unstable.offsetInProgress=6, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1]
INFO 3 [commit: 11] restored snapshot [index: 11, term: 1]


# 1 sees the MsgApp rejection and asks for a snapshot at index 12 (which is 1's
# current last index).
stabilize 1
----
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5)
DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6]
DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# Drop the extra MsgSnap(index=12) that 1 just sent, to keep the test tidy.
deliver-msgs drop=(3)
----
dropped: 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# 3 sends the affirmative MsgAppResp that resulted from applying the snapshot
# at index 11.
stabilize 3
----
> 3 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:11
Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
Messages:
3->1 MsgAppResp Term:1 Log:0/11

stabilize 1
----
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/11
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/11 Commit:12 Entries:[1/12 EntryNormal "\"foo\""]

# 3 is in StateReplicate thanks to receiving the snapshot at index 11.
# This is despite its PendingSnapshot having been 12.
status 1
----
1: StateReplicate match=12 next=13
2: StateReplicate match=12 next=13
3: StateReplicate match=11 next=13 inflight=1
Loading
Loading