diff --git a/vms/platformvm/block/builder/builder_test.go b/vms/platformvm/block/builder/builder_test.go index 0aae84a294cf..bba4acbd8832 100644 --- a/vms/platformvm/block/builder/builder_test.go +++ b/vms/platformvm/block/builder/builder_test.go @@ -471,7 +471,7 @@ func TestBuildBlockInvalidStakingDurations(t *testing.T) { require.ErrorIs(tx2DropReason, txexecutor.ErrStakeTooLong) } -func TestPreviouslyDroppedTxsCanBeReAddedToMempool(t *testing.T) { +func TestPreviouslyDroppedTxsCannotBeReAddedToMempool(t *testing.T) { require := require.New(t) env := newEnvironment(t) @@ -497,24 +497,24 @@ func TestPreviouslyDroppedTxsCanBeReAddedToMempool(t *testing.T) { // Transaction should not be marked as dropped before being added to the // mempool - reason := env.mempool.GetDropReason(txID) - require.NoError(reason) + require.NoError(env.mempool.GetDropReason(txID)) // Mark the transaction as dropped errTestingDropped := errors.New("testing dropped") env.mempool.MarkDropped(txID, errTestingDropped) - reason = env.mempool.GetDropReason(txID) - require.ErrorIs(reason, errTestingDropped) + err = env.mempool.GetDropReason(txID) + require.ErrorIs(err, errTestingDropped) // Issue the transaction env.ctx.Lock.Unlock() - require.NoError(env.network.IssueTx(context.Background(), tx)) + err = env.network.IssueTx(context.Background(), tx) + require.ErrorIs(err, errTestingDropped) _, ok := env.mempool.Get(txID) - require.True(ok) + require.False(ok) - // When issued again, the mempool should not be marked as dropped - reason = env.mempool.GetDropReason(txID) - require.NoError(reason) + // When issued again, the mempool should still be marked as dropped + err = env.mempool.GetDropReason(txID) + require.ErrorIs(err, errTestingDropped) } func TestNoErrorOnUnexpectedSetPreferenceDuringBootstrapping(t *testing.T) { diff --git a/vms/platformvm/block/builder/helpers_test.go b/vms/platformvm/block/builder/helpers_test.go index bd0f58bd449d..6c0c5d6fe16d 100644 --- a/vms/platformvm/block/builder/helpers_test.go +++ b/vms/platformvm/block/builder/helpers_test.go @@ -4,6 +4,7 @@ package builder import ( + "context" "testing" "time" @@ -166,6 +167,9 @@ func newEnvironment(t *testing.T) *environment { registerer := prometheus.NewRegistry() res.sender = &common.SenderTest{T: t} + res.sender.SendAppGossipF = func(context.Context, []byte) error { + return nil + } metrics, err := metrics.New("", registerer) require.NoError(err) @@ -182,13 +186,19 @@ func newEnvironment(t *testing.T) *environment { ) txVerifier := network.NewLockedTxVerifier(&res.ctx.Lock, res.blkManager) - res.network = network.New( - logging.NoLog{}, + res.network, err = network.New( + res.backend.Ctx.Log, + res.backend.Ctx.NodeID, + res.backend.Ctx.SubnetID, + res.backend.Ctx.ValidatorState, txVerifier, res.mempool, res.backend.Config.PartialSyncPrimaryNetwork, res.sender, + registerer, + network.DefaultConfig, ) + require.NoError(err) res.Builder = New( res.mempool, diff --git a/vms/platformvm/config/execution_config.go b/vms/platformvm/config/execution_config.go index bfdb191f1281..964e72f51a57 100644 --- a/vms/platformvm/config/execution_config.go +++ b/vms/platformvm/config/execution_config.go @@ -7,9 +7,11 @@ import ( "encoding/json" "github.com/ava-labs/avalanchego/utils/units" + "github.com/ava-labs/avalanchego/vms/platformvm/network" ) var DefaultExecutionConfig = ExecutionConfig{ + Network: network.DefaultConfig, BlockCacheSize: 64 * units.MiB, TxCacheSize: 128 * units.MiB, TransformedSubnetTxCacheSize: 4 * units.MiB, @@ -23,15 +25,16 @@ var DefaultExecutionConfig = ExecutionConfig{ // ExecutionConfig provides execution parameters of PlatformVM type ExecutionConfig struct { - BlockCacheSize int `json:"block-cache-size"` - TxCacheSize int `json:"tx-cache-size"` - TransformedSubnetTxCacheSize int `json:"transformed-subnet-tx-cache-size"` - RewardUTXOsCacheSize int `json:"reward-utxos-cache-size"` - ChainCacheSize int `json:"chain-cache-size"` - ChainDBCacheSize int `json:"chain-db-cache-size"` - BlockIDCacheSize int `json:"block-id-cache-size"` - FxOwnerCacheSize int `json:"fx-owner-cache-size"` - ChecksumsEnabled bool `json:"checksums-enabled"` + Network network.Config `json:"network"` + BlockCacheSize int `json:"block-cache-size"` + TxCacheSize int `json:"tx-cache-size"` + TransformedSubnetTxCacheSize int `json:"transformed-subnet-tx-cache-size"` + RewardUTXOsCacheSize int `json:"reward-utxos-cache-size"` + ChainCacheSize int `json:"chain-cache-size"` + ChainDBCacheSize int `json:"chain-db-cache-size"` + BlockIDCacheSize int `json:"block-id-cache-size"` + FxOwnerCacheSize int `json:"fx-owner-cache-size"` + ChecksumsEnabled bool `json:"checksums-enabled"` } // GetExecutionConfig returns an ExecutionConfig diff --git a/vms/platformvm/config/execution_config_test.go b/vms/platformvm/config/execution_config_test.go index 0adbd862bd2d..6ba78df133ed 100644 --- a/vms/platformvm/config/execution_config_test.go +++ b/vms/platformvm/config/execution_config_test.go @@ -7,6 +7,8 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/vms/platformvm/network" ) func TestExecutionConfigUnmarshal(t *testing.T) { @@ -39,6 +41,66 @@ func TestExecutionConfigUnmarshal(t *testing.T) { t.Run("all values extracted from json", func(t *testing.T) { require := require.New(t) b := []byte(`{ + "network": { + "max-validator-set-staleness": 1, + "target-gossip-size": 2, + "pull-gossip-poll-size": 3, + "pull-gossip-frequency": 4, + "pull-gossip-throttling-period": 5, + "pull-gossip-throttling-limit": 6, + "expected-bloom-filter-elements":7, + "expected-bloom-filter-false-positive-probability": 8, + "max-bloom-filter-false-positive-probability": 9, + "legacy-push-gossip-cache-size": 10 + }, + "block-cache-size": 1, + "tx-cache-size": 2, + "transformed-subnet-tx-cache-size": 3, + "reward-utxos-cache-size": 5, + "chain-cache-size": 6, + "chain-db-cache-size": 7, + "block-id-cache-size": 8, + "fx-owner-cache-size": 9, + "checksums-enabled": true + }`) + ec, err := GetExecutionConfig(b) + require.NoError(err) + expected := &ExecutionConfig{ + Network: network.Config{ + MaxValidatorSetStaleness: 1, + TargetGossipSize: 2, + PullGossipPollSize: 3, + PullGossipFrequency: 4, + PullGossipThrottlingPeriod: 5, + PullGossipThrottlingLimit: 6, + ExpectedBloomFilterElements: 7, + ExpectedBloomFilterFalsePositiveProbability: 8, + MaxBloomFilterFalsePositiveProbability: 9, + LegacyPushGossipCacheSize: 10, + }, + BlockCacheSize: 1, + TxCacheSize: 2, + TransformedSubnetTxCacheSize: 3, + RewardUTXOsCacheSize: 5, + ChainCacheSize: 6, + ChainDBCacheSize: 7, + BlockIDCacheSize: 8, + FxOwnerCacheSize: 9, + ChecksumsEnabled: true, + } + require.Equal(expected, ec) + }) + + t.Run("default values applied correctly", func(t *testing.T) { + require := require.New(t) + b := []byte(`{ + "network": { + "max-validator-set-staleness": 1, + "target-gossip-size": 2, + "pull-gossip-poll-size": 3, + "pull-gossip-frequency": 4, + "pull-gossip-throttling-period": 5 + }, "block-cache-size": 1, "tx-cache-size": 2, "transformed-subnet-tx-cache-size": 3, @@ -52,6 +114,18 @@ func TestExecutionConfigUnmarshal(t *testing.T) { ec, err := GetExecutionConfig(b) require.NoError(err) expected := &ExecutionConfig{ + Network: network.Config{ + MaxValidatorSetStaleness: 1, + TargetGossipSize: 2, + PullGossipPollSize: 3, + PullGossipFrequency: 4, + PullGossipThrottlingPeriod: 5, + PullGossipThrottlingLimit: DefaultExecutionConfig.Network.PullGossipThrottlingLimit, + ExpectedBloomFilterElements: DefaultExecutionConfig.Network.ExpectedBloomFilterElements, + ExpectedBloomFilterFalsePositiveProbability: DefaultExecutionConfig.Network.ExpectedBloomFilterFalsePositiveProbability, + MaxBloomFilterFalsePositiveProbability: DefaultExecutionConfig.Network.MaxBloomFilterFalsePositiveProbability, + LegacyPushGossipCacheSize: DefaultExecutionConfig.Network.LegacyPushGossipCacheSize, + }, BlockCacheSize: 1, TxCacheSize: 2, TransformedSubnetTxCacheSize: 3, diff --git a/vms/platformvm/network/config.go b/vms/platformvm/network/config.go new file mode 100644 index 000000000000..2ff7828df2e4 --- /dev/null +++ b/vms/platformvm/network/config.go @@ -0,0 +1,66 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package network + +import ( + "time" + + "github.com/ava-labs/avalanchego/utils/units" +) + +var DefaultConfig = Config{ + MaxValidatorSetStaleness: time.Minute, + TargetGossipSize: 20 * units.KiB, + PullGossipPollSize: 1, + PullGossipFrequency: 1500 * time.Millisecond, + PullGossipThrottlingPeriod: 10 * time.Second, + PullGossipThrottlingLimit: 2, + ExpectedBloomFilterElements: 8 * 1024, + ExpectedBloomFilterFalsePositiveProbability: .01, + MaxBloomFilterFalsePositiveProbability: .05, + LegacyPushGossipCacheSize: 512, +} + +type Config struct { + // MaxValidatorSetStaleness limits how old of a validator set the network + // will use for peer sampling and rate limiting. + MaxValidatorSetStaleness time.Duration `json:"max-validator-set-staleness"` + // TargetGossipSize is the number of bytes that will be attempted to be + // sent when pushing transactions and when responded to transaction pull + // requests. + TargetGossipSize int `json:"target-gossip-size"` + // PullGossipPollSize is the number of validators to sample when performing + // a round of pull gossip. + PullGossipPollSize int `json:"pull-gossip-poll-size"` + // PullGossipFrequency is how frequently rounds of pull gossip are + // performed. + PullGossipFrequency time.Duration `json:"pull-gossip-frequency"` + // PullGossipThrottlingPeriod is how large of a window the throttler should + // use. + PullGossipThrottlingPeriod time.Duration `json:"pull-gossip-throttling-period"` + // PullGossipThrottlingLimit is the number of pull querys that are allowed + // by a validator in every throttling window. + PullGossipThrottlingLimit int `json:"pull-gossip-throttling-limit"` + // ExpectedBloomFilterElements is the number of elements to expect when + // creating a new bloom filter. The larger this number is, the larger the + // bloom filter will be. + ExpectedBloomFilterElements uint64 `json:"expected-bloom-filter-elements"` + // ExpectedBloomFilterFalsePositiveProbability is the expected probability + // of a false positive after having inserted ExpectedBloomFilterElements + // into a bloom filter. The smaller this number is, the larger the bloom + // filter will be. + ExpectedBloomFilterFalsePositiveProbability float64 `json:"expected-bloom-filter-false-positive-probability"` + // MaxBloomFilterFalsePositiveProbability is used to determine when the + // bloom filter should be refreshed. Once the expected probability of a + // false positive exceeds this value, the bloom filter will be regenerated. + // The smaller this number is, the more frequently that the bloom filter + // will be regenerated. + MaxBloomFilterFalsePositiveProbability float64 `json:"max-bloom-filter-false-positive-probability"` + // LegacyPushGossipCacheSize tracks the most recently received transactions + // and ensures to only gossip them once. + // + // Deprecated: The legacy push gossip mechanism is deprecated in favor of + // the p2p SDK's push gossip mechanism. + LegacyPushGossipCacheSize int `json:"legacy-push-gossip-cache-size"` +} diff --git a/vms/platformvm/network/gossip.go b/vms/platformvm/network/gossip.go new file mode 100644 index 000000000000..5c12ee7a83bc --- /dev/null +++ b/vms/platformvm/network/gossip.go @@ -0,0 +1,138 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package network + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/platformvm/txs" + "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" +) + +var ( + _ p2p.Handler = (*txGossipHandler)(nil) + _ gossip.Marshaller[*txs.Tx] = (*txMarshaller)(nil) + _ gossip.Gossipable = (*txs.Tx)(nil) +) + +// txGossipHandler is the handler called when serving gossip messages +type txGossipHandler struct { + p2p.NoOpHandler + appGossipHandler p2p.Handler + appRequestHandler p2p.Handler +} + +func (t txGossipHandler) AppGossip( + ctx context.Context, + nodeID ids.NodeID, + gossipBytes []byte, +) { + t.appGossipHandler.AppGossip(ctx, nodeID, gossipBytes) +} + +func (t txGossipHandler) AppRequest( + ctx context.Context, + nodeID ids.NodeID, + deadline time.Time, + requestBytes []byte, +) ([]byte, error) { + return t.appRequestHandler.AppRequest(ctx, nodeID, deadline, requestBytes) +} + +type txMarshaller struct{} + +func (txMarshaller) MarshalGossip(tx *txs.Tx) ([]byte, error) { + return tx.Bytes(), nil +} + +func (txMarshaller) UnmarshalGossip(bytes []byte) (*txs.Tx, error) { + return txs.Parse(txs.Codec, bytes) +} + +func newGossipMempool( + mempool mempool.Mempool, + log logging.Logger, + txVerifier TxVerifier, + maxExpectedElements uint64, + falsePositiveProbability float64, + maxFalsePositiveProbability float64, +) (*gossipMempool, error) { + bloom, err := gossip.NewBloomFilter(maxExpectedElements, falsePositiveProbability) + return &gossipMempool{ + Mempool: mempool, + log: log, + txVerifier: txVerifier, + bloom: bloom, + maxFalsePositiveProbability: maxFalsePositiveProbability, + }, err +} + +type gossipMempool struct { + mempool.Mempool + log logging.Logger + txVerifier TxVerifier + maxFalsePositiveProbability float64 + + lock sync.RWMutex + bloom *gossip.BloomFilter +} + +func (g *gossipMempool) Add(tx *txs.Tx) error { + txID := tx.ID() + if _, ok := g.Mempool.Get(txID); ok { + return fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx) + } + + if reason := g.Mempool.GetDropReason(txID); reason != nil { + // If the tx is being dropped - just ignore it + // + // TODO: Should we allow re-verification of the transaction even if it + // failed previously? + return reason + } + + if err := g.txVerifier.VerifyTx(tx); err != nil { + g.Mempool.MarkDropped(txID, err) + return err + } + + if err := g.Mempool.Add(tx); err != nil { + g.Mempool.MarkDropped(txID, err) + return err + } + + g.lock.Lock() + defer g.lock.Unlock() + + g.bloom.Add(tx) + reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.maxFalsePositiveProbability) + if err != nil { + return err + } + + if reset { + g.log.Debug("resetting bloom filter") + g.Mempool.Iterate(func(tx *txs.Tx) bool { + g.bloom.Add(tx) + return true + }) + } + + g.Mempool.RequestBuildBlock(false) + return nil +} + +func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte, err error) { + g.lock.RLock() + defer g.lock.RUnlock() + + return g.bloom.Marshal() +} diff --git a/vms/platformvm/network/gossip_test.go b/vms/platformvm/network/gossip_test.go new file mode 100644 index 000000000000..9c3fff9b5ff9 --- /dev/null +++ b/vms/platformvm/network/gossip_test.go @@ -0,0 +1,147 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package network + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "go.uber.org/mock/gomock" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/platformvm/txs" + "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" +) + +var errFoo = errors.New("foo") + +// Add should error if verification errors +func TestGossipMempoolAddVerificationError(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + txID := ids.GenerateTestID() + tx := &txs.Tx{ + TxID: txID, + } + + mempool := mempool.NewMockMempool(ctrl) + txVerifier := testTxVerifier{err: errFoo} + + mempool.EXPECT().Get(txID).Return(nil, false) + mempool.EXPECT().GetDropReason(txID).Return(nil) + mempool.EXPECT().MarkDropped(txID, errFoo) + + gossipMempool, err := newGossipMempool( + mempool, + logging.NoLog{}, + txVerifier, + testConfig.ExpectedBloomFilterElements, + testConfig.ExpectedBloomFilterFalsePositiveProbability, + testConfig.MaxBloomFilterFalsePositiveProbability, + ) + require.NoError(err) + + err = gossipMempool.Add(tx) + require.ErrorIs(err, errFoo) + require.False(gossipMempool.bloom.Has(tx)) +} + +// Add should error if adding to the mempool errors +func TestGossipMempoolAddError(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + txID := ids.GenerateTestID() + tx := &txs.Tx{ + TxID: txID, + } + + txVerifier := testTxVerifier{} + mempool := mempool.NewMockMempool(ctrl) + + mempool.EXPECT().Get(txID).Return(nil, false) + mempool.EXPECT().GetDropReason(txID).Return(nil) + mempool.EXPECT().Add(tx).Return(errFoo) + mempool.EXPECT().MarkDropped(txID, errFoo).AnyTimes() + + gossipMempool, err := newGossipMempool( + mempool, + logging.NoLog{}, + txVerifier, + testConfig.ExpectedBloomFilterElements, + testConfig.ExpectedBloomFilterFalsePositiveProbability, + testConfig.MaxBloomFilterFalsePositiveProbability, + ) + require.NoError(err) + + err = gossipMempool.Add(tx) + require.ErrorIs(err, errFoo) + require.False(gossipMempool.bloom.Has(tx)) +} + +// Adding a duplicate to the mempool should return an error +func TestMempoolDuplicate(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + testMempool := mempool.NewMockMempool(ctrl) + txVerifier := testTxVerifier{} + + txID := ids.GenerateTestID() + tx := &txs.Tx{ + TxID: txID, + } + + testMempool.EXPECT().Get(txID).Return(tx, true) + + gossipMempool, err := newGossipMempool( + testMempool, + logging.NoLog{}, + txVerifier, + testConfig.ExpectedBloomFilterElements, + testConfig.ExpectedBloomFilterFalsePositiveProbability, + testConfig.MaxBloomFilterFalsePositiveProbability, + ) + require.NoError(err) + + err = gossipMempool.Add(tx) + require.ErrorIs(err, mempool.ErrDuplicateTx) + require.False(gossipMempool.bloom.Has(tx)) +} + +// Adding a tx to the mempool should add it to the bloom filter +func TestGossipAddBloomFilter(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + txID := ids.GenerateTestID() + tx := &txs.Tx{ + TxID: txID, + } + + txVerifier := testTxVerifier{} + mempool := mempool.NewMockMempool(ctrl) + + mempool.EXPECT().Get(txID).Return(nil, false) + mempool.EXPECT().GetDropReason(txID).Return(nil) + mempool.EXPECT().Add(tx).Return(nil) + mempool.EXPECT().RequestBuildBlock(false) + + gossipMempool, err := newGossipMempool( + mempool, + logging.NoLog{}, + txVerifier, + testConfig.ExpectedBloomFilterElements, + testConfig.ExpectedBloomFilterFalsePositiveProbability, + testConfig.MaxBloomFilterFalsePositiveProbability, + ) + require.NoError(err) + + require.NoError(gossipMempool.Add(tx)) + require.True(gossipMempool.bloom.Has(tx)) +} diff --git a/vms/platformvm/network/network.go b/vms/platformvm/network/network.go index e3003768d13f..58fb0ec156eb 100644 --- a/vms/platformvm/network/network.go +++ b/vms/platformvm/network/network.go @@ -6,42 +6,49 @@ package network import ( "context" "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/components/message" "github.com/ava-labs/avalanchego/vms/platformvm/txs" "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" ) -// We allow [recentCacheSize] to be fairly large because we only store hashes -// in the cache, not entire transactions. -const recentCacheSize = 512 - -var _ Network = (*network)(nil) +const txGossipHandlerID = 0 type Network interface { common.AppHandler + // Gossip starts gossiping transactions and blocks until it completes. + Gossip(ctx context.Context) // IssueTx verifies the transaction at the currently preferred state, adds // it to the mempool, and gossips it to the network. IssueTx(context.Context, *txs.Tx) error } type network struct { - // We embed a noop handler for all unhandled messages - common.AppHandler + *p2p.Network log logging.Logger txVerifier TxVerifier - mempool mempool.Mempool + mempool *gossipMempool partialSyncPrimaryNetwork bool appSender common.AppSender + txPushGossiper gossip.Accumulator[*txs.Tx] + txPullGossiper gossip.Gossiper + txGossipFrequency time.Duration + // gossip related attributes recentTxsLock sync.Mutex recentTxs *cache.LRU[ids.ID, struct{}] @@ -49,21 +56,129 @@ type network struct { func New( log logging.Logger, + nodeID ids.NodeID, + subnetID ids.ID, + vdrs validators.State, txVerifier TxVerifier, mempool mempool.Mempool, partialSyncPrimaryNetwork bool, appSender common.AppSender, -) Network { - return &network{ - AppHandler: common.NewNoOpAppHandler(log), + registerer prometheus.Registerer, + config Config, +) (Network, error) { + p2pNetwork, err := p2p.NewNetwork(log, appSender, registerer, "p2p") + if err != nil { + return nil, err + } + + marshaller := txMarshaller{} + validators := p2p.NewValidators( + p2pNetwork.Peers, + log, + subnetID, + vdrs, + config.MaxValidatorSetStaleness, + ) + txGossipClient := p2pNetwork.NewClient( + txGossipHandlerID, + p2p.WithValidatorSampling(validators), + ) + txGossipMetrics, err := gossip.NewMetrics(registerer, "tx") + if err != nil { + return nil, err + } + + txPushGossiper := gossip.NewPushGossiper[*txs.Tx]( + marshaller, + txGossipClient, + txGossipMetrics, + config.TargetGossipSize, + ) + + gossipMempool, err := newGossipMempool( + mempool, + log, + txVerifier, + config.ExpectedBloomFilterElements, + config.ExpectedBloomFilterFalsePositiveProbability, + config.MaxBloomFilterFalsePositiveProbability, + ) + if err != nil { + return nil, err + } + + var txPullGossiper gossip.Gossiper + txPullGossiper = gossip.NewPullGossiper[*txs.Tx]( + log, + marshaller, + gossipMempool, + txGossipClient, + txGossipMetrics, + config.PullGossipPollSize, + ) + + // Gossip requests are only served if a node is a validator + txPullGossiper = gossip.ValidatorGossiper{ + Gossiper: txPullGossiper, + NodeID: nodeID, + Validators: validators, + } + + handler := gossip.NewHandler[*txs.Tx]( + log, + marshaller, + txPushGossiper, + gossipMempool, + txGossipMetrics, + config.TargetGossipSize, + ) + + validatorHandler := p2p.NewValidatorHandler( + p2p.NewThrottlerHandler( + handler, + p2p.NewSlidingWindowThrottler( + config.PullGossipThrottlingPeriod, + config.PullGossipThrottlingLimit, + ), + log, + ), + validators, + log, + ) + + // We allow pushing txs between all peers, but only serve gossip requests + // from validators + txGossipHandler := txGossipHandler{ + appGossipHandler: handler, + appRequestHandler: validatorHandler, + } + if err := p2pNetwork.AddHandler(txGossipHandlerID, txGossipHandler); err != nil { + return nil, err + } + + return &network{ + Network: p2pNetwork, log: log, txVerifier: txVerifier, - mempool: mempool, + mempool: gossipMempool, partialSyncPrimaryNetwork: partialSyncPrimaryNetwork, appSender: appSender, - recentTxs: &cache.LRU[ids.ID, struct{}]{Size: recentCacheSize}, + txPushGossiper: txPushGossiper, + txPullGossiper: txPullGossiper, + txGossipFrequency: config.PullGossipFrequency, + recentTxs: &cache.LRU[ids.ID, struct{}]{Size: config.LegacyPushGossipCacheSize}, + }, nil +} + +func (n *network) Gossip(ctx context.Context) { + // If the node is running partial sync, we should not perform any pull + // gossip. + if n.partialSyncPrimaryNetwork { + return } + + gossip.Every(ctx, n.log, n.txPullGossiper, n.txGossipFrequency) } func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []byte) error { @@ -81,10 +196,11 @@ func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b msgIntf, err := message.Parse(msgBytes) if err != nil { - n.log.Debug("dropping AppGossip message", + n.log.Debug("forwarding AppGossip to p2p network", zap.String("reason", "failed to parse message"), ) - return nil + + return n.Network.AppGossip(ctx, nodeID, msgBytes) } msg, ok := msgIntf.(*message.Tx) @@ -106,12 +222,11 @@ func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b } txID := tx.ID() - if reason := n.mempool.GetDropReason(txID); reason != nil { - // If the tx is being dropped - just ignore it - return nil - } if err := n.issueTx(tx); err == nil { - n.gossipTx(ctx, txID, msgBytes) + n.legacyGossipTx(ctx, txID, msgBytes) + + n.txPushGossiper.Add(tx) + return n.txPushGossiper.Gossip(ctx) } return nil } @@ -131,29 +246,13 @@ func (n *network) IssueTx(ctx context.Context, tx *txs.Tx) error { } txID := tx.ID() - n.gossipTx(ctx, txID, msgBytes) - return nil + n.legacyGossipTx(ctx, txID, msgBytes) + n.txPushGossiper.Add(tx) + return n.txPushGossiper.Gossip(ctx) } // returns nil if the tx is in the mempool func (n *network) issueTx(tx *txs.Tx) error { - txID := tx.ID() - if _, ok := n.mempool.Get(txID); ok { - // The tx is already in the mempool - return nil - } - - // Verify the tx at the currently preferred state - if err := n.txVerifier.VerifyTx(tx); err != nil { - n.log.Debug("tx failed verification", - zap.Stringer("txID", txID), - zap.Error(err), - ) - - n.mempool.MarkDropped(txID, err) - return err - } - // If we are partially syncing the Primary Network, we should not be // maintaining the transaction mempool locally. if n.partialSyncPrimaryNetwork { @@ -162,20 +261,17 @@ func (n *network) issueTx(tx *txs.Tx) error { if err := n.mempool.Add(tx); err != nil { n.log.Debug("tx failed to be added to the mempool", - zap.Stringer("txID", txID), + zap.Stringer("txID", tx.ID()), zap.Error(err), ) - n.mempool.MarkDropped(txID, err) return err } - n.mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/) - return nil } -func (n *network) gossipTx(ctx context.Context, txID ids.ID, msgBytes []byte) { +func (n *network) legacyGossipTx(ctx context.Context, txID ids.ID, msgBytes []byte) { n.recentTxsLock.Lock() _, has := n.recentTxs.Get(txID) n.recentTxs.Put(txID, struct{}{}) diff --git a/vms/platformvm/network/network_test.go b/vms/platformvm/network/network_test.go index f0b3cc0a87fc..32e224d971e1 100644 --- a/vms/platformvm/network/network_test.go +++ b/vms/platformvm/network/network_test.go @@ -7,6 +7,9 @@ import ( "context" "errors" "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -14,6 +17,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/components/avax" "github.com/ava-labs/avalanchego/vms/components/message" @@ -22,10 +26,24 @@ import ( ) var ( - errTest = errors.New("test error") - _ TxVerifier = (*testTxVerifier)(nil) + errTest = errors.New("test error") + + testConfig = Config{ + MaxValidatorSetStaleness: time.Second, + TargetGossipSize: 1, + PullGossipPollSize: 1, + PullGossipFrequency: time.Second, + PullGossipThrottlingPeriod: time.Second, + PullGossipThrottlingLimit: 1, + ExpectedBloomFilterElements: 10, + ExpectedBloomFilterFalsePositiveProbability: .1, + MaxBloomFilterFalsePositiveProbability: .5, + LegacyPushGossipCacheSize: 512, + } ) +var _ TxVerifier = (*testTxVerifier)(nil) + type testTxVerifier struct { err error } @@ -92,7 +110,6 @@ func TestNetworkAppGossip(t *testing.T) { }, }, { - // Issue returns nil because mempool has tx. We should gossip the tx. name: "issuance succeeds", msgBytesFunc: func() []byte { msg := message.Tx{ @@ -104,13 +121,17 @@ func TestNetworkAppGossip(t *testing.T) { }, mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { mempool := mempool.NewMockMempool(ctrl) - mempool.EXPECT().Get(gomock.Any()).Return(testTx, true) + mempool.EXPECT().Get(gomock.Any()).Return(nil, false) mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil) + mempool.EXPECT().Add(gomock.Any()).Return(nil) + mempool.EXPECT().RequestBuildBlock(false) return mempool }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { + // we should gossip the tx twice because sdk and legacy gossip + // currently runs together appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Times(2) return appSender }, }, @@ -127,6 +148,7 @@ func TestNetworkAppGossip(t *testing.T) { }, mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { mempool := mempool.NewMockMempool(ctrl) + mempool.EXPECT().Get(gomock.Any()).Return(nil, false) mempool.EXPECT().GetDropReason(gomock.Any()).Return(errTest) return mempool }, @@ -162,16 +184,25 @@ func TestNetworkAppGossip(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { require := require.New(t) + ctx := context.Background() ctrl := gomock.NewController(t) - n := New( + snowCtx := snowtest.Context(t, ids.Empty) + n, err := New( logging.NoLog{}, + ids.EmptyNodeID, + ids.Empty, + snowCtx.ValidatorState, testTxVerifier{}, tt.mempoolFunc(ctrl), tt.partialSyncPrimaryNetwork, tt.appSenderFunc(ctrl), + prometheus.NewRegistry(), + DefaultConfig, ) - require.NoError(n.AppGossip(context.Background(), ids.GenerateTestNodeID(), tt.msgBytesFunc())) + require.NoError(err) + + require.NoError(n.AppGossip(ctx, ids.GenerateTestNodeID(), tt.msgBytesFunc())) }) } } @@ -197,22 +228,18 @@ func TestNetworkIssueTx(t *testing.T) { return mempool }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { - // Should gossip the tx - appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) - return appSender + return common.NewMockSender(ctrl) }, - expectedErr: nil, + expectedErr: mempool.ErrDuplicateTx, }, { name: "transaction marked as dropped in mempool", mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { mempool := mempool.NewMockMempool(ctrl) mempool.EXPECT().Get(gomock.Any()).Return(nil, false) - mempool.EXPECT().MarkDropped(gomock.Any(), gomock.Any()) + mempool.EXPECT().GetDropReason(gomock.Any()).Return(errTest) return mempool }, - txVerifier: testTxVerifier{err: errTest}, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { // Shouldn't gossip the tx return common.NewMockSender(ctrl) @@ -224,6 +251,7 @@ func TestNetworkIssueTx(t *testing.T) { mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { mempool := mempool.NewMockMempool(ctrl) mempool.EXPECT().Get(gomock.Any()).Return(nil, false) + mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil) mempool.EXPECT().MarkDropped(gomock.Any(), gomock.Any()) return mempool }, @@ -239,8 +267,9 @@ func TestNetworkIssueTx(t *testing.T) { mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { mempool := mempool.NewMockMempool(ctrl) mempool.EXPECT().Get(gomock.Any()).Return(nil, false) + mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil) mempool.EXPECT().Add(gomock.Any()).Return(errTest) - mempool.EXPECT().MarkDropped(gomock.Any(), errTest) + mempool.EXPECT().MarkDropped(gomock.Any(), gomock.Any()) return mempool }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { @@ -252,15 +281,14 @@ func TestNetworkIssueTx(t *testing.T) { { name: "AppGossip tx but do not add to mempool if primary network is not being fully synced", mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { - mempool := mempool.NewMockMempool(ctrl) - mempool.EXPECT().Get(gomock.Any()).Return(nil, false) - return mempool + return mempool.NewMockMempool(ctrl) }, partialSyncPrimaryNetwork: true, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { - // Should gossip the tx + // we should gossip the tx twice because sdk and legacy gossip + // currently runs together appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) return appSender }, expectedErr: nil, @@ -269,15 +297,17 @@ func TestNetworkIssueTx(t *testing.T) { name: "happy path", mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool { mempool := mempool.NewMockMempool(ctrl) - mempool.EXPECT().Get(gomock.Any()).Return(tx, false) + mempool.EXPECT().Get(gomock.Any()).Return(nil, false) + mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil) mempool.EXPECT().Add(gomock.Any()).Return(nil) mempool.EXPECT().RequestBuildBlock(false) return mempool }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { - // Should gossip the tx + // we should gossip the tx twice because sdk and legacy gossip + // currently runs together appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) return appSender }, expectedErr: nil, @@ -289,14 +319,22 @@ func TestNetworkIssueTx(t *testing.T) { require := require.New(t) ctrl := gomock.NewController(t) - n := New( - logging.NoLog{}, + snowCtx := snowtest.Context(t, ids.Empty) + n, err := New( + snowCtx.Log, + snowCtx.NodeID, + snowCtx.SubnetID, + snowCtx.ValidatorState, tt.txVerifier, tt.mempoolFunc(ctrl), tt.partialSyncPrimaryNetwork, tt.appSenderFunc(ctrl), + prometheus.NewRegistry(), + testConfig, ) - err := n.IssueTx(context.Background(), tx) + require.NoError(err) + + err = n.IssueTx(context.Background(), tx) require.ErrorIs(err, tt.expectedErr) }) } @@ -308,25 +346,32 @@ func TestNetworkGossipTx(t *testing.T) { appSender := common.NewMockSender(ctrl) - nIntf := New( - logging.NoLog{}, + snowCtx := snowtest.Context(t, ids.Empty) + nIntf, err := New( + snowCtx.Log, + snowCtx.NodeID, + snowCtx.SubnetID, + snowCtx.ValidatorState, testTxVerifier{}, mempool.NewMockMempool(ctrl), false, appSender, + prometheus.NewRegistry(), + testConfig, ) + require.NoError(err) require.IsType(&network{}, nIntf) n := nIntf.(*network) // Case: Tx was recently gossiped txID := ids.GenerateTestID() n.recentTxs.Put(txID, struct{}{}) - n.gossipTx(context.Background(), txID, []byte{}) + n.legacyGossipTx(context.Background(), txID, []byte{}) // Didn't make a call to SendAppGossip // Case: Tx was not recently gossiped msgBytes := []byte{1, 2, 3} appSender.EXPECT().SendAppGossip(gomock.Any(), msgBytes).Return(nil) - n.gossipTx(context.Background(), ids.GenerateTestID(), msgBytes) + n.legacyGossipTx(context.Background(), ids.GenerateTestID(), msgBytes) // Did make a call to SendAppGossip } diff --git a/vms/platformvm/service_test.go b/vms/platformvm/service_test.go index 376b809fa3d8..de0c16718b9a 100644 --- a/vms/platformvm/service_test.go +++ b/vms/platformvm/service_test.go @@ -217,7 +217,6 @@ func TestGetTxStatus(t *testing.T) { }, })) - oldSharedMemory := mutableSharedMemory.SharedMemory mutableSharedMemory.SharedMemory = sm tx, err := service.vm.txBuilder.NewImportTx( @@ -228,8 +227,6 @@ func TestGetTxStatus(t *testing.T) { ) require.NoError(err) - mutableSharedMemory.SharedMemory = oldSharedMemory - service.vm.ctx.Lock.Unlock() var ( @@ -241,11 +238,6 @@ func TestGetTxStatus(t *testing.T) { require.Zero(resp.Reason) // put the chain in existing chain list - err = service.vm.Network.IssueTx(context.Background(), tx) - require.ErrorIs(err, database.ErrNotFound) // Missing shared memory UTXO - - mutableSharedMemory.SharedMemory = sm - require.NoError(service.vm.Network.IssueTx(context.Background(), tx)) service.vm.ctx.Lock.Lock() diff --git a/vms/platformvm/txs/tx.go b/vms/platformvm/txs/tx.go index 27cc812e5a79..c9713ffe09c3 100644 --- a/vms/platformvm/txs/tx.go +++ b/vms/platformvm/txs/tx.go @@ -9,6 +9,7 @@ import ( "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/hashing" @@ -18,6 +19,8 @@ import ( ) var ( + _ gossip.Gossipable = (*Tx)(nil) + ErrNilSignedTx = errors.New("nil signed tx is not valid") errSignedTxNotInitialized = errors.New("signed tx was never initialized and is not valid") @@ -93,6 +96,10 @@ func (tx *Tx) ID() ids.ID { return tx.TxID } +func (tx *Tx) GossipID() ids.ID { + return tx.TxID +} + // UTXOs returns the UTXOs transaction is producing. func (tx *Tx) UTXOs() []*avax.UTXO { outs := tx.Unsigned.Outputs() diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index ef4581b5dac9..e9885a9bad94 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net/http" + "sync" "github.com/gorilla/rpc/v2" @@ -89,6 +90,9 @@ type VM struct { txBuilder txbuilder.Builder manager blockexecutor.Manager + startShutdown context.CancelFunc + awaitShutdown sync.WaitGroup + // TODO: Remove after v1.11.x is activated pruned utils.Atomic[bool] } @@ -192,13 +196,30 @@ func (vm *VM) Initialize( ) txVerifier := network.NewLockedTxVerifier(&txExecutorBackend.Ctx.Lock, vm.manager) - vm.Network = network.New( + vm.Network, err = network.New( chainCtx.Log, + chainCtx.NodeID, + chainCtx.SubnetID, + chainCtx.ValidatorState, txVerifier, mempool, txExecutorBackend.Config.PartialSyncPrimaryNetwork, appSender, + registerer, + execConfig.Network, ) + if err != nil { + return fmt.Errorf("failed to initialize network: %w", err) + } + + vmCtx, cancel := context.WithCancel(context.Background()) + vm.startShutdown = cancel + vm.awaitShutdown.Add(1) + go func() { + defer vm.awaitShutdown.Done() + vm.Network.Gossip(vmCtx) + }() + vm.Builder = blockbuilder.New( mempool, vm.txBuilder, @@ -354,6 +375,9 @@ func (vm *VM) Shutdown(context.Context) error { return nil } + vm.startShutdown() + vm.awaitShutdown.Wait() + vm.Builder.ShutdownBlockTimer() if vm.bootstrapped.Get() {