Skip to content

Commit

Permalink
tmp: refactor maybeSendAppend
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 25, 2024
1 parent 8e5b410 commit 3464994
Show file tree
Hide file tree
Showing 19 changed files with 531 additions and 222 deletions.
3 changes: 1 addition & 2 deletions confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ func (c Changer) initProgress(cfg *tracker.Config, trk tracker.ProgressMap, id u
// at all (and will thus likely need a snapshot), though the app may
// have applied a snapshot out of band before adding the replica (thus
// making the first index the better choice).
Next: c.LastIndex,
Match: 0,
Watermark: tracker.Watermark{Next: c.LastIndex},
Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes),
IsLearner: isLearner,
// When a node is first added, we should mark it as recently active.
Expand Down
218 changes: 121 additions & 97 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,85 +586,121 @@ func (r *raft) send(m pb.Message) {
}
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
//
// It is a thin wrapper that calls maybeSendAppend with sendIfEmpty set to
// true, so a message may be sent even when there are no entries to append.
// The boolean result of maybeSendAppend is discarded.
func (r *raft) sendAppend(to uint64) {
r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {
// maybeSendAppend sends an append RPC with new entries and commit index to the
// given peer, if necessary. Returns true if a message was sent.
//
// May send an empty message, to convey an updated Commit index.
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
if to == r.id {
return false
}
if st := pr.State; st != tracker.StateProbe && st != tracker.StateReplicate {
return false
} else if st == tracker.StateProbe && pr.Pause {
return false
}

lastIndex, nextIndex := pr.Next-1, pr.Next
lastTerm, errt := r.raftLog.term(lastIndex)

var ents []pb.Entry
var erre error
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeat responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize)
prevTerm, err := r.raftLog.term(pr.Next - 1)
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.sendSnapshot(to, pr)
}
if pr.State == tracker.StateProbe { // !pr.Pause
pr.Pause = true
// r.logger.Info(to, "cw probe = true")
return r.sendAppend(to, pr, prevTerm)
}
// StateReplicate
repl := (*tracker.ProgressReplicate)(pr)

if len(ents) == 0 && !sendIfEmpty {
lastIndex, commit := r.raftLog.lastIndex(), r.raftLog.committed
if repl.UpToDate(lastIndex, commit) {
// r.logger.Info(to, "up to date")
return false
}

if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}

snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
if !repl.IsThrottled(lastIndex) {
// r.logger.Info(to, "regular append")
return r.sendAppend(to, pr, prevTerm)
}
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if all the inflight messages are dropped, replication to that
// follower stalls. We send an empty MsgApp periodically, so that eventually
// it reaches the follower, and the latter acks or rejects it. The Pause flag
// is reset by a HeartbeatResp message, which is guaranteed if the follower is
// connected.
//
// Also, ensure sending an empty MsgApp if the follower's commit index can be
// moved forward.
if !repl.Pause || repl.ShouldSendCommit(commit) {
return r.sendMsgAppPing(to, pr, prevTerm)
}
return false
}

// sendMsgAppPing sends a MsgApp without entries to the given peer. The
// message carries pr.Next-1 as the previous log index, prevTerm as the
// previous log term, and the current commit index of the local raft log.
//
// Once the message has been handed to r.send, the progress is marked as
// paused and the commit index is recorded as sent through pr.Commit.
// Always returns true.
func (r *raft) sendMsgAppPing(to uint64, pr *tracker.Progress, prevTerm uint64) bool {
	committed := r.raftLog.committed
	ping := pb.Message{
		To:      to,
		Type:    pb.MsgApp,
		Index:   pr.Next - 1,
		LogTerm: prevTerm,
		Commit:  committed,
	}
	r.send(ping)

	// Bookkeeping happens strictly after the send, in this order.
	pr.Pause = true
	pr.Commit.Sent(committed)
	pr.Commit.Pause = true
	return true
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
func (r *raft) sendAppend(to uint64, pr *tracker.Progress, prevTerm uint64) bool {
entries, err := r.raftLog.entries(pr.Next, r.maxMsgSize)
if err != nil { // send snapshot if we failed to get entries
return r.sendSnapshot(to, pr)
}
// NB: pr has been updated, but we make sure to only use its old values below.
prevIndex := pr.Next - 1
pr.UpdateOnEntriesSend(len(entries), uint64(payloadsSize(entries)))
commit := r.raftLog.committed
pr.Commit.Sent(commit)
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: lastIndex,
LogTerm: lastTerm,
Entries: ents,
Commit: r.raftLog.committed,
Index: prevIndex,
LogTerm: prevTerm,
Entries: entries,
Commit: commit,
})
return true
}

// sendSnapshot tries to send the snapshot of the local raft log to the given
// peer as a MsgSnap message.
//
// It returns false without sending anything when the peer is not recently
// active, or when the snapshot is temporarily unavailable. Any other error
// from fetching the snapshot, as well as an empty snapshot, causes a panic.
// On success the progress is moved to the snapshot state via BecomeSnapshot
// with the snapshot index, the message is sent, and true is returned.
func (r *raft) sendSnapshot(to uint64, pr *tracker.Progress) bool {
	if !pr.RecentActive {
		r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
		return false
	}

	snap, err := r.raftLog.snapshot()
	switch {
	case err == ErrSnapshotTemporarilyUnavailable:
		r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
		return false
	case err != nil:
		panic(err) // TODO(bdarnell)
	}
	if IsEmptySnap(snap) {
		panic("need non-empty snapshot")
	}

	// Log the progress both before and after it transitions to the snapshot
	// state.
	meta := snap.Metadata
	r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
		r.id, r.raftLog.firstIndex(), r.raftLog.committed, meta.Index, meta.Term, to, pr)
	pr.BecomeSnapshot(meta.Index)
	r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

	r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snap})
	return true
}

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach the commit as min(to.matched, r.committed).
Expand All @@ -687,11 +723,8 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// bcastAppend sends an append RPC, with entries, to all peers that are not
// up-to-date according to the progress recorded in r.trk.
func (r *raft) bcastAppend() {
r.trk.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
r.sendAppend(id)
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
r.maybeSendAppend(id, pr)
})
}

Expand Down Expand Up @@ -773,8 +806,7 @@ func (r *raft) reset(term uint64) {
r.trk.ResetVotes()
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
*pr = tracker.Progress{
Match: 0,
Next: r.raftLog.lastIndex() + 1,
Watermark: tracker.Watermark{Next: r.raftLog.lastIndex() + 1},
Inflights: tracker.NewInflights(r.trk.MaxInflight, r.trk.MaxInflightBytes),
IsLearner: pr.IsLearner,
}
Expand Down Expand Up @@ -1462,11 +1494,10 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
r.maybeSendAppend(m.From, pr)
}
} else {
oldPaused := pr.IsPaused()
pr.UpdateCommit(m.Commit)
pr.Commit.Update(m.Commit)
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
Expand All @@ -1478,6 +1509,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
switch {
case pr.State == tracker.StateProbe:
// r.logger.Info("become replicate", m.From)
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex():
// Note that we don't take into account PendingSnapshot to
Expand All @@ -1499,13 +1531,11 @@ func stepLeader(r *raft, m pb.Message) error {
}

if r.maybeCommit() {
// r.logger.Info("commit", m.From)
// committed index has progressed for the term, so it is safe
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if oldPaused && r.id != m.From && pr.Commit < r.raftLog.committed {
// The node is potentially missing the latest commit index. Send it.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
Expand All @@ -1514,7 +1544,8 @@ func stepLeader(r *raft, m pb.Message) error {
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
if r.id != m.From {
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
// r.logger.Info("iterate", m.From)
for r.maybeSendAppend(m.From, pr) {
}
}
// Transfer leadership is in progress.
Expand All @@ -1526,7 +1557,8 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.MsgAppFlowPaused = false
pr.Pause = false
pr.Commit.Pause = false

// NB: if the follower is paused (full Inflights), this will still send an
// empty append, allowing it to recover from situations in which all the
Expand All @@ -1541,9 +1573,7 @@ func stepLeader(r *raft, m pb.Message) error {
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
// no-op.
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
r.sendAppend(m.From)
}
r.maybeSendAppend(m.From, pr)

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
Expand Down Expand Up @@ -1576,7 +1606,7 @@ func stepLeader(r *raft, m pb.Message) error {
// If the snapshot finished, wait for the MsgAppResp from the remote node
// before sending out the next MsgApp.
// If the snapshot failed, wait for a heartbeat interval before the next try.
pr.MsgAppFlowPaused = true
pr.Pause = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
Expand Down Expand Up @@ -1613,7 +1643,7 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
r.maybeSendAppend(leadTransferee, pr)
}
}
return nil
Expand Down Expand Up @@ -1947,21 +1977,15 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
return cs
}

if r.maybeCommit() {
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
r.bcastAppend()
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
r.maybeSendAppend(id, false /* sendIfEmpty */)
})
}
r.maybeCommit()
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
//
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.bcastAppend()

// If the leadTransferee was removed or demoted, abort the leadership transfer.
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
Expand Down
8 changes: 4 additions & 4 deletions raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
if sm.trk.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].Pause {
t.Errorf("Pause = %v, want true", sm.trk.Progress[2].Pause)
}
}

Expand All @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.trk.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].Pause {
t.Errorf("Pause = %v, want true", sm.trk.Progress[2].Pause)
}
}

Expand Down
Loading

0 comments on commit 3464994

Please sign in to comment.