Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix libp2p identify race #6573

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ func NewFetch(
if host != nil {
connectedf := func(peer p2p.Peer) {
protocols := func() []protocol.ID {
// Make sure that the protocol list for the peer is correct.
// This is similar to what Host.NewStream does to make
// sure it is possible to use one of the specified
// protocols. If we don't do this, there may be a race causing
// some peers to be unnecessarily ignored.
host.Identify(peer)
Comment on lines +303 to +308
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this:

Suggested change
// Make sure that the protocol list for the peer is correct.
// This is similar to what Host.NewStream does to make
// sure it is possible to use one of the specified
// protocols. If we don't do this, there may be a race causing
// some peers to be unnecessarily ignored.
host.Identify(peer)
pi := host.Peerstore().PeerInfo(peer)
if err := host.Connect(context.Background(), pi); err != nil {
f.logger.Debug("failed to connect to peer",
zap.Stringer("id", peer),
zap.Error(err),
)
return nil
}

has basically the same effect without a) needing to intercept the building process of the libp2p host and b) allowing us to pass a timeout to Connect in case we want to abort early if something goes wrong.

Connect in both implementations of Host eventually calls IdentifyWait which is also called by IdentifyCon but allows passing a context in case we want to abort early.

Wdyt?

ps, err := host.Peerstore().GetProtocols(peer)
if err != nil {
f.logger.Debug("failed to get protocols for peer",
Expand Down
10 changes: 7 additions & 3 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,16 +966,16 @@ func TestFetch_GetCert(t *testing.T) {

// Test if GetAtxs() limits the number of concurrent requests to `cfg.GetAtxsConcurrency`.
func Test_GetAtxsLimiting(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)

const (
totalRequests = 100
getAtxConcurrency = 10
)

for _, withLimiting := range []bool{false, true} {
t.Run(fmt.Sprintf("with limiting: %v", withLimiting), func(t *testing.T) {
// Do not connect immediately in order to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
srv := server.New(
wrapHost(mesh.Hosts()[1]),
hashProtocol,
Expand Down Expand Up @@ -1038,6 +1038,10 @@ func Test_GetAtxsLimiting(t *testing.T) {
require.NoError(t, f.Start())
t.Cleanup(f.Stop)

// Connect the P2P mesh only after the server is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

var atxIds []types.ATXID
for i := 0; i < totalRequests; i++ {
id := types.RandomATXID()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/zeebo/blake3 v0.2.4
go.uber.org/fx v1.23.0
go.uber.org/mock v0.5.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
Expand Down Expand Up @@ -231,7 +232,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/fx v1.23.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/mod v0.22.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/security/noise"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -282,6 +284,7 @@ func New(
return nil, fmt.Errorf("can't set up connection gater: %w", err)
}

var idService identify.IDService
pt := peerinfo.NewPeerInfoTracker()
lopts := []libp2p.Option{
libp2p.Identity(key),
Expand All @@ -295,6 +298,11 @@ func New(
cfg.AutoNATServer.PeerMax,
cfg.AutoNATServer.ResetPeriod),
libp2p.ConnectionGater(g),
// Obtain the IDService via fx dependency injection.
// This function is always called by libp2p.New().
libp2p.WithFxOption(fx.Invoke(func(ids identify.IDService) {
idService = ids
})),
}
if cfg.EnableTCPTransport {
lopts = append(lopts,
Expand Down Expand Up @@ -427,6 +435,7 @@ func New(
WithBootnodes(bootnodesMap),
WithDirectNodes(g.direct),
WithPeerInfo(pt),
WithIDService(idService),
)
return Upgrade(h, opts...)
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func TestConnectedPersist(t *testing.T) {
dir := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
const n = 3
// We can use FullMeshConnected here b/c we don't need to query peers' protocols,
// and thus there are no issues with identify service race.
mock, err := mocknet.FullMeshConnected(n)
require.NoError(t, err)
var eg errgroup.Group
Expand Down Expand Up @@ -46,6 +48,8 @@ func TestConnectedBrokenCRC(t *testing.T) {
dir := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
const n = 3
// We can use FullMeshConnected here b/c we don't need to query peers' protocols,
// and thus there are no issues with identify service race.
mock, err := mocknet.FullMeshConnected(n)
require.NoError(t, err)
var eg errgroup.Group
Expand Down
36 changes: 33 additions & 3 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,30 @@ func wrapHost(tb testing.TB, h host.Host) Host {
func TestServer(t *testing.T) {
const limit = 1024

mesh, err := mocknet.FullMeshConnected(5)
// Don't establish peer connections immediately.
// If we make the nodes connect to each other right away, there
// may be a problem with libp2p identify service. Namely, when a
// connection to a peer is established, identify request is sent
// to that peer, to which the peer sends identify response. Also,
// when a new handler is added to the host via SetStreamHandler,
// as it's done in p2p/server.Server, the identify response
// message is pushed to the peers currently connected to this
// node.
// When a server is created immediately following a connection
// from a peer, that peer may receive 2 identify response messages
// in quick succession: one which is indeed the response to
// initial identify request, and one which is push message
// generated by creation of the server. In some cases, the initial
// response may not contain the protocol of the freshly added
// server (hashProtocol="hs/1" in this case).
// The identify response message sent when SetStreamHandler is
// called will always contain the protocol being added, but due to
// how libp2p works, the responses may be processed in reverse
// order, and the set of protocols from the initial message will
// override the set of protocols in the second mesage (from
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: typo

Suggested change
// override the set of protocols in the second mesage (from
// override the set of protocols in the second message (from

// SetStreamHandler). In this case, the peer will have a wrong
// idea about this node's protocols, and the test may fail.
mesh, err := mocknet.FullMeshLinked(5)
Comment on lines +47 to +70
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about putting these comments as a doc on the FullMeshLinked function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FullMeshLinked is a part of libp2p

require.NoError(t, err)
proto := "test"
request := []byte("test request")
Expand Down Expand Up @@ -121,6 +144,10 @@ func TestServer(t *testing.T) {
eg.Wait()
})

// Connect the P2P mesh only after the servers are configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

t.Run("ReceiveMessage", func(t *testing.T) {
n := srvs[0].NumAcceptedRequests()
srvID := mesh.Hosts()[1].ID()
Expand Down Expand Up @@ -212,7 +239,7 @@ func TestServer(t *testing.T) {
}

func Test_Queued(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

var (
Expand Down Expand Up @@ -245,6 +272,8 @@ func Test_Queued(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
require.NoError(t, mesh.ConnectAllButSelf())

var reqEq errgroup.Group
for i := 0; i < queueSize; i++ { // fill the queue with requests
reqEq.Go(func() error {
Expand All @@ -266,7 +295,7 @@ func Test_Queued(t *testing.T) {
}

func Test_RequestInterval(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

var (
Expand Down Expand Up @@ -295,6 +324,7 @@ func Test_RequestInterval(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
require.NoError(t, mesh.ConnectAllButSelf())

start := time.Now()
for i := 0; i < maxReq; i++ { // fill the interval with requests (bursts up to maxReq are allowed)
Expand Down
38 changes: 37 additions & 1 deletion p2p/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -75,6 +77,12 @@
}
}

func WithIDService(idService identify.IDService) Opt {
return func(fh *Host) {
fh.idService = idService
}
}

// Host is a convenience wrapper for all p2p related functionality required to run
// a full spacemesh node.
type Host struct {
Expand Down Expand Up @@ -112,7 +120,8 @@
value network.Reachability
}

ping *Ping
ping *Ping
idService identify.IDService
}

// Upgrade creates Host instance from host.Host.
Expand All @@ -128,6 +137,17 @@
for _, opt := range opts {
opt(fh)
}
if fh.idService == nil {
// If no IDService is provided, which may be the case in the tests,
// we can try to get it from the host, assuming it's a *basichost.BasicHost.
// *basichost.BasicHost is expected when libp2p mocknet is being used
// instead of libp2p.New().
if bh, ok := h.(*basichost.BasicHost); ok {
fh.idService = bh.IDService()
} else {
return nil, errors.New("no IDService provided")
}

Check warning on line 149 in p2p/upgrade.go

View check run for this annotation

Codecov / codecov/patch

p2p/upgrade.go#L148-L149

Added lines #L148 - L149 were not covered by tests
}
cfg := fh.cfg
bootnodes, err := parseIntoAddr(fh.cfg.Bootnodes)
if err != nil {
Expand Down Expand Up @@ -504,3 +524,19 @@
func (fh *Host) PeerInfo() peerinfo.PeerInfo {
return fh.peerInfo
}

// Identify ensures that the given peer is identified via libp2p identify protocol.
// Identification is initiated after connecting to the peer, and the set of protocols for
// the peer in the ProtoBook is not guaranteed to be correct until identification
// finishes.
// Note that the set of the protocols in the ProtoBook for a particular peer may also
// change via a push identity notification when the peer adds a new handler via
// SetStreamHandler (e.g. sets up a new Server).
func (fh *Host) Identify(p peer.ID) {
for _, c := range fh.Network().ConnsToPeer(p) {
// IDService.IdentifyConn is a no-op if the connection is already
// identified, but otherwise we need to wait for identification to finish
// to have proper set of protocols.
fh.idService.IdentifyConn(c)
}
}
15 changes: 5 additions & 10 deletions sync2/dbset/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func runSync(
cfg rangesync.RangeSetReconcilerConfig,
) {
log := zaptest.NewLogger(t)
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
proto := "itest"
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
Expand Down Expand Up @@ -210,15 +211,9 @@ func runSync(
return srv.Run(ctx)
})

// Wait for the server to activate
require.Eventually(t, func() bool {
for _, h := range mesh.Hosts() {
if len(h.Mux().Protocols()) == 0 {
return false
}
}
return true
}, time.Second, 10*time.Millisecond)
// Connect the P2P mesh only after the servers are configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

startTimer(t)
pssB := rangesync.NewPairwiseSetSyncerInternal(
Expand Down
7 changes: 6 additions & 1 deletion sync2/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func TestP2P(t *testing.T) {
maxDepth = 24
)
logger := zaptest.NewLogger(t)
mesh, err := mocknet.FullMeshConnected(numNodes)
// Do not connect immediately.
// See Test_GetAtxsLimiting in fetch package for details.
mesh, err := mocknet.FullMeshLinked(numNodes)
require.NoError(t, err)
hs := make([]*sync2.P2PHashSync, numNodes)
initialSet := make([]rangesync.KeyBytes, numHashes)
Expand Down Expand Up @@ -120,6 +122,9 @@ func TestP2P(t *testing.T) {
require.NoError(t, err)
require.NoError(t, hs[n].Load())
require.False(t, hs[n].Synced())
}
require.NoError(t, mesh.ConnectAllButSelf())
for n := range hs {
hs[n].Start()
}

Expand Down
6 changes: 5 additions & 1 deletion sync2/rangesync/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func makeFakeDispHandler(n int) rangesync.Handler {
}

func TestDispatcher(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

d := rangesync.NewDispatcher(zaptest.NewLogger(t))
Expand All @@ -48,6 +49,9 @@ func TestDispatcher(t *testing.T) {
srvPeerID := mesh.Hosts()[0].ID()

c := server.New(mesh.Hosts()[1], proto, d.Dispatch, opts...)
// Connect the P2P mesh only after the server is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())
for _, tt := range []struct {
name string
want int
Expand Down
14 changes: 5 additions & 9 deletions sync2/rangesync/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func fakeRequesterGetter(t *testing.T) getRequesterFunc {
}

func p2pRequesterGetter(tb testing.TB) getRequesterFunc {
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(tb, err)
proto := "itest"
opts := []server.Opt{
Expand All @@ -85,14 +86,9 @@ func p2pRequesterGetter(tb testing.TB) getRequesterFunc {
return server.New(mesh.Hosts()[0], proto, handler, opts...), mesh.Hosts()[0].ID()
}
s := server.New(mesh.Hosts()[1], proto, handler, opts...)
require.Eventually(tb, func() bool {
for _, h := range mesh.Hosts()[0:] {
if len(h.Mux().Protocols()) == 0 {
return false
}
}
return true
}, time.Second, 10*time.Millisecond)
// Connect the P2P mesh only after the servers is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(tb, mesh.ConnectAllButSelf())
return s, mesh.Hosts()[1].ID()
}
}
Expand Down
Loading
Loading