Skip to content

Commit

Permalink
atxs: drop legacy code AwaitATX (#5120)
Browse files Browse the repository at this point in the history
## Motivation
Closes #5015
  • Loading branch information
countvonzero committed Oct 3, 2023
1 parent 68d096d commit f74d4b2
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 291 deletions.
21 changes: 2 additions & 19 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ type Builder struct {
layersPerEpoch uint32
regossipInterval time.Duration
cdb *datastore.CachedDB
atxHandler atxHandler
publisher pubsub.Publisher
nipostBuilder nipostBuilder
postSetupProvider postSetupProvider
Expand Down Expand Up @@ -151,7 +150,6 @@ func NewBuilder(
nodeID types.NodeID,
signer *signing.EdSigner,
cdb *datastore.CachedDB,
hdlr atxHandler,
publisher pubsub.Publisher,
nipostBuilder nipostBuilder,
postSetupProvider postSetupProvider,
Expand All @@ -169,7 +167,6 @@ func NewBuilder(
layersPerEpoch: conf.LayersPerEpoch,
regossipInterval: conf.RegossipInterval,
cdb: cdb,
atxHandler: hdlr,
publisher: publisher,
nipostBuilder: nipostBuilder,
postSetupProvider: postSetupProvider,
Expand Down Expand Up @@ -608,13 +605,10 @@ func (b *Builder) PublishActivationTx(ctx context.Context) error {
}

atx := b.pendingATX
atxReceived := b.atxHandler.AwaitAtx(atx.ID())
defer b.atxHandler.UnsubscribeAtx(atx.ID())
size, err := b.broadcast(ctx, atx)
if err != nil {
return fmt.Errorf("broadcast: %w", err)
}

logger.Event().Info("atx published", log.Inline(atx), log.Int("size", size))

events.EmitAtxPublished(
Expand All @@ -623,19 +617,8 @@ func (b *Builder) PublishActivationTx(ctx context.Context) error {
time.Until(b.layerClock.LayerToTime(atx.TargetEpoch().FirstLayer())),
)

select {
case <-atxReceived:
logger.With().Info("received atx in db", atx.ID())
if err := b.discardChallenge(); err != nil {
return fmt.Errorf("%w: after published atx", err)
}
case <-b.layerClock.AwaitLayer((atx.TargetEpoch()).FirstLayer()):
if err := b.discardChallenge(); err != nil {
return fmt.Errorf("%w: publish epoch has passed", err)
}
return fmt.Errorf("%w: publish epoch has passed", ErrATXChallengeExpired)
case <-ctx.Done():
return ctx.Err()
if err := b.discardChallenge(); err != nil {
return fmt.Errorf("discarding challenge after published ATX: %w", err)
}
return nil
}
Expand Down
76 changes: 10 additions & 66 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ type testAtxBuilder struct {
coinbase types.Address
goldenATXID types.ATXID

mhdlr *MockatxHandler
mpub *mocks.MockPublisher
mnipost *MocknipostBuilder
mpost *MockpostSetupProvider
Expand All @@ -116,7 +115,6 @@ func newTestBuilder(tb testing.TB, opts ...BuilderOption) *testAtxBuilder {
nodeID: edSigner.NodeID(),
coinbase: types.GenerateAddress([]byte("33333")),
goldenATXID: types.ATXID(types.HexToHash32("77777")),
mhdlr: NewMockatxHandler(ctrl),
mpub: mocks.NewMockPublisher(ctrl),
mnipost: NewMocknipostBuilder(ctrl),
mpost: NewMockpostSetupProvider(ctrl),
Expand All @@ -139,7 +137,7 @@ func newTestBuilder(tb testing.TB, opts ...BuilderOption) *testAtxBuilder {
return ch
}).AnyTimes()

b := NewBuilder(cfg, tab.nodeID, tab.sig, tab.cdb, tab.mhdlr, tab.mpub, tab.mnipost, tab.mpost,
b := NewBuilder(cfg, tab.nodeID, tab.sig, tab.cdb, tab.mpub, tab.mnipost, tab.mpost,
tab.mclock, tab.msync, lg, opts...)
b.initialPost = &types.Post{
Nonce: 0,
Expand Down Expand Up @@ -173,7 +171,6 @@ func assertLastAtx(r *require.Assertions, nodeID types.NodeID, poetRef types.Has
func publishAtx(
t *testing.T,
tab *testAtxBuilder,
posAtxId types.ATXID,
posEpoch types.EpochID,
currLayer *types.LayerID, // pointer to keep current layer consistent across calls
buildNIPostLayerDuration uint32,
Expand Down Expand Up @@ -218,13 +215,6 @@ func publishAtx(
require.NoError(t, atxs.Add(tab.cdb, vatx))
return nil
})
never := make(chan struct{})
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(ch)
tab.mclock.EXPECT().AwaitLayer((publishEpoch + 1).FirstLayer()).Return(never)
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any()).Do(
func(got types.ATXID) {
require.Equal(t, built.ID(), got)
})
// create and publish ATX
err := tab.PublishActivationTx(context.Background())
return built, err
Expand Down Expand Up @@ -470,14 +460,14 @@ func TestBuilder_PublishActivationTx_HappyFlow(t *testing.T) {

// create and publish ATX
tab.mclock.EXPECT().CurrentLayer().Return(currLayer).Times(5)
atx1, err := publishAtx(t, tab, prevAtx.ID(), posEpoch, &currLayer, layersPerEpoch)
atx1, err := publishAtx(t, tab, posEpoch, &currLayer, layersPerEpoch)
require.NoError(t, err)
require.NotNil(t, atx1)

// create and publish another ATX
currLayer = (posEpoch + 1).FirstLayer()
tab.mclock.EXPECT().CurrentLayer().Return(currLayer).Times(5)
atx2, err := publishAtx(t, tab, atx1.ID(), atx1.PublishEpoch, &currLayer, layersPerEpoch)
atx2, err := publishAtx(t, tab, atx1.PublishEpoch, &currLayer, layersPerEpoch)
require.NoError(t, err)
require.NotEqual(t, atx1, atx2)
require.Equal(t, atx1.TargetEpoch()+1, atx2.TargetEpoch())
Expand Down Expand Up @@ -574,16 +564,12 @@ func TestBuilder_PublishActivationTx_FaultyNet(t *testing.T) {
require.NoError(t, built.Initialize())
return publishErr
})
never := make(chan struct{})
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(never)
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any())
// create and publish ATX
err = tab.PublishActivationTx(context.Background())
require.ErrorIs(t, err, publishErr)
require.NotNil(t, built)

// now causing it to publish again, it should use the same atx
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(never)
tab.mpub.EXPECT().Publish(gomock.Any(), pubsub.AtxProtocol, gomock.Any()).DoAndReturn(
func(_ context.Context, _ string, got []byte) error {
var gotAtx types.ActivationTx
Expand All @@ -594,13 +580,7 @@ func TestBuilder_PublishActivationTx_FaultyNet(t *testing.T) {
require.Equal(t, &gotAtx, built)
return nil
})
expireEpoch := publishEpoch + 1
tab.mclock.EXPECT().AwaitLayer(expireEpoch.FirstLayer()).Return(done)
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any()).Do(
func(got types.ATXID) {
require.Equal(t, built.ID(), got)
})
require.ErrorIs(t, tab.PublishActivationTx(context.Background()), ErrATXChallengeExpired)
require.NoError(t, tab.PublishActivationTx(context.Background()))

// if the network works and we try to publish a new ATX, the timeout should result in a clean state (so a NIPost should be built)
posEpoch = posEpoch + 1
Expand All @@ -611,7 +591,7 @@ func TestBuilder_PublishActivationTx_FaultyNet(t *testing.T) {
require.NoError(t, err)
require.NoError(t, atxs.Add(tab.cdb, vPosAtx))
tab.mclock.EXPECT().CurrentLayer().Return(currLayer).AnyTimes()
built2, err := publishAtx(t, tab, posAtx.ID(), posEpoch, &currLayer, layersPerEpoch)
built2, err := publishAtx(t, tab, posEpoch, &currLayer, layersPerEpoch)
require.NoError(t, err)
require.NotNil(t, built2)
require.NotEqual(t, built.NIPostChallenge, built2.NIPostChallenge)
Expand Down Expand Up @@ -669,9 +649,6 @@ func TestBuilder_PublishActivationTx_RebuildNIPostWhenTargetEpochPassed(t *testi
require.NoError(t, built.Initialize())
return publishErr
})
never := make(chan struct{})
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(never)
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any())
// create and publish ATX
err = tab.PublishActivationTx(context.Background())
require.ErrorIs(t, err, publishErr)
Expand All @@ -689,7 +666,7 @@ func TestBuilder_PublishActivationTx_RebuildNIPostWhenTargetEpochPassed(t *testi
require.NoError(t, err)
require.NoError(t, atxs.Add(tab.cdb, vPosAtx))
tab.mclock.EXPECT().CurrentLayer().Return(currLayer).AnyTimes()
built2, err := publishAtx(t, tab, posAtx.ID(), posEpoch, &currLayer, layersPerEpoch)
built2, err := publishAtx(t, tab, posEpoch, &currLayer, layersPerEpoch)
require.NoError(t, err)
require.NotNil(t, built2)
require.NotEqual(t, built.NIPostChallenge, built2.NIPostChallenge)
Expand All @@ -715,7 +692,7 @@ func TestBuilder_PublishActivationTx_NoPrevATX(t *testing.T) {
tab.mpost.EXPECT().VRFNonce().Return(&vrfNonce, nil)
tab.mpost.EXPECT().CommitmentAtx().Return(types.RandomATXID(), nil)
tab.mclock.EXPECT().CurrentLayer().Return(currLayer).AnyTimes()
atx, err := publishAtx(t, tab, posAtx.ID(), posEpoch, &currLayer, layersPerEpoch)
atx, err := publishAtx(t, tab, posEpoch, &currLayer, layersPerEpoch)
require.NoError(t, err)
require.NotNil(t, atx)
}
Expand Down Expand Up @@ -775,11 +752,6 @@ func TestBuilder_PublishActivationTx_PrevATXWithoutPrevATX(t *testing.T) {
return ch
}).Times(1)

tab.mclock.EXPECT().AwaitLayer(gomock.Not(vPosAtx.PublishEpoch.FirstLayer().Add(layersPerEpoch))).DoAndReturn(func(types.LayerID) <-chan struct{} {
ch := make(chan struct{})
return ch
}).Times(1)

lastOpts := DefaultPostSetupOpts()
tab.mpost.EXPECT().LastOpts().Return(&lastOpts).AnyTimes()

Expand All @@ -789,10 +761,6 @@ func TestBuilder_PublishActivationTx_PrevATXWithoutPrevATX(t *testing.T) {
return newNIPostWithChallenge(t, challenge.Hash(), poetBytes), nil
})

atxChan := make(chan struct{})
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(atxChan)
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any())

tab.mpub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, _ string, msg []byte) error {
var atx types.ActivationTx
require.NoError(t, codec.Decode(msg, &atx))
Expand All @@ -813,7 +781,6 @@ func TestBuilder_PublishActivationTx_PrevATXWithoutPrevATX(t *testing.T) {
r.Equal(postAtxPubEpoch+1, atx.PublishEpoch)
r.Equal(types.BytesToHash(poetBytes), atx.GetPoetProofRef())

close(atxChan)
return nil
})

Expand Down Expand Up @@ -859,11 +826,6 @@ func TestBuilder_PublishActivationTx_TargetsEpochBasedOnPosAtx(t *testing.T) {
return ch
}).Times(1)

tab.mclock.EXPECT().AwaitLayer(gomock.Not(vPosAtx.PublishEpoch.FirstLayer().Add(layersPerEpoch))).DoAndReturn(func(types.LayerID) <-chan struct{} {
ch := make(chan struct{})
return ch
}).Times(1)

lastOpts := DefaultPostSetupOpts()
tab.mpost.EXPECT().LastOpts().Return(&lastOpts).AnyTimes()

Expand All @@ -876,10 +838,6 @@ func TestBuilder_PublishActivationTx_TargetsEpochBasedOnPosAtx(t *testing.T) {
return newNIPostWithChallenge(t, challenge.Hash(), poetBytes), nil
})

atxChan := make(chan struct{})
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(atxChan)
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any())

tab.mpub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, _ string, msg []byte) error {
var atx types.ActivationTx
require.NoError(t, codec.Decode(msg, &atx))
Expand All @@ -900,7 +858,6 @@ func TestBuilder_PublishActivationTx_TargetsEpochBasedOnPosAtx(t *testing.T) {
r.Equal(posEpoch+1, atx.PublishEpoch)
r.Equal(types.BytesToHash(poetBytes), atx.GetPoetProofRef())

close(atxChan)
return nil
})

Expand Down Expand Up @@ -1022,9 +979,6 @@ func TestBuilder_NIPostPublishRecovery(t *testing.T) {
require.NoError(t, built.Initialize())
return publishErr
})
never := make(chan struct{})
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(never)
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any())
// create and publish ATX
err = tab.PublishActivationTx(context.Background())
require.ErrorIs(t, err, publishErr)
Expand All @@ -1036,7 +990,6 @@ func TestBuilder_NIPostPublishRecovery(t *testing.T) {
require.NotEmpty(t, got)

// now causing it to publish again, it should use the same atx
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(never)
tab.mpub.EXPECT().Publish(gomock.Any(), pubsub.AtxProtocol, gomock.Any()).DoAndReturn(
func(_ context.Context, _ string, got []byte) error {
var gotAtx types.ActivationTx
Expand All @@ -1047,14 +1000,7 @@ func TestBuilder_NIPostPublishRecovery(t *testing.T) {
require.Equal(t, &gotAtx, built)
return nil
})
expireEpoch := publishEpoch + 1
tab.mclock.EXPECT().AwaitLayer(expireEpoch.FirstLayer()).Return(done)
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any()).Do(
func(got types.ATXID) {
require.Equal(t, built.ID(), got)
})
// This 👇 ensures that handing of the challenge succeeded and the code moved on to the next part
require.ErrorIs(t, tab.PublishActivationTx(context.Background()), ErrATXChallengeExpired)
require.NoError(t, tab.PublishActivationTx(context.Background()))
got, err = LoadNipostChallenge(tab.nipostBuilder.DataDir())
require.ErrorIs(t, err, os.ErrNotExist)
require.Empty(t, got)
Expand All @@ -1067,7 +1013,7 @@ func TestBuilder_NIPostPublishRecovery(t *testing.T) {
require.NoError(t, err)
require.NoError(t, atxs.Add(tab.cdb, vPosAtx))
tab.mclock.EXPECT().CurrentLayer().Return(currLayer).AnyTimes()
built2, err := publishAtx(t, tab, posAtx.ID(), posEpoch, &currLayer, layersPerEpoch)
built2, err := publishAtx(t, tab, posEpoch, &currLayer, layersPerEpoch)
require.NoError(t, err)
require.NotNil(t, built2)
require.NotEqual(t, built.NIPostChallenge, built2.NIPostChallenge)
Expand Down Expand Up @@ -1161,7 +1107,7 @@ func TestBuilder_InitialProofGeneratedOnce(t *testing.T) {

currLayer := posEpoch.FirstLayer().Add(1)
tab.mclock.EXPECT().CurrentLayer().Return(currLayer).AnyTimes()
atx, err := publishAtx(t, tab, prevAtx.ID(), posEpoch, &currLayer, layersPerEpoch)
atx, err := publishAtx(t, tab, posEpoch, &currLayer, layersPerEpoch)
require.NoError(t, err)
require.NotNil(t, atx)
assertLastAtx(require.New(t), tab.nodeID, types.BytesToHash(poetByte), atx, vPrevAtx, vPrevAtx, layersPerEpoch)
Expand Down Expand Up @@ -1258,8 +1204,6 @@ func TestWaitPositioningAtx(t *testing.T) {
tab.mclock.EXPECT().AwaitLayer(types.EpochID(1).FirstLayer()).Return(closed).AnyTimes()
tab.mclock.EXPECT().AwaitLayer(types.EpochID(2).FirstLayer()).Return(make(chan struct{})).AnyTimes()
tab.mpub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
tab.mhdlr.EXPECT().AwaitAtx(gomock.Any()).Return(closed).AnyTimes()
tab.mhdlr.EXPECT().UnsubscribeAtx(gomock.Any()).AnyTimes()

err := tab.PublishActivationTx(context.Background())
if len(tc.expect) > 0 {
Expand Down
49 changes: 0 additions & 49 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ var (
errMaliciousATX = errors.New("malicious atx")
)

type atxChan struct {
ch chan struct{}
listeners int
}

// Handler processes the atxs received from all nodes and their validity status.
type Handler struct {
local p2p.Peer
Expand All @@ -50,7 +45,6 @@ type Handler struct {
tortoise system.Tortoise
log log.Log
mu sync.Mutex
atxChannels map[types.ATXID]*atxChan
fetcher system.Fetcher
poetCfg PoetConfig
}
Expand Down Expand Up @@ -81,7 +75,6 @@ func NewHandler(
goldenATXID: goldenATXID,
nipostValidator: nipostValidator,
log: log,
atxChannels: make(map[types.ATXID]*atxChan),
fetcher: fetcher,
beacon: beacon,
tortoise: tortoise,
Expand All @@ -95,42 +88,6 @@ func init() {
close(closedChan)
}

// AwaitAtx returns a channel that will receive notification when the specified atx with id is received via gossip.
func (h *Handler) AwaitAtx(id types.ATXID) chan struct{} {
h.mu.Lock()
defer h.mu.Unlock()

if has, err := atxs.Has(h.cdb, id); err == nil && has {
return closedChan
}

ch, found := h.atxChannels[id]
if !found {
ch = &atxChan{
ch: make(chan struct{}),
listeners: 0,
}
h.atxChannels[id] = ch
}
ch.listeners++
return ch.ch
}

// UnsubscribeAtx un subscribes the waiting for a specific atx with atx id id to arrive via gossip.
func (h *Handler) UnsubscribeAtx(id types.ATXID) {
h.mu.Lock()
defer h.mu.Unlock()

ch, found := h.atxChannels[id]
if !found {
return
}
ch.listeners--
if ch.listeners < 1 {
delete(h.atxChannels, id)
}
}

// ProcessAtx validates the active set size declared in the atx, and contextually validates the atx according to atx
// validation rules it then stores the atx with flag set to validity of the atx.
//
Expand Down Expand Up @@ -424,12 +381,6 @@ func (h *Handler) storeAtx(ctx context.Context, atx *types.VerifiedActivationTx)
h.beacon.OnAtx(header)
h.tortoise.OnAtx(header.ToData())

// notify subscribers
if ch, found := h.atxChannels[atx.ID()]; found {
close(ch.ch)
delete(h.atxChannels, atx.ID())
}

h.log.WithContext(ctx).With().Debug("finished storing atx in epoch", atx.ID(), atx.PublishEpoch)

// broadcast malfeasance proof last as the verification of the proof will take place
Expand Down
Loading

0 comments on commit f74d4b2

Please sign in to comment.