Skip to content

Commit

Permalink
Merge branch 'fetch-active-set' into test-activeset-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Dec 20, 2023
2 parents f84f882 + f8001e3 commit 5d30af2
Show file tree
Hide file tree
Showing 40 changed files with 2,501 additions and 301 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ for more information on how to configure the node to work with the PoST service.
query rewards by smesherID. Additionally, it does not re-index old data. Rewards will contain smesherID going forward,
but to refresh data for all rewards, a node will have to delete its database and resync from genesis.

* [#5329](https://github.com/spacemeshos/go-spacemesh/pull/5329) P2P decentralization improvements. Added support for QUIC
transport and DHT routing discovery for finding peers and relays. Also, added the `ping-peers` feature which is useful
during connectivity troubleshooting. `static-relays` feature can be used to provide a static list of circuit v2 relays
nodes when automatic relay discovery is not desired. All of the relay server resource settings are now configurable. Most
of the new functionality is disabled by default unless explicitly enabled in the config via `enable-routing-discovery`,
`routing-discovery-advertise`, `enable-quic-transport`, `static-relays` and `ping-peers` options in the `p2p` config
section. The non-conditional changes include values/provides support on all of the nodes, which will enable DHT to
function efficiently for routing discovery.

* [#5384](https://github.com/spacemeshos/go-spacemesh/pull/5384) to improve network stability and performance allow the
active set to be set in advance for an epoch. This allows the network to start consensus on the first layer of an epoch.

## Release v1.2.9

### Improvements
Expand Down
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,49 @@ as on UNIX-based systems.
- This is a great way to get a feel for the protocol and the platform and to start hacking on Spacemesh.
- Follow the steps in our [Local Testnet Guide](https://testnet.spacemesh.io/#/README)
### Improved decentralization and P2P diagnostic features
**WARNING! THIS IS EXPERIMENTAL FUNCTIONALITY, USE WITH CARE!**
In order to make the p2p network more decentralized, the following options are provided:
- `"enable-routing-discovery": true`: enables routing discovery for finding new peers, including those behind NAT, ans
also for discovering relay nodes which are used for NAT hole punching. Note that hole punching can be done when both
ends of the connection are behind an endpoint-independent ("cone") NAT.
- `"routing-discovery-advertise": true` advertises this node for discovery by other peers, even if it is behind NAT.
- `"enable-quic-transport": true`: enables QUIC transport which, together with TCP transport, heightens the changes of
successful NAT hole punching.
- `"enable-tcp-transport": false` disables TCP transport. This option is intended to be used for debugging purposes
only!
- `"static-relays": ["/dns4/relay.example.com/udp/5000/quic-v1/p2p/...", ...]` provides a static list of relay nodes for
use for NAT hole punching in case of routing discovery based relay search is not to be used.
- `"ping-peers": ["p2p_id_1", "p2p_id_2", ...]` runs P2P ping against the specified peers, logging the results.
For the purpose of debugging P2P connectivity issues, the following command can also be used:
```console
$ grpcurl -plaintext 127.0.0.1:9093 spacemesh.v1.DebugService.NetworkInfo
{
"id": "12D3Koo...",
"listenAddresses": [
"/ip4/0.0.0.0/tcp/50212",
"/ip4/0.0.0.0/udp/59458/quic-v1",
"/p2p-circuit"
],
"knownAddresses": [
"/ip4/127.0.0.1/tcp/50212",
"/ip4/127.0.0.1/udp/59458/quic-v1",
"/ip4/192.168.33.5/tcp/50212",
"/ip4/192.168.33.5/udp/59458/quic-v1",
"/ip4/.../tcp/37670/p2p/12D3Koo.../p2p-circuit",
"/ip4/.../udp/37659/quic-v1/p2p/12D3Koo.../p2p-circuit",
"/ip4/.../tcp/31960/p2p/12D3Koo.../p2p-circuit",
"/ip4/.../udp/33377/quic-v1/p2p/12D3Koo.../p2p-circuit"
],
"natTypeUdp": "Cone",
"natTypeTcp": "Cone",
"reachability": "Private"
}
```
#### Next Steps
- Please visit our [wiki](https://github.com/spacemeshos/go-spacemesh/wiki)
Expand Down
42 changes: 40 additions & 2 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type Handler struct {
mu sync.Mutex
fetcher system.Fetcher
poetCfg PoetConfig

// inProgress map gathers ATXs that are currently being processed.
// It's used to avoid processing the same ATX twice.
inProgress map[types.ATXID][]chan error
inProgressMu sync.Mutex
}

// NewHandler returns a data handler for ATX.
Expand Down Expand Up @@ -83,6 +88,8 @@ func NewHandler(
beacon: beacon,
tortoise: tortoise,
poetCfg: poetCfg,

inProgress: make(map[types.ATXID][]chan error),
}
}

Expand Down Expand Up @@ -502,12 +509,43 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
if err := codec.Decode(msg, &atx); err != nil {
return fmt.Errorf("%w: %w", errMalformedData, err)
}

atx.SetReceived(receivedTime.Local())
if err := atx.Initialize(); err != nil {
return fmt.Errorf("failed to derive ID from atx: %w", err)
}

// Check if processing is already in progress
h.inProgressMu.Lock()
if sub, ok := h.inProgress[atx.ID()]; ok {
ch := make(chan error, 1)
h.inProgress[atx.ID()] = append(sub, ch)
h.inProgressMu.Unlock()
h.log.WithContext(ctx).With().Debug("atx is already being processed. waiting for result", atx.ID())
select {
case err := <-ch:
h.log.WithContext(ctx).With().Debug("atx processed in other task", atx.ID(), log.Err(err))
return err
case <-ctx.Done():
return ctx.Err()
}
}

h.inProgress[atx.ID()] = []chan error{}
h.inProgressMu.Unlock()
h.log.WithContext(ctx).With().Info("handling incoming atx", atx.ID(), log.Int("size", len(msg)))

err := h.processAtx(ctx, expHash, peer, atx)
h.inProgressMu.Lock()
defer h.inProgressMu.Unlock()
for _, ch := range h.inProgress[atx.ID()] {
ch <- err
close(ch)
}
delete(h.inProgress, atx.ID())
return err
}

func (h *Handler) processAtx(ctx context.Context, expHash types.Hash32, peer p2p.Peer, atx types.ActivationTx) error {
if !h.edVerifier.Verify(signing.ATX, atx.SmesherID, atx.SignedBytes(), atx.Signature) {
return fmt.Errorf("failed to verify atx signature: %w", errMalformedData)
}
Expand Down Expand Up @@ -544,7 +582,7 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
return fmt.Errorf("cannot process atx %v: %w", atx.ShortString(), err)
}
events.ReportNewActivation(vAtx)
h.log.WithContext(ctx).With().Info("new atx", log.Inline(vAtx), log.Int("size", len(msg)))
h.log.WithContext(ctx).With().Info("new atx", log.Inline(vAtx))
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/atxsdata"
"github.com/spacemeshos/go-spacemesh/codec"
Expand Down Expand Up @@ -1211,6 +1212,74 @@ func TestHandler_HandleGossipAtx(t *testing.T) {
require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), "", secondData))
}

func TestHandler_HandleParallelGossipAtx(t *testing.T) {
goldenATXID := types.ATXID{2, 3, 4}
atxHdlr := newTestHandler(t, goldenATXID)

sig, err := signing.NewEdSigner()
require.NoError(t, err)
nodeID := sig.NodeID()
nipost := newNIPostWithChallenge(t, types.HexToHash32("0x3333"), []byte{0xba, 0xbe})
vrfNonce := types.VRFPostIndex(12345)
atx := &types.ActivationTx{
InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: 1,
PrevATXID: types.EmptyATXID,
PositioningATX: goldenATXID,
CommitmentATX: &goldenATXID,
InitialPost: nipost.Post,
},
Coinbase: types.Address{2, 3, 4},
NumUnits: 2,
NIPost: nipost,
NodeID: &nodeID,
VRFNonce: &vrfNonce,
},
SmesherID: nodeID,
}
atx.Signature = sig.Sign(signing.ATX, atx.SignedBytes())
atx.SetEffectiveNumUnits(atx.NumUnits)
atx.SetReceived(time.Now())
_, err = atx.Verify(0, 2)
require.NoError(t, err)

atxData, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().CurrentLayer().Return(atx.PublishEpoch.FirstLayer())
atxHdlr.mValidator.EXPECT().VRFNonce(nodeID, goldenATXID, &vrfNonce, gomock.Any(), atx.NumUnits)
atxHdlr.mValidator.EXPECT().Post(
gomock.Any(),
atx.SmesherID,
goldenATXID,
atx.InitialPost,
gomock.Any(),
atx.NumUnits,
).DoAndReturn(
func(_ context.Context, _ types.NodeID, _ types.ATXID, _ *types.Post, _ *types.PostMetadata, _ uint32) error {
time.Sleep(100 * time.Millisecond)
return nil
},
)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mValidator.EXPECT().InitialNIPostChallenge(&atx.NIPostChallenge, gomock.Any(), goldenATXID)
atxHdlr.mValidator.EXPECT().PositioningAtx(goldenATXID, gomock.Any(), goldenATXID, atx.PublishEpoch)
atxHdlr.mValidator.EXPECT().NIPost(gomock.Any(), nodeID, goldenATXID, atx.NIPost, gomock.Any(), atx.NumUnits)
atxHdlr.mbeacon.EXPECT().OnAtx(gomock.Any())
atxHdlr.mtortoise.EXPECT().OnAtx(gomock.Any())

var eg errgroup.Group
for i := 0; i < 10; i++ {
eg.Go(func() error {
return atxHdlr.HandleGossipAtx(context.Background(), "", atxData)
})
}

require.NoError(t, eg.Wait())
}

func TestHandler_HandleSyncedAtx(t *testing.T) {
// Arrange
goldenATXID := types.ATXID{2, 3, 4}
Expand Down
46 changes: 42 additions & 4 deletions api/grpcserver/debug_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package grpcserver
import (
"context"
"fmt"
"sort"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/libp2p/go-libp2p/core/network"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -24,7 +26,7 @@ import (
type DebugService struct {
db *sql.Database
conState conservativeState
identity networkIdentity
netInfo networkInfo
oracle oracle
}

Expand All @@ -43,11 +45,11 @@ func (d DebugService) String() string {
}

// NewDebugService creates a new grpc service using config data.
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle) *DebugService {
func NewDebugService(db *sql.Database, conState conservativeState, host networkInfo, oracle oracle) *DebugService {
return &DebugService{
db: db,
conState: conState,
identity: host,
netInfo: host,
oracle: oracle,
}
}
Expand Down Expand Up @@ -91,7 +93,21 @@ func (d DebugService) Accounts(ctx context.Context, in *pb.AccountsRequest) (*pb

// NetworkInfo query provides NetworkInfoResponse.
func (d DebugService) NetworkInfo(ctx context.Context, _ *emptypb.Empty) (*pb.NetworkInfoResponse, error) {
return &pb.NetworkInfoResponse{Id: d.identity.ID().String()}, nil
resp := &pb.NetworkInfoResponse{Id: d.netInfo.ID().String()}
for _, a := range d.netInfo.ListenAddresses() {
resp.ListenAddresses = append(resp.ListenAddresses, a.String())
}
sort.Strings(resp.ListenAddresses)
for _, a := range d.netInfo.KnownAddresses() {
resp.KnownAddresses = append(resp.KnownAddresses, a.String())
}
sort.Strings(resp.KnownAddresses)
udpNATType, tcpNATType := d.netInfo.NATDeviceType()
resp.NatTypeUdp = convertNATType(udpNATType)
resp.NatTypeTcp = convertNATType(tcpNATType)
resp.Reachability = convertReachability(d.netInfo.Reachability())
resp.DhtServerEnabled = d.netInfo.DHTServerEnabled()
return resp, nil
}

// ActiveSet query provides hare active set for the specified epoch.
Expand Down Expand Up @@ -158,3 +174,25 @@ func castEventProposal(ev *events.EventProposal) *pb.Proposal {
}
return proposal
}

func convertNATType(natType network.NATDeviceType) pb.NetworkInfoResponse_NATType {
switch natType {
case network.NATDeviceTypeCone:
return pb.NetworkInfoResponse_Cone
case network.NATDeviceTypeSymmetric:
return pb.NetworkInfoResponse_Symmetric
default:
return pb.NetworkInfoResponse_NATTypeUnknown
}
}

func convertReachability(r network.Reachability) pb.NetworkInfoResponse_Reachability {
switch r {
case network.ReachabilityPublic:
return pb.NetworkInfoResponse_Public
case network.ReachabilityPrivate:
return pb.NetworkInfoResponse_Private
default:
return pb.NetworkInfoResponse_ReachabilityUnknown
}
}
36 changes: 33 additions & 3 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/spacemeshos/merkle-tree"
"github.com/spacemeshos/poet/shared"
Expand Down Expand Up @@ -2043,10 +2045,10 @@ func TestMultiService(t *testing.T) {

func TestDebugService(t *testing.T) {
ctrl := gomock.NewController(t)
identity := NewMocknetworkIdentity(ctrl)
netInfo := NewMocknetworkInfo(ctrl)
mOracle := NewMockoracle(ctrl)
db := sql.InMemory()
svc := NewDebugService(db, conStateAPI, identity, mOracle)
svc := NewDebugService(db, conStateAPI, netInfo, mOracle)
cfg, cleanup := launchServer(t, svc)
t.Cleanup(cleanup)

Expand Down Expand Up @@ -2097,13 +2099,33 @@ func TestDebugService(t *testing.T) {

t.Run("networkID", func(t *testing.T) {
id := p2p.Peer("test")
identity.EXPECT().ID().Return(id)
netInfo.EXPECT().ID().Return(id)
netInfo.EXPECT().ListenAddresses().Return([]ma.Multiaddr{
mustParseMultiaddr("/ip4/0.0.0.0/tcp/5000"),
mustParseMultiaddr("/ip4/0.0.0.0/udp/5001/quic-v1"),
})
netInfo.EXPECT().KnownAddresses().Return([]ma.Multiaddr{
mustParseMultiaddr("/ip4/10.36.0.221/tcp/5000"),
mustParseMultiaddr("/ip4/10.36.0.221/udp/5001/quic-v1"),
})
netInfo.EXPECT().NATDeviceType().Return(network.NATDeviceTypeCone, network.NATDeviceTypeSymmetric)
netInfo.EXPECT().Reachability().Return(network.ReachabilityPrivate)
netInfo.EXPECT().DHTServerEnabled().Return(true)

response, err := c.NetworkInfo(context.Background(), &emptypb.Empty{})
require.NoError(t, err)
require.NotNil(t, response)
require.Equal(t, id.String(), response.Id)
require.Equal(t, []string{"/ip4/0.0.0.0/tcp/5000", "/ip4/0.0.0.0/udp/5001/quic-v1"},
response.ListenAddresses)
require.Equal(t, []string{"/ip4/10.36.0.221/tcp/5000", "/ip4/10.36.0.221/udp/5001/quic-v1"},
response.KnownAddresses)
require.Equal(t, pb.NetworkInfoResponse_Cone, response.NatTypeUdp)
require.Equal(t, pb.NetworkInfoResponse_Symmetric, response.NatTypeTcp)
require.Equal(t, pb.NetworkInfoResponse_Private, response.Reachability)
require.True(t, response.DhtServerEnabled)
})

t.Run("ActiveSet", func(t *testing.T) {
epoch := types.EpochID(3)
activeSet := types.RandomActiveSet(11)
Expand Down Expand Up @@ -2445,3 +2467,11 @@ func TestMeshService_EpochStream(t *testing.T) {
}
require.ElementsMatch(t, expected, got)
}

func mustParseMultiaddr(s string) ma.Multiaddr {
maddr, err := ma.NewMultiaddr(s)
if err != nil {
panic("can't parse multiaddr: " + err.Error())
}
return maddr
}
Loading

0 comments on commit 5d30af2

Please sign in to comment.