Skip to content

Commit

Permalink
Improve performance of proposal builder by not DB inserting active se…
Browse files Browse the repository at this point in the history
…t if not necessary (#6422)

## Motivation

Closes #6418

The first proposal created by a node for any identity triggers persisting the activeset that is used by all identities of the node. This can be a slow process (especially when many ATXs are received by the node) and is usually not necessary, because the activeset is most likely already in the DB.

This PR moves persisting the active set to the beginning of an epoch and avoids a DB write if not necessary (which will usually be the case).
  • Loading branch information
fasmat committed Oct 29, 2024
1 parent 258e0f7 commit cb85d2c
Show file tree
Hide file tree
Showing 26 changed files with 101 additions and 124 deletions.
2 changes: 1 addition & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (h *HandlerV1) storeAtx(
proof *mwire.MalfeasanceProof
malicious bool
)
if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
var err error
malicious, err = identities.IsMalicious(tx, atx.SmesherID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a

// Store an ATX in the DB.
func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx *activationTx) error {
if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if len(watx.marriages) != 0 {
newMarriageID, err := marriage.NewID(tx)
if err != nil {
Expand Down Expand Up @@ -927,7 +927,7 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
atxs.AtxAdded(h.cdb, atx)

malicious := false
err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// malfeasance check happens after storing the ATX because storing updates the marriage set
// that is needed for the malfeasance proof
// TODO(mafa): don't store own ATX if it would mark the node as malicious
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/transaction_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestTransactionService_StreamResults(t *testing.T) {
gen := fixture.NewTransactionResultGenerator().
WithAddresses(2)
txs := make([]types.TransactionWithResult, 100)
require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error {
for i := range txs {
tx := gen.Next()

Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/v2alpha1/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestTransactionService_List(t *testing.T) {

gen := fixture.NewTransactionResultGenerator().WithAddresses(2)
txsList := make([]types.TransactionWithResult, 100)
require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error {
for i := range txsList {
tx := gen.Next()

Expand Down
2 changes: 1 addition & 1 deletion blocks/certifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (c *Certifier) save(
if len(valid)+len(invalid) == 0 {
return certificates.Add(c.db, lid, cert)
}
return c.db.WithTx(ctx, func(dbtx sql.Transaction) error {
return c.db.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
if err := certificates.Add(dbtx, lid, cert); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func Recover(
}
defer localDB.Close()
logger.Info("clearing atx and malfeasance sync metadata from local database")
if err := localDB.WithTx(ctx, func(tx sql.Transaction) error {
if err := localDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if err := atxsync.Clear(tx); err != nil {
return err
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func RecoverFromLocalFile(
zap.Int("num accounts", len(data.accounts)),
zap.Int("num atxs", len(data.atxs)),
)
if err = newDB.WithTx(ctx, func(tx sql.Transaction) error {
if err = newDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
for _, acct := range data.accounts {
if err = accounts.Update(tx, acct); err != nil {
return fmt.Errorf("restore account snapshot: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/merge-nodes/internal/merge_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func MergeDBs(ctx context.Context, dbLog *zap.Logger, from, to string) error {
}

dbLog.Info("merging databases", zap.String("from", from), zap.String("to", to))
err = dstDB.WithTx(ctx, func(tx sql.Transaction) error {
err = dstDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
enc := func(stmt *sql.Statement) {
stmt.BindText(1, filepath.Join(from, localDbFile))
}
Expand Down
5 changes: 5 additions & 0 deletions common/types/poet.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type PoetServer struct {
Pubkey Base64Enc `mapstructure:"pubkey" json:"pubkey"`
}

func ByteToPoetProofRef(b []byte) (ref PoetProofRef) {
copy(ref[:], b)
return ref
}

type PoetProofRef Hash32

func (r *PoetProofRef) String() string {
Expand Down
6 changes: 2 additions & 4 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,11 @@ func (bs *BlobStore) Has(hint Hint, key []byte) (bool, error) {
case TXDB:
return transactions.Has(bs.DB, types.TransactionID(types.BytesToHash(key)))
case POETDB:
var ref types.PoetProofRef
copy(ref[:], key)
return poets.Has(bs.DB, ref)
return poets.Has(bs.DB, types.ByteToPoetProofRef(key))
case Malfeasance:
return identities.IsMalicious(bs.DB, types.BytesToNodeID(key))
case ActiveSet:
return activesets.Has(bs.DB, key)
return activesets.Has(bs.DB, types.BytesToHash(key))
}
return false, fmt.Errorf("blob store not found %s", hint)
}
2 changes: 1 addition & 1 deletion malfeasance/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (h *Handler) validateAndSave(ctx context.Context, p *wire.MalfeasanceProof)
return types.EmptyNodeID, errors.Join(err, pubsub.ErrValidationReject)
}
proofBytes := codec.MustEncode(p)
if err := h.cdb.WithTx(ctx, func(dbtx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
malicious, err := identities.IsMalicious(dbtx, nodeID)
if err != nil {
return fmt.Errorf("check known malicious: %w", err)
Expand Down
12 changes: 3 additions & 9 deletions mesh/ballotwriter/ballotwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var writerDelay = 100 * time.Millisecond

type BallotWriter struct {
db db
db sql.StateDatabase
logger *zap.Logger

atxMu sync.Mutex
Expand All @@ -30,7 +30,7 @@ type BallotWriter struct {
ballotBatchResult *batchResult
}

func New(db db, logger *zap.Logger) *BallotWriter {
func New(db sql.StateDatabase, logger *zap.Logger) *BallotWriter {
// create a stopped ticker that can be started later
timer := time.NewTicker(writerDelay)
timer.Stop()
Expand Down Expand Up @@ -78,7 +78,7 @@ func (w *BallotWriter) Start(ctx context.Context) {
// we use a context.Background() because: on shutdown the canceling of the
// context may exit the transaction halfway and leave the db in some state where it
// causes crawshaw to panic on a "not all connections returned to pool".
if err := w.db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := w.db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
for _, ballot := range batch {
if !ballot.IsMalicious() {
layerBallotStart := time.Now()
Expand Down Expand Up @@ -163,9 +163,3 @@ type batchResult struct {
doneC chan struct{}
err error
}

type db interface {
sql.Executor

WithTx(context.Context, func(sql.Transaction) error) error
}
6 changes: 3 additions & 3 deletions mesh/ballotwriter/ballotwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
if err := writeFn(a[i], tx); err != nil {
b.Fatal(err)
}
Expand All @@ -138,7 +138,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N/1000; j++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
var err error
for i := (j * 1000); i < (j*1000)+1000; i++ {
if err = writeFn(a[i], tx); err != nil {
Expand All @@ -156,7 +156,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N/5000; j++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
var err error
for i := (j * 5000); i < (j*5000)+5000; i++ {
if err = writeFn(a[i], tx); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewMesh(
}

genesis := types.GetEffectiveGenesis()
if err = db.WithTx(context.Background(), func(dbtx sql.Transaction) error {
if err = db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error {
if err = layers.SetProcessed(dbtx, genesis); err != nil {
return fmt.Errorf("mesh init: %w", err)
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error
return fmt.Errorf("execute block %v/%v: %w", layer.Layer, target, err)
}
}
if err := msh.cdb.WithTx(ctx, func(dbtx sql.Transaction) error {
if err := msh.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
if err := layers.SetApplied(dbtx, layer.Layer, target); err != nil {
return fmt.Errorf("set applied for %v/%v: %w", layer.Layer, target, err)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func (msh *Mesh) saveHareOutput(ctx context.Context, lid types.LayerID, bid type
certs []certificates.CertValidity
err error
)
if err = msh.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err = msh.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// check if a certificate has been generated or sync'ed.
// - node generated the certificate when it collected enough certify messages
// - hare outputs are processed in layer order. i.e. when hare fails for a previous layer N,
Expand Down
51 changes: 26 additions & 25 deletions miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type ProposalBuilder struct {
logger *zap.Logger
cfg config

db sql.Executor
db sql.StateDatabase
localdb sql.Executor
atxsdata atxsData
clock layerClock
Expand Down Expand Up @@ -204,7 +204,7 @@ func WithLayerSize(size uint32) Opt {
}
}

// WithWorkersLimit configures paralelization factor for builder operation when working with
// WithWorkersLimit configures parallelization factor for builder operation when working with
// more than one signer.
func WithWorkersLimit(limit int) Opt {
return func(pb *ProposalBuilder) {
Expand Down Expand Up @@ -270,7 +270,7 @@ func WithActivesetPreparation(prep ActiveSetPreparation) Opt {
// New creates a struct of block builder type.
func New(
clock layerClock,
db sql.Executor,
db sql.StateDatabase,
localdb sql.Executor,
atxsdata atxsData,
publisher pubsub.Publisher,
Expand Down Expand Up @@ -449,7 +449,7 @@ func (pb *ProposalBuilder) UpdateActiveSet(target types.EpochID, set []types.ATX
pb.activeGen.updateFallback(target, set)
}

func (pb *ProposalBuilder) initSharedData(current types.LayerID) error {
func (pb *ProposalBuilder) initSharedData(ctx context.Context, current types.LayerID) error {
if pb.shared.epoch != current.GetEpoch() {
pb.shared = sharedSession{epoch: current.GetEpoch()}
}
Expand All @@ -476,7 +476,27 @@ func (pb *ProposalBuilder) initSharedData(current types.LayerID) error {
pb.shared.active.id = id
pb.shared.active.set = set
pb.shared.active.weight = weight
return nil

// Ideally we only persist the active set when we are actually eligible with at least one identity in at least one
// layer, but since at the moment we use a bootstrapped activeset, `activesets.Has` will always return
// true anyways.
//
// Additionally all activesets that are older than 2 epochs are deleted at the beginning of an epoch anyway, but
// maybe we should revisit this when activesets are no longer bootstrapped.
return pb.db.WithTx(ctx, func(tx sql.Transaction) error {
yes, err := activesets.Has(tx, pb.shared.active.id)
if err != nil {
return err
}
if yes {
return nil
}

return activesets.Add(tx, pb.shared.active.id, &types.EpochActiveSet{
Epoch: pb.shared.epoch,
Set: pb.shared.active.set,
})
})
}

func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID) error {
Expand Down Expand Up @@ -548,7 +568,7 @@ func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID)

func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
buildStartTime := time.Now()
if err := pb.initSharedData(lid); err != nil {
if err := pb.initSharedData(ctx, lid); err != nil {
return err
}

Expand Down Expand Up @@ -578,17 +598,6 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
return meshHash
})

persistActiveSetOnce := sync.OnceValue(func() error {
err := activesets.Add(pb.db, pb.shared.active.id, &types.EpochActiveSet{
Epoch: pb.shared.epoch,
Set: pb.shared.active.set,
})
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return err
}
return nil
})

// Two stage pipeline, with the stages running in parallel.
// 1. Initializes signers. Runs limited number of goroutines because the initialization is CPU and DB bound.
// 2. Collects eligible signers' sessions from the stage 1 and creates and publishes proposals.
Expand Down Expand Up @@ -662,14 +671,6 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
ss.latency.hash = time.Since(start)

eg2.Go(func() error {
// needs to be saved before publishing, as we will query it in handler
if ss.session.ref == types.EmptyBallotID {
start := time.Now()
if err := persistActiveSetOnce(); err != nil {
return err
}
ss.latency.activeSet = time.Since(start)
}
proofs := ss.session.eligibilities.proofs[lid]

start = time.Now()
Expand Down
17 changes: 0 additions & 17 deletions miner/proposal_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package miner
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -1272,19 +1271,3 @@ func BenchmarkDoubleCache(b *testing.B) {

require.Equal(b, types.EmptyATXID, found)
}

func BenchmarkDB(b *testing.B) {
db, err := statesql.Open("file:state.sql")
require.NoError(b, err)
defer db.Close()

bytes, err := hex.DecodeString("00003ce28800fadd692c522f7b1db219f675b49108aec7f818e2c4fd935573f6")
require.NoError(b, err)
nodeID := types.BytesToNodeID(bytes)
var found types.ATXID
b.ResetTimer()
for i := 0; i < b.N; i++ {
found, _ = atxs.GetByEpochAndNodeID(db, 30, nodeID)
}
require.NotEqual(b, types.EmptyATXID, found)
}
13 changes: 5 additions & 8 deletions sql/activesets/activesets.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func Add(db sql.Executor, id types.Hash32, set *types.EpochActiveSet) error {
(id, epoch, active_set)
values (?1, ?2, ?3);`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, id[:])
stmt.BindBytes(1, id.Bytes())
stmt.BindInt64(2, int64(set.Epoch))
stmt.BindBytes(3, codec.MustEncode(set))
}, nil)
Expand Down Expand Up @@ -100,9 +100,7 @@ func getBlob(ctx context.Context, db sql.Executor, id []byte) ([]byte, error) {

func DeleteBeforeEpoch(db sql.Executor, epoch types.EpochID) error {
_, err := db.Exec("delete from activesets where epoch < ?1;",
func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(epoch))
},
func(stmt *sql.Statement) { stmt.BindInt64(1, int64(epoch)) },
nil,
)
if err != nil {
Expand All @@ -111,10 +109,9 @@ func DeleteBeforeEpoch(db sql.Executor, epoch types.EpochID) error {
return nil
}

func Has(db sql.Executor, id []byte) (bool, error) {
rows, err := db.Exec(
"select 1 from activesets where id = ?1;",
func(stmt *sql.Statement) { stmt.BindBytes(1, id) },
func Has(db sql.Executor, id types.Hash32) (bool, error) {
rows, err := db.Exec("select 1 from activesets where id = ?1;",
func(stmt *sql.Statement) { stmt.BindBytes(1, id.Bytes()) },
nil,
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions sql/atxs/atxs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func TestGetIDsByEpochCached(t *testing.T) {
require.Equal(t, 11, db.QueryCount())
}

require.NoError(t, db.WithTx(context.Background(), func(tx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
atxs.Add(tx, atx5, types.AtxBlob{})
return nil
}))
Expand All @@ -445,7 +445,7 @@ func TestGetIDsByEpochCached(t *testing.T) {
require.ElementsMatch(t, []types.ATXID{atx4.ID(), atx5.ID()}, ids3)
require.Equal(t, 13, db.QueryCount()) // not incremented after Add

require.Error(t, db.WithTx(context.Background(), func(tx sql.Transaction) error {
require.Error(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
atxs.Add(tx, atx6, types.AtxBlob{})
return errors.New("fail") // rollback
}))
Expand Down
Loading

0 comments on commit cb85d2c

Please sign in to comment.