Skip to content

Commit

Permalink
sync2: do not run sync against already synced peers (#6551)
Browse files Browse the repository at this point in the history
## Motivation

Initiating sync against peers which are already in sync wastes network traffic and may cause unneeded database accesses. Before initiating any sync, the node probes its peers, and after the probe it can be immediately seen which nodes are in sync with this one.
  • Loading branch information
ivan4th committed Dec 18, 2024
1 parent 565d79b commit 471889f
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 71 deletions.
40 changes: 31 additions & 9 deletions sync2/multipeer/multipeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type syncability struct {
splitSyncable []p2p.Peer
// Number of peers that are similar enough to this one for full sync
nearFullCount int
// Number of peers that are in sync with this one
inSyncCount int
}

type runner struct {
Expand Down Expand Up @@ -164,9 +166,6 @@ func NewMultiPeerReconciler(

func (mpr *MultiPeerReconciler) probePeers(ctx context.Context, syncPeers []p2p.Peer) (syncability, error) {
var s syncability
s.syncable = nil
s.splitSyncable = nil
s.nearFullCount = 0
type probeResult struct {
p p2p.Peer
rangesync.ProbeResult
Expand All @@ -183,13 +182,13 @@ func (mpr *MultiPeerReconciler) probePeers(ctx context.Context, syncPeers []p2p.
eg.Go(func() error {
mpr.logger.Debug("probe peer", zap.Stringer("peer", p))
pr, err := mpr.syncBase.Probe(ctx, p)
if err != nil {
mpr.logger.Warn("error probing the peer", zap.Any("peer", p), zap.Error(err))
if errors.Is(err, context.Canceled) {
return err
}
} else {
switch {
case err == nil:
probeCh <- probeResult{p, pr}
case errors.Is(err, context.Canceled):
return err
default:
mpr.logger.Warn("error probing the peer", zap.Any("peer", p), zap.Error(err))
}
return nil
})
Expand All @@ -204,11 +203,21 @@ func (mpr *MultiPeerReconciler) probePeers(ctx context.Context, syncPeers []p2p.
})

for pr := range probeCh {
if pr.InSync {
mpr.logger.Debug("peer already in sync",
zap.Stringer("peer", pr.p),
zap.Int("peerCount", pr.Count),
zap.Int("localCount", localCount))
s.inSyncCount++
continue
}

// We do not consider peers with substantially fewer items than the local
// set for active sync. It's these peers' responsibility to request sync
// against this node.
if pr.Count+mpr.cfg.MaxSyncDiff < localCount {
mpr.logger.Debug("skipping peer with low item count",
zap.Stringer("peer", pr.p),
zap.Int("peerCount", pr.Count),
zap.Int("localCount", localCount))
continue
Expand Down Expand Up @@ -315,11 +324,24 @@ func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool)
if err != nil {
return false, err
}
mpr.logger.Debug("probing peers done",
zap.Int("syncableCount", len(s.syncable)),
zap.Int("splitSyncableCount", len(s.splitSyncable)),
zap.Int("nearFullCount", s.nearFullCount),
zap.Int("inSyncCount", s.inSyncCount))
if len(s.syncable) != 0 {
break
}
}

// We try to sync against peers which are not in full sync with this one.
// If there are no such peers, but there are some which are in sync with
// us, we consider this node to be in sync.
if s.inSyncCount > 0 {
mpr.sl.NoteSync()
return true, nil
}

mpr.logger.Debug("no peers found, waiting", zap.Duration("duration", mpr.cfg.NoPeersRecheckInterval))
select {
case <-ctx.Done():
Expand Down
114 changes: 85 additions & 29 deletions sync2/multipeer/multipeer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,22 +195,22 @@ func TestMultiPeerSync(t *testing.T) {
mt.syncBase.EXPECT().Count().Return(50, nil).AnyTimes()
for i := 0; i < numSyncs; i++ {
plSplit := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
FP: "foo",
Count: 100,
Sim: 0.5, // too low for full sync
InSync: false,
Count: 100,
Sim: 0.5, // too low for full sync
})
mt.syncRunner.EXPECT().SplitSync(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, peers []p2p.Peer) error {
require.ElementsMatch(t, plSplit.get(), peers)
return nil
})
mt.clock.BlockUntilContext(ctx, 1)
plFull := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
FP: "foo",
Count: 100,
Sim: 1, // after sync
mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
InSync: true,
Count: 100,
Sim: 1, // after sync
})
mt.expectFullSync(plFull, numSyncPeers, 0)
// no full sync here as the node is already in sync according to the probe above
if i > 0 {
mt.clock.Advance(time.Minute)
} else if i < numSyncs-1 {
Expand All @@ -226,14 +226,70 @@ func TestMultiPeerSync(t *testing.T) {
require.False(t, mt.reconciler.Synced())
expect := func() {
pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
FP: "foo",
Count: 100,
Sim: 0.99, // high enough for full sync
InSync: false,
Count: 100,
Sim: 0.99, // high enough for full sync
})
mt.expectFullSync(pl, numSyncPeers, 0)
}
expect()
// first full sync happens immediately
// first probe and first full sync happen immediately
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
for i := 0; i < numSyncs; i++ {
expect()
mt.clock.Advance(time.Minute)
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
}
require.True(t, mt.reconciler.Synced())
})

t.Run("some in sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
require.False(t, mt.reconciler.Synced())
expect := func() {
pl := mt.expectProbe(numSyncPeers-1, rangesync.ProbeResult{
InSync: false,
Count: 100,
Sim: 0.99,
})
mt.expectFullSync(pl, numSyncPeers-1, 0)
mt.expectProbe(1, rangesync.ProbeResult{
InSync: true,
Count: 100,
Sim: 1,
})
}
expect()
// first probe happens immediately
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
for i := 0; i < numSyncs; i++ {
expect()
mt.clock.Advance(time.Minute)
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
}
require.True(t, mt.reconciler.Synced())
})

t.Run("all in sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
require.False(t, mt.reconciler.Synced())
expect := func() {
mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
InSync: true,
Count: 100,
Sim: 1,
})
}
expect()
// first probe happens immediately
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
Expand All @@ -252,9 +308,9 @@ func TestMultiPeerSync(t *testing.T) {
require.False(t, mt.reconciler.Synced())
expect := func() {
pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
FP: "foo",
Count: 100,
Sim: 0.99, // high enough for full sync
InSync: false,
Count: 100,
Sim: 0.99, // high enough for full sync
})
mt.expectFullSync(pl, numSyncPeers, 0)
}
Expand All @@ -281,16 +337,16 @@ func TestMultiPeerSync(t *testing.T) {
var pl peerList
for _, p := range addedPeers[:5] {
mt.expectSingleProbe(p, rangesync.ProbeResult{
FP: "foo",
Count: 1000,
Sim: 0.99, // high enough for full sync
InSync: false,
Count: 1000,
Sim: 0.99, // high enough for full sync
})
pl.add(p)
}
mt.expectSingleProbe(addedPeers[5], rangesync.ProbeResult{
FP: "foo",
Count: 800, // count too low, this peer should be ignored
Sim: 0.9,
InSync: false,
Count: 800, // count too low, this peer should be ignored
Sim: 0.9,
})
mt.expectFullSync(&pl, 5, 0)
}
Expand All @@ -313,9 +369,9 @@ func TestMultiPeerSync(t *testing.T) {
mt.syncBase.EXPECT().Count().Return(50, nil).AnyTimes()
expect := func() {
pl := mt.expectProbe(1, rangesync.ProbeResult{
FP: "foo",
Count: 100,
Sim: 0.5, // too low for full sync, but will have it anyway
InSync: false,
Count: 100,
Sim: 0.5, // too low for full sync, but will have it anyway
})
mt.expectFullSync(pl, 1, 0)
}
Expand All @@ -337,7 +393,7 @@ func TestMultiPeerSync(t *testing.T) {
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
mt.syncBase.EXPECT().Probe(gomock.Any(), gomock.Any()).
Return(rangesync.ProbeResult{}, errors.New("probe failed"))
pl := mt.expectProbe(5, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
pl := mt.expectProbe(5, rangesync.ProbeResult{InSync: false, Count: 100, Sim: 0.99})
// just 5 peers for which the probe worked will be checked
mt.expectFullSync(pl, 5, 0)
ctx := mt.start()
Expand All @@ -349,7 +405,7 @@ func TestMultiPeerSync(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
expect := func() {
pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{InSync: false, Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, numFails)
}
expect()
Expand All @@ -369,14 +425,14 @@ func TestMultiPeerSync(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()

pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{InSync: false, Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, numSyncPeers)

ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()

pl = mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
pl = mt.expectProbe(numSyncPeers, rangesync.ProbeResult{InSync: false, Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, 0)
// Retry should happen after mere 5 seconds as no peers have succeeded, no
// need to wait full sync interval.
Expand All @@ -389,7 +445,7 @@ func TestMultiPeerSync(t *testing.T) {
t.Run("cancellation during sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
mt.expectProbe(numSyncPeers, rangesync.ProbeResult{InSync: false, Count: 100, Sim: 0.99})
mt.syncRunner.EXPECT().FullSync(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, peers []p2p.Peer) error {
mt.cancel()
Expand Down
6 changes: 3 additions & 3 deletions sync2/multipeer/setsyncbase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func TestSetSyncBase(t *testing.T) {
t.Parallel()
st := newSetSyncBaseTester(t, nil)
expPr := rangesync.ProbeResult{
FP: rangesync.RandomFingerprint(),
Count: 42,
Sim: 0.99,
InSync: false,
Count: 42,
Sim: 0.99,
}
set := st.expectCopy()
st.ps.EXPECT().Probe(gomock.Any(), p2p.Peer("p1"), set, nil, nil).Return(expPr, nil)
Expand Down
4 changes: 2 additions & 2 deletions sync2/rangesync/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,15 @@ func testWireProbe(t *testing.T, getRequester getRequesterFunc) {
require.NoError(t, err)
prA, err := pss.Probe(ctx, cst.srvPeerID, st.setB, nil, nil)
require.NoError(t, err)
require.Equal(t, infoA.Fingerprint, prA.FP)
require.False(t, prA.InSync)
require.Equal(t, infoA.Count, prA.Count)
require.InDelta(t, 0.98, prA.Sim, 0.05, "sim")

splitInfo, err := st.setA.SplitRange(x, x, infoA.Count/2)
require.NoError(t, err)
prA, err = pss.Probe(ctx, cst.srvPeerID, st.setB, x, splitInfo.Middle)
require.NoError(t, err)
require.Equal(t, splitInfo.Parts[0].Fingerprint, prA.FP)
require.False(t, prA.InSync)
require.Equal(t, splitInfo.Parts[0].Count, prA.Count)
require.InDelta(t, 0.98, prA.Sim, 0.1, "sim")
}
Expand Down
24 changes: 15 additions & 9 deletions sync2/rangesync/rangesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ func (t nullTracer) OnRecent(int, int) {}

// ProbeResult contains the result of a probe.
type ProbeResult struct {
// Fingerprint of the range.
FP any
// True if the peer's range (or full set) is fully in sync with the local range
// (or full set).
// Note that Sim==1 does not guarantee that the peer is in sync b/c simhash
// algorithm is not precise.
InSync bool
// Number of items in the range.
Count int
// An estimate of Jaccard similarity coefficient between the sets.
Expand Down Expand Up @@ -552,17 +555,20 @@ func (rsr *RangeSetReconciler) handleSample(
msg SyncMessage,
info RangeInfo,
) (pr ProbeResult, err error) {
pr.FP = msg.Fingerprint()
pr.InSync = msg.Fingerprint() == info.Fingerprint
pr.Count = msg.Count()
if pr.InSync && pr.Count != info.Count {
return ProbeResult{}, errors.New("mismatched count with matching fingerprint, possible collision")
}
if info.Fingerprint == msg.Fingerprint() {
pr.Sim = 1
} else {
localSample, err := Sample(info.Items, info.Count, rsr.cfg.SampleSize)
if err != nil {
return ProbeResult{}, fmt.Errorf("sampling local items: %w", err)
}
pr.Sim = CalcSim(localSample, msg.Sample())
return pr, nil
}
localSample, err := Sample(info.Items, info.Count, rsr.cfg.SampleSize)
if err != nil {
return ProbeResult{}, fmt.Errorf("sampling local items: %w", err)
}
pr.Sim = CalcSim(localSample, msg.Sample())
return pr, nil
}

Expand Down
Loading

0 comments on commit 471889f

Please sign in to comment.