Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sql: remove epoch info query cache #6575

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ func (h *HandlerV1) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
return fmt.Errorf("store atx: %w", err)
}

atxs.AtxAdded(h.cdb, atx)
if proof != nil {
if err := h.malPublisher.PublishProof(ctx, atx.SmesherID, proof); err != nil {
return fmt.Errorf("publishing malfeasance proof: %w", err)
Expand Down
2 changes: 0 additions & 2 deletions activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,8 +1003,6 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
return fmt.Errorf("store atx: %w", err)
}

atxs.AtxAdded(h.cdb, atx)

malicious := false
err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// malfeasance check happens after storing the ATX because storing updates the marriage set
Expand Down
39 changes: 16 additions & 23 deletions fetch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
"github.com/spacemeshos/go-spacemesh/sql/layers"
)

const (
fetchSubKey sql.QueryCacheSubKey = "epoch-info-req"
)

type handler struct {
logger *zap.Logger
cdb *datastore.CachedDB
Expand Down Expand Up @@ -82,25 +78,22 @@
return nil, err
}

cacheKey := sql.QueryCacheKey(atxs.CacheKindEpochATXs, epoch.String())
return sql.WithCachedSubKey(ctx, h.cdb, cacheKey, fetchSubKey, func(ctx context.Context) ([]byte, error) {
atxids, err := atxs.GetIDsByEpoch(ctx, h.cdb, epoch)
if err != nil {
return nil, fmt.Errorf("getting ATX IDs: %w", err)
}
ed := EpochData{
AtxIDs: atxids,
}
bts, err := codec.Encode(&ed)
if err != nil {
h.logger.Fatal("failed to serialize EpochData",
zap.Uint32("epoch", epoch.Uint32()),
log.ZContext(ctx),
zap.Error(err),
)
}
return bts, nil
})
atxids, err := atxs.GetIDsByEpoch(ctx, h.cdb, epoch)
if err != nil {
return nil, fmt.Errorf("getting ATX IDs: %w", err)
}
ed := EpochData{
AtxIDs: atxids,
}
bts, err := codec.Encode(&ed)
if err != nil {
h.logger.Fatal("failed to serialize EpochData",
zap.Uint32("epoch", epoch.Uint32()),
log.ZContext(ctx),
zap.Error(err),
)
}

Check warning on line 95 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L90-L95

Added lines #L90 - L95 were not covered by tests
return bts, nil
}

// handleEpochInfoReq streams the ATXs published in the specified epoch.
Expand Down
64 changes: 0 additions & 64 deletions fetch/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,70 +341,6 @@ func TestHandleEpochInfoReq(t *testing.T) {
}
}

func testHandleEpochInfoReqWithQueryCache(
tb testing.TB,
getInfo func(th *testHandler, req []byte, ed *EpochData),
) {
th := createTestHandler(tb, sql.WithQueryCache(true))
require.True(tb, th.cdb.Database.IsCached())
require.True(tb, sql.IsCached(th.cdb))
epoch := types.EpochID(11)
var expected EpochData

for i := 0; i < 10; i++ {
vatx := newAtx(tb, epoch)
require.NoError(tb, atxs.Add(th.cdb, vatx, types.AtxBlob{}))
atxs.AtxAdded(th.cdb, vatx)
expected.AtxIDs = append(expected.AtxIDs, vatx.ID())
}

require.Equal(tb, 20, th.cdb.Database.QueryCount())
epochBytes, err := codec.Encode(epoch)
require.NoError(tb, err)

var got EpochData
for i := 0; i < 3; i++ {
getInfo(th, epochBytes, &got)
require.ElementsMatch(tb, expected.AtxIDs, got.AtxIDs)
require.Equal(tb, 21, th.cdb.Database.QueryCount(), "query count @ i = %d", i)
}

// Add another ATX which should be appended to the cached slice
vatx := newAtx(tb, epoch)
require.NoError(tb, atxs.Add(th.cdb, vatx, types.AtxBlob{}))
atxs.AtxAdded(th.cdb, vatx)
expected.AtxIDs = append(expected.AtxIDs, vatx.ID())
require.Equal(tb, 23, th.cdb.Database.QueryCount())

getInfo(th, epochBytes, &got)
require.ElementsMatch(tb, expected.AtxIDs, got.AtxIDs)
// The query count is not incremented as the slice is still
// cached and the new atx is just appended to it, even though
// the response is re-serialized.
require.Equal(tb, 23, th.cdb.Database.QueryCount())
}

func TestHandleEpochInfoReqWithQueryCache(t *testing.T) {
testHandleEpochInfoReqWithQueryCache(t, func(th *testHandler, req []byte, ed *EpochData) {
out, err := th.handleEpochInfoReq(context.Background(), p2p.Peer(""), req)
require.NoError(t, err)
require.NoError(t, codec.Decode(out, ed))
})
}

func TestHandleEpochInfoReqStreamWithQueryCache(t *testing.T) {
testHandleEpochInfoReqWithQueryCache(t, func(th *testHandler, req []byte, ed *EpochData) {
var b bytes.Buffer
err := th.handleEpochInfoReqStream(context.Background(), p2p.Peer(""), req, &b)
require.NoError(t, err)
n, err := server.ReadResponse(&b, func(resLen uint32) (int, error) {
return codec.DecodeFrom(&b, ed)
})
require.NoError(t, err)
require.NotZero(t, n)
})
}

func TestHandleMaliciousIDsReq(t *testing.T) {
tt := []struct {
name string
Expand Down
6 changes: 0 additions & 6 deletions sql/atxs/atxs.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,12 +502,6 @@ func AddBlob(db sql.Executor, id types.ATXID, blob []byte, version types.AtxVers
return nil
}

// AtxAdded updates epoch query cache with new ATX, if the query cache is enabled.
func AtxAdded(db sql.Executor, atx *types.ActivationTx) {
epochCacheKey := sql.QueryCacheKey(CacheKindEpochATXs, atx.PublishEpoch.String())
sql.AppendToCachedSlice(db, epochCacheKey, atx.ID())
}

type Filter func(types.ATXID) bool

func FilterAll(types.ATXID) bool { return true }
Expand Down
75 changes: 0 additions & 75 deletions sql/atxs/atxs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package atxs_test

import (
"context"
"errors"
"fmt"
"os"
"slices"
Expand Down Expand Up @@ -383,80 +382,6 @@ func TestGetIDsByEpoch(t *testing.T) {
require.ElementsMatch(t, []types.ATXID{atx4.ID()}, ids3)
}

func TestGetIDsByEpochCached(t *testing.T) {
db := statesql.InMemoryTest(t, sql.WithQueryCache(true))
ctx := context.Background()

sig1, err := signing.NewEdSigner()
require.NoError(t, err)
sig2, err := signing.NewEdSigner()
require.NoError(t, err)

e1 := types.EpochID(1)
e2 := types.EpochID(2)
e3 := types.EpochID(3)

atx1 := newAtx(t, sig1, withPublishEpoch(e1))
atx2 := newAtx(t, sig1, withPublishEpoch(e2))
atx3 := newAtx(t, sig2, withPublishEpoch(e2))
atx4 := newAtx(t, sig2, withPublishEpoch(e3))
atx5 := newAtx(t, sig2, withPublishEpoch(e3))
atx6 := newAtx(t, sig2, withPublishEpoch(e3))

for _, atx := range []*types.ActivationTx{atx1, atx2, atx3, atx4} {
require.NoError(t, atxs.Add(db, atx, types.AtxBlob{}))
atxs.AtxAdded(db, atx)
}

// insert atx + insert blob for each ATX
require.Equal(t, 8, db.QueryCount())

for i := 0; i < 3; i++ {
ids1, err := atxs.GetIDsByEpoch(ctx, db, e1)
require.NoError(t, err)
require.ElementsMatch(t, []types.ATXID{atx1.ID()}, ids1)
require.Equal(t, 9, db.QueryCount())
}

for i := 0; i < 3; i++ {
ids2, err := atxs.GetIDsByEpoch(ctx, db, e2)
require.NoError(t, err)
require.Contains(t, ids2, atx2.ID())
require.Contains(t, ids2, atx3.ID())
require.Equal(t, 10, db.QueryCount())
}

for i := 0; i < 3; i++ {
ids3, err := atxs.GetIDsByEpoch(ctx, db, e3)
require.NoError(t, err)
require.ElementsMatch(t, []types.ATXID{atx4.ID()}, ids3)
require.Equal(t, 11, db.QueryCount())
}

require.NoError(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
atxs.Add(tx, atx5, types.AtxBlob{})
return nil
}))
atxs.AtxAdded(db, atx5)
require.Equal(t, 13, db.QueryCount())

ids3, err := atxs.GetIDsByEpoch(ctx, db, e3)
require.NoError(t, err)
require.ElementsMatch(t, []types.ATXID{atx4.ID(), atx5.ID()}, ids3)
require.Equal(t, 13, db.QueryCount()) // not incremented after Add

require.Error(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
atxs.Add(tx, atx6, types.AtxBlob{})
return errors.New("fail") // rollback
}))

// atx6 should not be in the cache
ids4, err := atxs.GetIDsByEpoch(ctx, db, e3)
require.NoError(t, err)
require.ElementsMatch(t, []types.ATXID{atx4.ID(), atx5.ID()}, ids4)
require.Equal(t, 16, db.QueryCount()) // not incremented after Add
}

func Test_IterateAtxsWithMalfeasance(t *testing.T) {
db := statesql.InMemoryTest(t)

Expand Down
1 change: 0 additions & 1 deletion sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ func (db *sqliteDatabase) withTx(ctx context.Context, initstmt string, exec func
}
}()
if err := exec(tx); err != nil {
tx.queryCache.ClearCache()
return err
}
return tx.Commit()
Expand Down
Loading
Loading