Skip to content

Commit

Permalink
recover only N latest layers required for correct ballots decoding (#…
Browse files Browse the repository at this point in the history
…5109)

closes: #3006

tortoise recovers state to decode votes (that are encoded as base ballot + delta) and verify that signed opinion is available locally. when we are accepting votes from network we need to verify those two things. you can see how this recursiveness goes back to genesis. current code reloads ballots from genesis as it is the most fail safe approach. but this have a downside that overtime we will have to load unbounded amount of data, which is not practical. 

in this change tortoise loads ballots that are useful only for next ballots that we expect to receive from the network. the algorithm looks up latest applied layer, subtracts number of layers which are set as tortoise window (10 000), and finds the first layer of that epoch.
  • Loading branch information
dshulyak committed Oct 6, 2023
1 parent cb9c70f commit 26b8b55
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 151 deletions.
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func (app *App) initServices(ctx context.Context) error {
trtl, err := tortoise.Recover(
ctx,
app.cachedDB,
app.clock.CurrentLayer(), beaconProtocol, trtlopts...,
app.clock.CurrentLayer(), trtlopts...,
)
if err != nil {
return fmt.Errorf("can't recover tortoise state: %w", err)
Expand Down
38 changes: 38 additions & 0 deletions tortoise/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ func New(opts ...Opt) (*Tortoise, error) {
return t, nil
}

func (t *Tortoise) RecoverFrom(lid types.LayerID, opinion, prev types.Hash32) {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("recover from",
zap.Uint32("lid", lid.Uint32()),
log.ZShortStringer("opinion", opinion),
log.ZShortStringer("prev opinion", prev),
)
t.trtl.evicted = lid - 1
t.trtl.pending = lid
t.trtl.verified = lid
t.trtl.processed = lid
t.trtl.last = lid
layer := t.trtl.layer(lid)
layer.opinion = opinion
layer.prevOpinion = &prev
}

// LatestComplete returns the latest verified layer.
func (t *Tortoise) LatestComplete() types.LayerID {
t.mu.Lock()
Expand Down Expand Up @@ -300,6 +318,21 @@ func (t *Tortoise) OnBallot(ballot *types.BallotTortoiseData) {
if t.tracer != nil {
t.tracer.On(&BallotTrace{Ballot: ballot})
}
}

// OnRecoveredBallot is called for ballots recovered from database.
//
// For recovered ballots base ballot is not required to be in state therefore
// opinion is not recomputed, but instead recovered from database state.
func (t *Tortoise) OnRecoveredBallot(ballot *types.BallotTortoiseData) {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.trtl.onRecoveredBallot(ballot); err != nil {
errorsCounter.Inc()
t.logger.Error("failed to save state from recovered ballot",
zap.Stringer("ballot", ballot.ID),
zap.Error(err))
}
if t.tracer != nil {
t.tracer.On(&BallotTrace{Ballot: ballot})
}
Expand Down Expand Up @@ -536,6 +569,11 @@ func (t *Tortoise) Mode() Mode {
// pending layer to the layer above equal layer.
// this method is meant to be used only in recovery from disk codepath.
func (t *Tortoise) resetPending(lid types.LayerID, opinion types.Hash32) bool {
t.logger.Debug("reset pending",
zap.Uint32("lid", lid.Uint32()),
log.ZShortStringer("computed", t.trtl.layer(lid).opinion),
log.ZShortStringer("stored", opinion),
)
if t.trtl.layer(lid).opinion == opinion {
t.trtl.pending = lid + 1
return true
Expand Down
46 changes: 12 additions & 34 deletions tortoise/model/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/beacons"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/layers"
Expand All @@ -32,13 +33,12 @@ func newCore(rng *rand.Rand, id string, logger log.Log) *core {
panic(err)
}
c := &core{
id: id,
logger: logger,
rng: rng,
cdb: cdb,
beacons: newBeaconStore(),
units: units,
signer: sig,
id: id,
logger: logger,
rng: rng,
cdb: cdb,
units: units,
signer: sig,
}
cfg := tortoise.DefaultConfig()
cfg.LayerSize = layerSize
Expand All @@ -59,7 +59,6 @@ type core struct {
rng *rand.Rand

cdb *datastore.CachedDB
beacons *beaconStore
tortoise *tortoise.Tortoise

// generated on setup
Expand Down Expand Up @@ -107,11 +106,11 @@ func (c *core) OnMessage(m Messenger, event Message) {
if c.refBallot != nil {
ballot.RefBallot = *c.refBallot
} else {
beacon, err := c.beacons.GetBeacon(ev.LayerID.GetEpoch())
beacon, err := beacons.Get(c.cdb, ev.LayerID.GetEpoch())
if err != nil {
beacon = types.Beacon{}
c.rng.Read(beacon[:])
c.beacons.StoreBeacon(ev.LayerID.GetEpoch(), beacon)
beacons.Set(c.cdb, ev.LayerID.GetEpoch(), beacon)
}
ballot.EpochData = &types.EpochData{
ActiveSetHash: types.Hash32{1, 2, 3},
Expand All @@ -129,7 +128,8 @@ func (c *core) OnMessage(m Messenger, event Message) {
m.Send(MessageBallot{Ballot: ballot})
case MessageLayerEnd:
if ev.LayerID.After(types.GetEffectiveGenesis()) {
tortoise.RecoverLayer(context.Background(), c.tortoise, c.cdb, c.beacons, ev.LayerID, ev.LayerID, ev.LayerID, ev.LayerID)
tortoise.RecoverLayer(context.Background(), c.tortoise, c.cdb, ev.LayerID, c.tortoise.OnBallot)
c.tortoise.TallyVotes(context.Background(), ev.LayerID)
m.Notify(EventVerified{ID: c.id, Verified: c.tortoise.LatestComplete(), Layer: ev.LayerID})
}

Expand Down Expand Up @@ -172,30 +172,8 @@ func (c *core) OnMessage(m Messenger, event Message) {
}
atxs.Add(c.cdb, vAtx)
case MessageBeacon:
c.beacons.StoreBeacon(ev.EpochID, ev.Beacon)
beacons.Add(c.cdb, ev.EpochID+1, ev.Beacon)
case MessageCoinflip:
layers.SetWeakCoin(c.cdb, ev.LayerID, ev.Coinflip)
}
}

func newBeaconStore() *beaconStore {
return &beaconStore{
beacons: map[types.EpochID]types.Beacon{},
}
}

type beaconStore struct {
beacons map[types.EpochID]types.Beacon
}

func (b *beaconStore) GetBeacon(eid types.EpochID) (types.Beacon, error) {
beacon, exist := b.beacons[eid-1]
if !exist {
return types.Beacon{}, sql.ErrNotFound
}
return beacon, nil
}

func (b *beaconStore) StoreBeacon(eid types.EpochID, beacon types.Beacon) {
b.beacons[eid] = beacon
}
87 changes: 56 additions & 31 deletions tortoise/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/beacons"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/identities"
"github.com/spacemeshos/go-spacemesh/sql/layers"
"github.com/spacemeshos/go-spacemesh/system"
)

// Recover tortoise state from database.
func Recover(ctx context.Context, db *datastore.CachedDB, latest types.LayerID, beacon system.BeaconGetter, opts ...Opt) (*Tortoise, error) {
func Recover(ctx context.Context, db *datastore.CachedDB, current types.LayerID, opts ...Opt) (*Tortoise, error) {
trtl, err := New(opts...)
if err != nil {
return nil, err
Expand All @@ -29,6 +29,25 @@ func Recover(ctx context.Context, db *datastore.CachedDB, latest types.LayerID,
return nil, fmt.Errorf("failed to load latest known layer: %w", err)
}

applied, err := layers.GetLastApplied(db)
if err != nil {
return nil, fmt.Errorf("get last applied: %w", err)
}
start := types.GetEffectiveGenesis() + 1
if applied > types.LayerID(trtl.cfg.WindowSize) {
window := applied - types.LayerID(trtl.cfg.WindowSize)
window = window.GetEpoch().FirstLayer() // windback to the start of the epoch to load ref ballots
if window > start {
prev, err1 := layers.GetAggregatedHash(db, window-1)
opinion, err2 := layers.GetAggregatedHash(db, window)
if err1 == nil && err2 == nil {
// tortoise will need reference to previous layer
trtl.RecoverFrom(window, opinion, prev)
start = window
}
}
}

malicious, err := identities.GetMalicious(db)
if err != nil {
return nil, fmt.Errorf("recover malicious %w", err)
Expand All @@ -39,7 +58,7 @@ func Recover(ctx context.Context, db *datastore.CachedDB, latest types.LayerID,

if types.GetEffectiveGenesis() != types.FirstEffectiveGenesis() {
// need to load the golden atxs after a checkpoint recovery
if err := recoverEpoch(types.GetEffectiveGenesis().Add(1).GetEpoch(), trtl, db, beacon); err != nil {
if err := recoverEpoch(types.GetEffectiveGenesis().Add(1).GetEpoch(), trtl, db); err != nil {
return nil, err
}
}
Expand All @@ -51,42 +70,65 @@ func Recover(ctx context.Context, db *datastore.CachedDB, latest types.LayerID,
epoch++ // recoverEpoch expects target epoch, rather than publish
if last.GetEpoch() != epoch {
for eid := last.GetEpoch(); eid <= epoch; eid++ {
if err := recoverEpoch(eid, trtl, db, beacon); err != nil {
if err := recoverEpoch(eid, trtl, db); err != nil {
return nil, err
}
}
}
start := types.GetEffectiveGenesis().Add(1)
for lid := start; !lid.After(last); lid = lid.Add(1) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if err := RecoverLayer(ctx, trtl, db, beacon, start, lid, last, min(last, latest)); err != nil {
if err := RecoverLayer(ctx, trtl, db, lid, trtl.OnRecoveredBallot); err != nil {
return nil, fmt.Errorf("failed to load tortoise state at layer %d: %w", lid, err)
}
}
if last == 0 {
last = current
} else {
last = min(last, current)
}
if last < start {
return trtl, nil
}
trtl.TallyVotes(ctx, last)
// find topmost layer that was already applied and reset pending
// so that result for that layer is not returned
for prev := last - 1; prev >= start; prev-- {
opinion, err := layers.GetAggregatedHash(db, prev)
if err == nil && opinion != types.EmptyLayerHash {
if trtl.resetPending(prev, opinion) {
break
}
}
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, fmt.Errorf("check opinion %w", err)
}
}
return trtl, nil
}

func recoverEpoch(epoch types.EpochID, trtl *Tortoise, db *datastore.CachedDB, beacondb system.BeaconGetter) error {
func recoverEpoch(epoch types.EpochID, trtl *Tortoise, db *datastore.CachedDB) error {
if err := db.IterateEpochATXHeaders(epoch, func(header *types.ActivationTxHeader) error {
trtl.OnAtx(header.ToData())
return nil
}); err != nil {
return err
}
beacon, err := beacondb.GetBeacon(epoch)
if err == nil {
beacon, err := beacons.Get(db, epoch)
if err == nil && beacon != types.EmptyBeacon {
trtl.OnBeacon(epoch, beacon)
}
return nil
}

func RecoverLayer(ctx context.Context, trtl *Tortoise, db *datastore.CachedDB, beacon system.BeaconGetter, start, lid, last, current types.LayerID) error {
type ballotFunc func(*types.BallotTortoiseData)

func RecoverLayer(ctx context.Context, trtl *Tortoise, db *datastore.CachedDB, lid types.LayerID, onBallot ballotFunc) error {
if lid.FirstInEpoch() {
if err := recoverEpoch(lid.GetEpoch(), trtl, db, beacon); err != nil {
if err := recoverEpoch(lid.GetEpoch(), trtl, db); err != nil {
return err
}
}
Expand Down Expand Up @@ -119,36 +161,19 @@ func RecoverLayer(ctx context.Context, trtl *Tortoise, db *datastore.CachedDB, b
}
for _, ballot := range ballotsrst {
if ballot.EpochData != nil {
trtl.OnBallot(ballot.ToTortoiseData())
onBallot(ballot.ToTortoiseData())
}
}
for _, ballot := range ballotsrst {
if ballot.EpochData == nil {
trtl.OnBallot(ballot.ToTortoiseData())
onBallot(ballot.ToTortoiseData())
}
}
coin, err := layers.GetWeakCoin(db, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return err
}
if err == nil {
} else if err == nil {
trtl.OnWeakCoin(lid, coin)
}
if lid <= current && (lid%types.LayerID(trtl.cfg.WindowSize) == 0 || lid == last) {
trtl.TallyVotes(ctx, lid)
// find topmost layer that was already applied and reset pending
// so that result for that layer is not returned
for prev := lid - 1; prev >= start; prev-- {
opinion, err := layers.GetAggregatedHash(db, prev)
if err == nil {
if trtl.resetPending(prev, opinion) {
return nil
}
} else if !errors.Is(err, sql.ErrNotFound) {
return fmt.Errorf("check opinion %w", err)
}
}

}
return nil
}
Loading

0 comments on commit 26b8b55

Please sign in to comment.