Skip to content

Commit

Permalink
sync2: eliminate unneeded DB accesses when peers are in sync
Browse files Browse the repository at this point in the history
`FPTree`, and thus `DBSet` hold enough information in memory for peers
to decide whether they are in sync or not. In this state, the peers
only exchange probe requests that cover the whole set, and we only
need to know fingerprint and count of the set. For that, no database
queries are necessary.

This change eliminates unnecessary database access, simplifies
`OrderedSet` interface and adds a test case that checks that synced
peers do not make any queries except those needed to advance `DBSet`
(include any newly added items).
  • Loading branch information
ivan4th committed Dec 27, 2024
1 parent b905784 commit 8ea4415
Show file tree
Hide file tree
Showing 16 changed files with 551 additions and 347 deletions.
40 changes: 20 additions & 20 deletions sync2/dbset/dbset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dbset

import (
"context"
"errors"
"fmt"
"maps"
"sync"
Expand Down Expand Up @@ -118,16 +119,12 @@ func (d *DBSet) Receive(k rangesync.KeyBytes) error {
return nil
}

func (d *DBSet) firstItem() (rangesync.KeyBytes, error) {
if err := d.EnsureLoaded(); err != nil {
return nil, err
}
return d.ft.All().First()
}

// GetRangeInfo returns information about the range of items in the DBSet.
// RangeInfo returns information about the range of items in the DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) GetRangeInfo(x, y rangesync.KeyBytes) (rangesync.RangeInfo, error) {
func (d *DBSet) RangeInfo(x, y rangesync.KeyBytes) (rangesync.RangeInfo, error) {
if x == nil || y == nil {
return rangesync.RangeInfo{}, errors.New("bad range")
}
if err := d.EnsureLoaded(); err != nil {
return rangesync.RangeInfo{}, err
}
Expand All @@ -136,17 +133,6 @@ func (d *DBSet) GetRangeInfo(x, y rangesync.KeyBytes) (rangesync.RangeInfo, erro
Items: rangesync.EmptySeqResult(),
}, nil
}
if x == nil || y == nil {
if x != nil || y != nil {
panic("BUG: GetRangeInfo called with one of x/y nil but not both")
}
var err error
x, err = d.firstItem()
if err != nil {
return rangesync.RangeInfo{}, fmt.Errorf("getting first item: %w", err)
}
y = x
}
fpr, err := d.ft.FingerprintInterval(x, y, -1)
if err != nil {
return rangesync.RangeInfo{}, err
Expand Down Expand Up @@ -192,6 +178,20 @@ func (d *DBSet) SplitRange(x, y rangesync.KeyBytes, count int) (rangesync.SplitI
}, nil
}

// SetInfo returns RangeInfo for the whole DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) SetInfo() (rangesync.RangeInfo, error) {
if err := d.EnsureLoaded(); err != nil {
return rangesync.RangeInfo{}, err
}
fpr := d.ft.FingerprintAll()
return rangesync.RangeInfo{
Fingerprint: fpr.FP,
Count: int(fpr.Count),
Items: fpr.Items,
}, nil
}

// Items returns a sequence of all items in the DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) Items() rangesync.SeqResult {
Expand Down
214 changes: 123 additions & 91 deletions sync2/dbset/dbset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ func TestDBSet_Empty(t *testing.T) {
requireEmpty(t, s.Items())
requireEmpty(t, s.Received())

info, err := s.GetRangeInfo(nil, nil)
info, err := s.SetInfo()
require.NoError(t, err)
require.Equal(t, 0, info.Count)
require.Equal(t, "000000000000000000000000", info.Fingerprint.String())
requireEmpty(t, info.Items)

info, err = s.GetRangeInfo(
info, err = s.RangeInfo(
rangesync.MustParseHexKeyBytes("0000000000000000000000000000000000000000000000000000000000000000"),
rangesync.MustParseHexKeyBytes("0000000000000000000000000000000000000000000000000000000000000000"))
require.NoError(t, err)
require.Equal(t, 0, info.Count)
require.Equal(t, "000000000000000000000000", info.Fingerprint.String())
requireEmpty(t, info.Items)

info, err = s.GetRangeInfo(
info, err = s.RangeInfo(
rangesync.MustParseHexKeyBytes("0000000000000000000000000000000000000000000000000000000000000000"),
rangesync.MustParseHexKeyBytes("9999000000000000000000000000000000000000000000000000000000000000"))
require.NoError(t, err)
Expand All @@ -69,7 +69,7 @@ func TestDBSet_Empty(t *testing.T) {

func TestDBSet(t *testing.T) {
ids := []rangesync.KeyBytes{
rangesync.MustParseHexKeyBytes("0000000000000000000000000000000000000000000000000000000000000000"),
rangesync.MustParseHexKeyBytes("1111111111111111111111111111111111111111111111111111111111111111"),
rangesync.MustParseHexKeyBytes("123456789abcdef0000000000000000000000000000000000000000000000000"),
rangesync.MustParseHexKeyBytes("5555555555555555555555555555555555555555555555555555555555555555"),
rangesync.MustParseHexKeyBytes("8888888888888888888888888888888888888888888888888888888888888888"),
Expand All @@ -81,94 +81,126 @@ func TestDBSet(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
require.Equal(t, "0000000000000000000000000000000000000000000000000000000000000000",
require.Equal(t, "1111111111111111111111111111111111111111111111111111111111111111",
firstKey(t, s.Items()).String())
has, err := s.Has(
rangesync.MustParseHexKeyBytes("9876000000000000000000000000000000000000000000000000000000000000"))
require.NoError(t, err)
require.False(t, has)

for _, tc := range []struct {
xIdx, yIdx int
limit int
fp string
count int
startIdx, endIdx int
}{
{
xIdx: 1,
yIdx: 1,
limit: -1,
fp: "642464b773377bbddddddddd",
count: 5,
startIdx: 1,
endIdx: 1,
},
{
xIdx: -1,
yIdx: -1,
limit: -1,
fp: "642464b773377bbddddddddd",
count: 5,
startIdx: 0,
endIdx: 0,
},
{
xIdx: 0,
yIdx: 3,
limit: -1,
fp: "4761032dcfe98ba555555555",
count: 3,
startIdx: 0,
endIdx: 3,
},
{
xIdx: 2,
yIdx: 0,
limit: -1,
fp: "761032cfe98ba54ddddddddd",
count: 3,
startIdx: 2,
endIdx: 0,
},
{
xIdx: 3,
yIdx: 2,
limit: 3,
fp: "2345679abcdef01888888888",
count: 3,
startIdx: 3,
endIdx: 1,
},
} {
name := fmt.Sprintf("%d-%d_%d", tc.xIdx, tc.yIdx, tc.limit)
t.Run(name, func(t *testing.T) {
var x, y rangesync.KeyBytes
if tc.xIdx >= 0 {
t.Run("RangeInfo", func(t *testing.T) {
for _, tc := range []struct {
xIdx, yIdx int
fp string
count int
startIdx, endIdx int
}{
{
xIdx: 1,
yIdx: 1,
fp: "753575a662266aaccccccccc",
count: 5,
startIdx: 1,
endIdx: 1,
},
{
xIdx: 0,
yIdx: 3,
fp: "5670123cdef89ab444444444",
count: 3,
startIdx: 0,
endIdx: 3,
},
{
xIdx: 2,
yIdx: 0,
fp: "761032cfe98ba54ddddddddd",
count: 3,
startIdx: 2,
endIdx: 0,
},
} {
name := fmt.Sprintf("%d-%d", tc.xIdx, tc.yIdx)
t.Run(name, func(t *testing.T) {
var x, y rangesync.KeyBytes
x = ids[tc.xIdx]
y = ids[tc.yIdx]
}
t.Logf("x %v y %v limit %d", x, y, tc.limit)
var info rangesync.RangeInfo
if tc.limit < 0 {
info, err = s.GetRangeInfo(x, y)
t.Logf("x %v y %v", x, y)
var info rangesync.RangeInfo
info, err = s.RangeInfo(x, y)
require.NoError(t, err)
require.Equal(t, tc.count, info.Count)
require.Equal(t, tc.fp, info.Fingerprint.String())
require.Equal(t, ids[tc.startIdx], firstKey(t, info.Items))
has, err := s.Has(ids[tc.startIdx])
require.NoError(t, err)
} else {
require.True(t, has)
has, err = s.Has(ids[tc.endIdx])
require.NoError(t, err)
require.True(t, has)
})
}
})

t.Run("SplitRange", func(t *testing.T) {
for _, tc := range []struct {
xIdx, yIdx int
limit int
fp string
count int
startIdx, endIdx int
}{
{
xIdx: 3,
yIdx: 2,
limit: 3,
fp: "3254768badcfe10999999999",
count: 3,
startIdx: 3,
endIdx: 1,
},
{
xIdx: 3,
yIdx: 3,
limit: 2,
fp: "2345679abcdef01888888888",
count: 2,
startIdx: 3,
endIdx: 3,
},
} {
name := fmt.Sprintf("%d-%d_%d", tc.xIdx, tc.yIdx, tc.limit)
t.Run(name, func(t *testing.T) {
var x, y rangesync.KeyBytes
x = ids[tc.xIdx]
y = ids[tc.yIdx]
t.Logf("x %v y %v limit %d", x, y, tc.limit)
var info rangesync.RangeInfo
sr, err := s.SplitRange(x, y, tc.limit)
require.NoError(t, err)
info = sr.Parts[0]
}
require.Equal(t, tc.count, info.Count)
require.Equal(t, tc.fp, info.Fingerprint.String())
require.Equal(t, ids[tc.startIdx], firstKey(t, info.Items))
has, err := s.Has(ids[tc.startIdx])
require.NoError(t, err)
require.True(t, has)
has, err = s.Has(ids[tc.endIdx])
require.NoError(t, err)
require.True(t, has)
})
}
require.Equal(t, tc.count, info.Count)
require.Equal(t, tc.fp, info.Fingerprint.String())
require.Equal(t, ids[tc.startIdx], firstKey(t, info.Items))
has, err := s.Has(ids[tc.startIdx])
require.NoError(t, err)
require.True(t, has)
has, err = s.Has(ids[tc.endIdx])
require.NoError(t, err)
require.True(t, has)
})
}
})

t.Run("SetInfo", func(t *testing.T) {
info, err := s.SetInfo()
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "753575a662266aaccccccccc", info.Fingerprint.String())
items, err := info.Items.Collect()
require.NoError(t, err)
require.Equal(t, ids, items)
})
}

func TestDBSet_Receive(t *testing.T) {
Expand All @@ -195,7 +227,7 @@ func TestDBSet_Receive(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []rangesync.KeyBytes{newID}, items)

info, err := s.GetRangeInfo(ids[2], ids[0])
info, err := s.RangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
Expand All @@ -218,7 +250,7 @@ func TestDBSet_Copy(t *testing.T) {
firstKey(t, s.Items()).String())

require.NoError(t, s.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := copy.GetRangeInfo(ids[2], ids[0])
info, err := copy.RangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
Expand All @@ -228,15 +260,15 @@ func TestDBSet_Copy(t *testing.T) {
"abcdef1234567890000000000000000000000000000000000000000000000000")
require.NoError(t, copy.Receive(newID))

info, err = s.GetRangeInfo(ids[2], ids[0])
info, err = s.RangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

requireEmpty(t, s.Received())

info, err = s.GetRangeInfo(ids[2], ids[0])
info, err = s.RangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
Expand Down Expand Up @@ -267,13 +299,13 @@ func TestDBItemStore_Advance(t *testing.T) {
require.NoError(t, os.EnsureLoaded())

require.NoError(t, os.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := os.GetRangeInfo(ids[0], ids[0])
info, err := os.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
info, err = copy.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
Expand All @@ -284,27 +316,27 @@ func TestDBItemStore_Advance(t *testing.T) {
"abcdef1234567890000000000000000000000000000000000000000000000000"),
})

info, err = os.GetRangeInfo(ids[0], ids[0])
info, err = os.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
info, err = copy.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

require.NoError(t, os.Advance())

info, err = os.GetRangeInfo(ids[0], ids[0])
info, err = os.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
info, err = copy.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
Expand All @@ -314,7 +346,7 @@ func TestDBItemStore_Advance(t *testing.T) {
}))

require.NoError(t, os.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := copy.GetRangeInfo(ids[0], ids[0])
info, err := copy.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
Expand Down
Loading

0 comments on commit 8ea4415

Please sign in to comment.