From 133bbe584a89959456e25c5957faad464a6ac236 Mon Sep 17 00:00:00 2001 From: Matthias Fasching <5011972+fasmat@users.noreply.github.com> Date: Thu, 19 Oct 2023 12:53:30 +0000 Subject: [PATCH] Add mTLS grpcserver (#5154) ## Motivation Closes #5131 do not merge before https://github.com/spacemeshos/api/pull/268 and https://github.com/spacemeshos/post/pull/245 ## Changes - setup for gRPC servers has been moved from node startup into the `grpcserver` package - `NewPublic`, `NewPrivate` and `NewTLS` create servers for the given purposes based on the configuration passed to them - replaced more instances of `go-spacemesh/log` with `zap` ## Test Plan - existing tests pass - TODO: add new tests for mTLS connection ## TODO - [x] Explain motivation or link existing issue(s) - [x] Test changes and document test plan - [x] Update documentation as needed - [x] Update [changelog](../CHANGELOG.md) as needed --- CHANGELOG.md | 6 +- Makefile-libs.Inc | 2 +- README.md | 50 ++ activation/e2e/nipost_test.go | 10 +- activation/post_supervisor.go | 14 +- api/grpcserver/activation_service.go | 15 +- api/grpcserver/admin_service.go | 15 +- api/grpcserver/admin_service_test.go | 14 +- api/grpcserver/config.go | 10 +- api/grpcserver/debug_service.go | 15 +- api/grpcserver/globalstate_service.go | 15 +- api/grpcserver/globalstate_service_test.go | 364 ++++++++++++++ api/grpcserver/grpc.go | 146 ++++-- api/grpcserver/grpcserver_test.go | 536 ++------------------- api/grpcserver/grpcserver_tls_test.go | 46 ++ api/grpcserver/http_server.go | 51 +- api/grpcserver/http_server_test.go | 91 ++++ api/grpcserver/mesh_service.go | 15 +- api/grpcserver/mesh_service_test.go | 4 +- api/grpcserver/node_service.go | 15 +- api/grpcserver/node_service_test.go | 153 ++++++ api/grpcserver/post_client.go | 21 +- api/grpcserver/post_service.go | 16 +- api/grpcserver/post_service_test.go | 71 +++ api/grpcserver/smesher_service.go | 15 +- api/grpcserver/testdata/README.md | 38 ++ api/grpcserver/testdata/ca.crt | 
34 ++ api/grpcserver/testdata/ca.key | 52 ++ api/grpcserver/testdata/client.crt | 34 ++ api/grpcserver/testdata/client.key | 52 ++ api/grpcserver/testdata/domains.ext | 6 + api/grpcserver/testdata/server.crt | 34 ++ api/grpcserver/testdata/server.key | 52 ++ api/grpcserver/transaction_service.go | 15 +- api/grpcserver/transaction_service_test.go | 6 +- cmd/bootstrapper/generator_test.go | 23 +- cmd/bootstrapper/server_test.go | 5 +- cmd/root.go | 6 + config/config.go | 5 +- config/presets/fastnet.go | 4 +- config/presets/presets.go | 3 +- config/presets/standalone.go | 2 +- go.mod | 22 +- go.sum | 40 +- node/bad_peer_test.go | 12 +- node/node.go | 220 ++++----- node/node_test.go | 3 +- node/{util_test.go => test_network.go} | 2 +- 48 files changed, 1600 insertions(+), 780 deletions(-) create mode 100644 api/grpcserver/globalstate_service_test.go create mode 100644 api/grpcserver/grpcserver_tls_test.go create mode 100644 api/grpcserver/http_server_test.go create mode 100644 api/grpcserver/node_service_test.go create mode 100644 api/grpcserver/testdata/README.md create mode 100644 api/grpcserver/testdata/ca.crt create mode 100644 api/grpcserver/testdata/ca.key create mode 100644 api/grpcserver/testdata/client.crt create mode 100644 api/grpcserver/testdata/client.key create mode 100644 api/grpcserver/testdata/domains.ext create mode 100644 api/grpcserver/testdata/server.crt create mode 100644 api/grpcserver/testdata/server.key rename node/{util_test.go => test_network.go} (97%) diff --git a/CHANGELOG.md b/CHANGELOG.md index dbe6b534cea..0bfbffaa1f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,10 +18,12 @@ See [RELEASE](./RELEASE.md) for workflow instructions. 
> 2023-10-02T15:28:14.002+0200 WARN fd68b.sync mesh failed to process layer from sync {"node_id": "fd68b9397572556c2f329f3e5af2faf23aef85dbbbb7e38447fae2f4ef38899f", "module": "sync", "sessionId": "29422935-68d6-47d1-87a8-02293aa181f3", "layer_id": 23104, "errmsg": "requested layer 8063 is before evicted 13102", "name": "sync"} -* [#5091](https://github.com/spacemeshos/go-spacemesh/pull/5091) First stage of separating PoST from the node into its own service. +* [#5091](https://github.com/spacemeshos/go-spacemesh/pull/5091) Separating PoST from the node into its own service. * [#5061](https://github.com/spacemeshos/go-spacemesh/pull/5061) Proof generation is now done via a dedicated service instead of the node. +* [#5154](https://github.com/spacemeshos/go-spacemesh/pull/5154) Enable TLS connections between node and PoST service. - Operating a node doesn't require any changes at the moment. The service will be automatically started by the node if needed and will be stopped when the node is stopped. + PoST proofs are now done via a dedicated process / service that the node communicates with via gRPC. Smapp users can continue to smesh as they used to. The node will + automatically start the PoST service when it starts and will shut it down when it shuts down. 
* [#5138](https://github.com/spacemeshos/go-spacemesh/pull/5138) Bump poet to v0.9.7 diff --git a/Makefile-libs.Inc b/Makefile-libs.Inc index 1ebee8716ca..f3fd8f3b384 100644 --- a/Makefile-libs.Inc +++ b/Makefile-libs.Inc @@ -50,7 +50,7 @@ else endif endif -POSTRS_SETUP_REV = 0.5.0-alpha2 +POSTRS_SETUP_REV = 0.5.0 POSTRS_SETUP_ZIP = libpost-$(platform)-v$(POSTRS_SETUP_REV).zip POSTRS_SETUP_URL_ZIP ?= https://github.com/spacemeshos/post-rs/releases/download/v$(POSTRS_SETUP_REV)/$(POSTRS_SETUP_ZIP) POSTRS_PROFILER_ZIP = profiler-$(platform)-v$(POSTRS_SETUP_REV).zip diff --git a/README.md b/README.md index d4973738e60..732d7b934ab 100644 --- a/README.md +++ b/README.md @@ -235,6 +235,56 @@ on Windows you can use Intel OpenAPI: choco install opencl-intel-cpu-runtime ``` +#### Using a remote machine as provider for PoST proofs + +To disable the internal PoST service and disable smeshing on your node you can use the following config: + +```json +"smeshing": { + "smeshing-start": false +} +``` + +or use the `--smeshing-start=false` flag. This will disable smeshing on your node causing it to not generate any PoST proofs until a remote post +service connects. + +By default the node listens for the PoST service on `grpc-private-listener` (defaults to 127.0.0.1:9093). This endpoint does not require authentication and +should only be accessible from the same machine. If you want to allow connections from post services on other hosts to your node, you should do so via the +`grpc-tls-listener` (defaults to 0.0.0.0:9094) and set up TLS for the connection. + +This is useful for example if you want to run a node on a cloud provider with fewer resources and run PoST on a local machine with more resources. The post +service only needs to be online for the initial proof (i.e. when joining the network for the first time) and during the cyclegap in every epoch.
+ +To set up TLS-secured public connections the API config has been extended with the following options: + +```json +"api": { + "grpc-private-services": ["admin", "smesher"], // remove "post" from the list of services only exposed to the local machine + "grpc-tls-services": ["post"], // add "post" to the list of services that should be exposed via TLS + "grpc-tls-listener": "0.0.0.0:9094", // listen address for TLS connections + "grpc-tls-ca-cert": "/path/to/ca.pem", // CA certificate that signed the node's and the PoST service's certificates + "grpc-tls-cert": "/path/to/cert.pem", // certificate for the node + "grpc-tls-key": "/path/to/key.pem", // private key for the node +} +``` + +Ensure that remote PoST services are set up to connect to your node via TLS, that they trust your node's certificate and use a certificate that is signed by the +same CA as your node's certificate. + +The local (supervised) PoST service can also be configured to connect to your node via TLS if needed. The following config options are available: + +```json +"post-service": { + "post-opts-post-service": "/path/to/service-binary", // defaults to service in the same directory as the node binary + "post-opts-node-address": "http://domain:port", // defaults to 127.0.0.1:9093 - the same default value as for "grpc-private-listener" + + // the following settings are mandatory when connecting to the node via TLS - when connecting via the private listener they are not needed + "post-opts-ca-cert": "/path/to/ca.pem", // CA certificate that signed the node's and the PoST service's certificates + "post-opts-cert": "/path/to/cert.pem", // certificate for the PoST service + "post-opts-key": "/path/to/key.pem", // private key for the PoST service +} +``` + --- ### Testing diff --git a/activation/e2e/nipost_test.go b/activation/e2e/nipost_test.go index 2b3abf4cb45..6f3dc9bc018 100644 --- a/activation/e2e/nipost_test.go +++ b/activation/e2e/nipost_test.go @@ -78,19 +78,19 @@ func 
launchServer(tb testing.TB, services ...grpcserver.ServiceAPI) (grpcserver. cfg := grpcserver.DefaultTestConfig() // run on random ports - grpcService := grpcserver.New("127.0.0.1:0", logtest.New(tb).Named("grpc")) + server := grpcserver.New("127.0.0.1:0", zaptest.NewLogger(tb).Named("grpc"), cfg) // attach services for _, svc := range services { - svc.RegisterService(grpcService) + svc.RegisterService(server.GrpcServer) } - require.NoError(tb, grpcService.Start()) + require.NoError(tb, server.Start()) // update config with bound addresses - cfg.PublicListener = grpcService.BoundAddress + cfg.PublicListener = server.BoundAddress - return cfg, func() { assert.NoError(tb, grpcService.Close()) } + return cfg, func() { assert.NoError(tb, server.Close()) } } func initPost(tb testing.TB, logger *zap.Logger, mgr *activation.PostSetupManager, opts activation.PostSetupOpts) { diff --git a/activation/post_supervisor.go b/activation/post_supervisor.go index 3365c1f0505..acbb2baf44a 100644 --- a/activation/post_supervisor.go +++ b/activation/post_supervisor.go @@ -47,8 +47,11 @@ func DefaultTestPostServiceConfig() PostSupervisorConfig { type PostSupervisorConfig struct { PostServiceCmd string `mapstructure:"post-opts-post-service"` + NodeAddress string `mapstructure:"post-opts-node-address"` - NodeAddress string `mapstructure:"post-opts-node-address"` + CACert string `mapstructure:"post-opts-ca-cert"` + Cert string `mapstructure:"post-opts-cert"` + Key string `mapstructure:"post-opts-key"` } // PostSupervisor manages a local post service. 
@@ -153,6 +156,15 @@ func (ps *PostSupervisor) runCmd(ctx context.Context, cmdCfg PostSupervisorConfi "--nonces", strconv.FormatUint(uint64(provingOpts.Nonces), 10), "--randomx-mode", provingOpts.RandomXMode.String(), } + if cmdCfg.CACert != "" { + args = append(args, "--ca-cert", cmdCfg.CACert) + } + if cmdCfg.Cert != "" { + args = append(args, "--cert", cmdCfg.Cert) + } + if cmdCfg.Key != "" { + args = append(args, "--key", cmdCfg.Key) + } cmd := exec.CommandContext( ctx, diff --git a/api/grpcserver/activation_service.go b/api/grpcserver/activation_service.go index 042a6460341..c8a0afbd4d9 100644 --- a/api/grpcserver/activation_service.go +++ b/api/grpcserver/activation_service.go @@ -6,8 +6,10 @@ import ( "fmt" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -30,8 +32,17 @@ func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID) *activ } // RegisterService implements ServiceAPI. -func (s *activationService) RegisterService(server *Server) { - pb.RegisterActivationServiceServer(server.GrpcServer, s) +func (s *activationService) RegisterService(server *grpc.Server) { + pb.RegisterActivationServiceServer(server, s) +} + +func (s *activationService) RegisterHandlerService(mux *runtime.ServeMux) error { + return pb.RegisterActivationServiceHandlerServer(context.Background(), mux, s) +} + +// String returns the service name. +func (s *activationService) String() string { + return "ActivationService" } // Get implements v1.ActivationServiceServer. 
diff --git a/api/grpcserver/admin_service.go b/api/grpcserver/admin_service.go index f242bda5d44..0b918a922ec 100644 --- a/api/grpcserver/admin_service.go +++ b/api/grpcserver/admin_service.go @@ -9,8 +9,10 @@ import ( "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "github.com/spf13/afero" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -53,8 +55,17 @@ func NewAdminService(db *sql.Database, dataDir string, p peers) *AdminService { } // RegisterService registers this service with a grpc server instance. -func (a AdminService) RegisterService(server *Server) { - pb.RegisterAdminServiceServer(server.GrpcServer, a) +func (a AdminService) RegisterService(server *grpc.Server) { + pb.RegisterAdminServiceServer(server, a) +} + +func (s AdminService) RegisterHandlerService(mux *runtime.ServeMux) error { + return pb.RegisterAdminServiceHandlerServer(context.Background(), mux, s) +} + +// String returns the name of this service. 
+func (a AdminService) String() string { + return "AdminService" } func (a AdminService) CheckpointStream(req *pb.CheckpointStreamRequest, stream pb.AdminService_CheckpointStreamServer) error { diff --git a/api/grpcserver/admin_service_test.go b/api/grpcserver/admin_service_test.go index 109fb3c2144..2df5b8ff0dc 100644 --- a/api/grpcserver/admin_service_test.go +++ b/api/grpcserver/admin_service_test.go @@ -19,7 +19,7 @@ import ( const snapshot uint32 = 15 -func newatx(tb testing.TB, db *sql.Database) { +func newAtx(tb testing.TB, db *sql.Database) { atx := &types.ActivationTx{ InnerActivationTx: types.InnerActivationTx{ NIPostChallenge: types.NIPostChallenge{ @@ -32,8 +32,8 @@ func newatx(tb testing.TB, db *sql.Database) { }, } atx.SetID(types.RandomATXID()) - vrfnonce := types.VRFPostIndex(11) - atx.VRFNonce = &vrfnonce + vrfNonce := types.VRFPostIndex(11) + atx.VRFNonce = &vrfNonce atx.SmesherID = types.BytesToNodeID(types.RandomBytes(20)) atx.NodeID = &atx.SmesherID atx.SetEffectiveNumUnits(atx.NumUnits) @@ -45,7 +45,7 @@ func newatx(tb testing.TB, db *sql.Database) { func createMesh(tb testing.TB, db *sql.Database) { for i := 0; i < 10; i++ { - newatx(tb, db) + newAtx(tb, db) } acct := &types.Account{ Layer: types.LayerID(0), Address: types.Address{1, 1}, NextNonce: 1, Balance: 1300, TemplateAddress: &types.Address{2}, State: []byte("state10"), @@ -62,7 +62,7 @@ func TestAdminService_Checkpoint(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewAdminServiceClient(conn) stream, err := c.CheckpointStream(ctx, &pb.CheckpointStreamRequest{SnapshotLayer: snapshot}) @@ -98,7 +98,7 @@ func TestAdminService_CheckpointError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := 
pb.NewAdminServiceClient(conn) stream, err := c.CheckpointStream(ctx, &pb.CheckpointStreamRequest{SnapshotLayer: snapshot}) @@ -118,7 +118,7 @@ func TestAdminService_Recovery(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewAdminServiceClient(conn) _, err := c.Recover(ctx, &pb.RecoverRequest{}) diff --git a/api/grpcserver/config.go b/api/grpcserver/config.go index daa07c75405..c08d63beff7 100644 --- a/api/grpcserver/config.go +++ b/api/grpcserver/config.go @@ -10,6 +10,11 @@ type Config struct { PublicListener string `mapstructure:"grpc-public-listener"` PrivateServices []Service `mapstructure:"grpc-private-services"` PrivateListener string `mapstructure:"grpc-private-listener"` + TLSServices []Service `mapstructure:"grpc-tls-services"` + TLSListener string `mapstructure:"grpc-tls-listener"` + TLSCACert string `mapstructure:"grpc-tls-ca-cert"` + TLSCert string `mapstructure:"grpc-tls-cert"` + TLSKey string `mapstructure:"grpc-tls-key"` GrpcSendMsgSize int `mapstructure:"grpc-send-msg-size"` GrpcRecvMsgSize int `mapstructure:"grpc-recv-msg-size"` JSONListener string `mapstructure:"grpc-json-listener"` @@ -36,8 +41,10 @@ func DefaultConfig() Config { return Config{ PublicServices: []Service{Debug, GlobalState, Mesh, Transaction, Node, Activation}, PublicListener: "0.0.0.0:9092", - PrivateServices: []Service{Admin, Smesher, Post}, // TODO(mafa): move from private to public with authentication (probably new service category) + PrivateServices: []Service{Admin, Smesher, Post}, PrivateListener: "127.0.0.1:9093", + TLSServices: []Service{}, + TLSListener: "0.0.0.0:9094", JSONListener: "", GrpcSendMsgSize: 1024 * 1024 * 10, GrpcRecvMsgSize: 1024 * 1024 * 10, @@ -51,5 +58,6 @@ func DefaultTestConfig() Config { conf.PublicListener = "127.0.0.1:0" conf.PrivateListener = "127.0.0.1:0" conf.JSONListener = "127.0.0.1:0" + 
conf.TLSListener = "127.0.0.1:0" return conf } diff --git a/api/grpcserver/debug_service.go b/api/grpcserver/debug_service.go index 349e7995d5e..48110b7181a 100644 --- a/api/grpcserver/debug_service.go +++ b/api/grpcserver/debug_service.go @@ -5,8 +5,10 @@ import ( "fmt" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -27,8 +29,17 @@ type DebugService struct { } // RegisterService registers this service with a grpc server instance. -func (d DebugService) RegisterService(server *Server) { - pb.RegisterDebugServiceServer(server.GrpcServer, d) +func (d DebugService) RegisterService(server *grpc.Server) { + pb.RegisterDebugServiceServer(server, d) +} + +func (s DebugService) RegisterHandlerService(mux *runtime.ServeMux) error { + return pb.RegisterDebugServiceHandlerServer(context.Background(), mux, s) +} + +// String returns the name of this service. +func (d DebugService) String() string { + return "DebugService" } // NewDebugService creates a new grpc service using config data. diff --git a/api/grpcserver/globalstate_service.go b/api/grpcserver/globalstate_service.go index 12602b58e23..8ea8b781cea 100644 --- a/api/grpcserver/globalstate_service.go +++ b/api/grpcserver/globalstate_service.go @@ -5,8 +5,10 @@ import ( "fmt" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -22,8 +24,17 @@ type GlobalStateService struct { } // RegisterService registers this service with a grpc server instance. 
-func (s GlobalStateService) RegisterService(server *Server) { - pb.RegisterGlobalStateServiceServer(server.GrpcServer, s) +func (s GlobalStateService) RegisterService(server *grpc.Server) { + pb.RegisterGlobalStateServiceServer(server, s) +} + +func (s GlobalStateService) RegisterHandlerService(mux *runtime.ServeMux) error { + return pb.RegisterGlobalStateServiceHandlerServer(context.Background(), mux, s) +} + +// String returns the name of the service. +func (s GlobalStateService) String() string { + return "GlobalStateService" } // NewGlobalStateService creates a new grpc service using config data. diff --git a/api/grpcserver/globalstate_service_test.go b/api/grpcserver/globalstate_service_test.go new file mode 100644 index 00000000000..b39cf09683f --- /dev/null +++ b/api/grpcserver/globalstate_service_test.go @@ -0,0 +1,364 @@ +package grpcserver + +import ( + "context" + "math" + "testing" + "time" + + pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/spacemeshos/go-spacemesh/common/types" +) + +type globalStateServiceConn struct { + pb.GlobalStateServiceClient + + meshAPI *MockmeshAPI + conStateAPI *MockconservativeState +} + +func setupGlobalStateService(t *testing.T) (*globalStateServiceConn, context.Context) { + ctrl, mockCtx := gomock.WithContext(context.Background(), t) + meshAPI := NewMockmeshAPI(ctrl) + conStateAPI := NewMockconservativeState(ctrl) + svc := NewGlobalStateService(meshAPI, conStateAPI) + cfg, cleanup := launchServer(t, svc) + t.Cleanup(cleanup) + + grpcCtx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + conn := dialGrpc(grpcCtx, t, cfg) + client := pb.NewGlobalStateServiceClient(conn) + + return &globalStateServiceConn{ + GlobalStateServiceClient: client, + + meshAPI: meshAPI, + conStateAPI: conStateAPI, + }, mockCtx +} + +func 
TestGlobalStateService(t *testing.T) { + t.Run("GlobalStateHash", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + layerVerified := types.LayerID(8) + c.meshAPI.EXPECT().LatestLayerInState().Return(layerVerified) + stateRoot := types.HexToHash32("11111") + c.conStateAPI.EXPECT().GetStateRoot().Return(stateRoot, nil) + + res, err := c.GlobalStateHash(ctx, &pb.GlobalStateHashRequest{}) + require.NoError(t, err) + require.Equal(t, layerVerified.Uint32(), res.Response.Layer.Number) + require.Equal(t, stateRoot.Bytes(), res.Response.RootHash) + }) + t.Run("Account", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + c.conStateAPI.EXPECT().GetBalance(addr1).Return(accountBalance, nil) + c.conStateAPI.EXPECT().GetNonce(addr1).Return(accountCounter, nil) + c.conStateAPI.EXPECT().GetProjection(addr1).Return(accountCounter+1, accountBalance+1) + + res, err := c.Account(ctx, &pb.AccountRequest{ + AccountId: &pb.AccountId{Address: addr1.String()}, + }) + require.NoError(t, err) + require.Equal(t, addr1.String(), res.AccountWrapper.AccountId.Address) + require.Equal(t, uint64(accountBalance), res.AccountWrapper.StateCurrent.Balance.Value) + require.Equal(t, uint64(accountCounter), res.AccountWrapper.StateCurrent.Counter) + require.Equal(t, uint64(accountBalance+1), res.AccountWrapper.StateProjected.Balance.Value) + require.Equal(t, uint64(accountCounter+1), res.AccountWrapper.StateProjected.Counter) + }) + t.Run("AccountDataQuery_MissingFilter", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + _, err := c.AccountDataQuery(ctx, &pb.AccountDataQueryRequest{}) + require.Error(t, err) + require.Contains(t, err.Error(), "`Filter` must be provided") + }) + t.Run("AccountDataQuery_MissingFlags", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + _, err := c.AccountDataQuery(ctx, &pb.AccountDataQueryRequest{ + Filter: &pb.AccountDataFilter{ + AccountId: 
&pb.AccountId{Address: addr1.String()}, + }, + }) + require.Error(t, err) + require.Contains(t, err.Error(), "`Filter.AccountMeshDataFlags` must set at least one") + }) + t.Run("AccountDataQuery_BadOffset", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + c.meshAPI.EXPECT().GetRewards(addr1).Return([]*types.Reward{ + { + Layer: layerFirst, + TotalReward: rewardAmount, + LayerReward: rewardAmount, + Coinbase: addr1, + }, + }, nil) + c.conStateAPI.EXPECT().GetBalance(addr1).Return(accountBalance, nil) + c.conStateAPI.EXPECT().GetNonce(addr1).Return(accountCounter, nil) + c.conStateAPI.EXPECT().GetProjection(addr1).Return(accountCounter+1, accountBalance+1) + + res, err := c.AccountDataQuery(ctx, &pb.AccountDataQueryRequest{ + MaxResults: uint32(1), + Offset: math.MaxUint32, + Filter: &pb.AccountDataFilter{ + AccountId: &pb.AccountId{Address: addr1.String()}, + AccountDataFlags: uint32(pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT | + pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD), + }, + }) + // huge offset is not an error, we just expect no results + require.NoError(t, err) + require.Equal(t, uint32(0), res.TotalResults) + require.Equal(t, 0, len(res.AccountItem)) + }) + t.Run("AccountDataQuery_ZeroMaxResults", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + c.meshAPI.EXPECT().GetRewards(addr1).Return([]*types.Reward{ + { + Layer: layerFirst, + TotalReward: rewardAmount, + LayerReward: rewardAmount, + Coinbase: addr1, + }, + }, nil) + c.conStateAPI.EXPECT().GetBalance(addr1).Return(accountBalance, nil) + c.conStateAPI.EXPECT().GetNonce(addr1).Return(accountCounter, nil) + c.conStateAPI.EXPECT().GetProjection(addr1).Return(accountCounter+1, accountBalance+1) + + res, err := c.AccountDataQuery(ctx, &pb.AccountDataQueryRequest{ + MaxResults: uint32(0), + Filter: &pb.AccountDataFilter{ + AccountId: &pb.AccountId{Address: addr1.String()}, + AccountDataFlags: uint32(pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT | + 
pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD), + }, + }) + // zero maxresults means return everything + require.NoError(t, err) + require.Equal(t, uint32(2), res.TotalResults) + require.Equal(t, 2, len(res.AccountItem)) + }) + t.Run("AccountDataQuery_OneResult", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + c.meshAPI.EXPECT().GetRewards(addr1).Return([]*types.Reward{ + { + Layer: layerFirst, + TotalReward: rewardAmount, + LayerReward: rewardAmount, + Coinbase: addr1, + }, + }, nil) + c.conStateAPI.EXPECT().GetBalance(addr1).Return(accountBalance, nil) + c.conStateAPI.EXPECT().GetNonce(addr1).Return(accountCounter, nil) + c.conStateAPI.EXPECT().GetProjection(addr1).Return(accountCounter+1, accountBalance+1) + + res, err := c.AccountDataQuery(ctx, &pb.AccountDataQueryRequest{ + MaxResults: uint32(1), + Filter: &pb.AccountDataFilter{ + AccountId: &pb.AccountId{Address: addr1.String()}, + AccountDataFlags: uint32(pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT | + pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD), + }, + }) + require.NoError(t, err) + require.Equal(t, uint32(2), res.TotalResults) + require.Equal(t, 1, len(res.AccountItem)) + checkAccountDataQueryItemReward(t, res.AccountItem[0].Datum) + }) + t.Run("AccountDataQuery", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + c.meshAPI.EXPECT().GetRewards(addr1).Return([]*types.Reward{ + { + Layer: layerFirst, + TotalReward: rewardAmount, + LayerReward: rewardAmount, + Coinbase: addr1, + }, + }, nil) + c.conStateAPI.EXPECT().GetBalance(addr1).Return(accountBalance, nil) + c.conStateAPI.EXPECT().GetNonce(addr1).Return(accountCounter, nil) + c.conStateAPI.EXPECT().GetProjection(addr1).Return(accountCounter+1, accountBalance+1) + + res, err := c.AccountDataQuery(ctx, &pb.AccountDataQueryRequest{ + Filter: &pb.AccountDataFilter{ + AccountId: &pb.AccountId{Address: addr1.String()}, + AccountDataFlags: uint32(pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT | + 
pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD), + }, + }) + require.NoError(t, err) + require.Equal(t, uint32(2), res.TotalResults) + require.Equal(t, 2, len(res.AccountItem)) + + checkAccountDataQueryItemReward(t, res.AccountItem[0].Datum) + checkAccountDataQueryItemAccount(t, res.AccountItem[1].Datum) + }) + t.Run("AppEventStream", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + stream, err := c.AppEventStream(ctx, &pb.AppEventStreamRequest{}) + // We expect to be able to open the stream but for it to fail upon the first request + require.NoError(t, err) + _, err = stream.Recv() + statusCode := status.Code(err) + require.Equal(t, codes.Unimplemented, statusCode) + }) + t.Run("AccountDataStream_emptyAddress", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + stream, err := c.AccountDataStream(ctx, &pb.AccountDataStreamRequest{ + Filter: &pb.AccountDataFilter{ + AccountId: &pb.AccountId{}, + AccountDataFlags: uint32( + pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD | + pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT), + }, + }) + require.NoError(t, err, "unexpected error opening stream") + + _, err = stream.Recv() + statusCode := status.Code(err) + require.Equal(t, codes.Unknown, statusCode) + }) + t.Run("AccountDataStream_invalidAddress", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + // Just try opening and immediately closing the stream + ctx, cancel := context.WithCancel(ctx) + stream, err := c.AccountDataStream(ctx, &pb.AccountDataStreamRequest{ + Filter: &pb.AccountDataFilter{ + AccountId: &pb.AccountId{Address: types.GenerateAddress([]byte{'A'}).String()}, + AccountDataFlags: uint32( + pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD | + pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT), + }, + }) + require.NoError(t, err, "unexpected error opening stream") + + cancel() + _, err = stream.Recv() + statusCode := status.Code(err) + require.Equal(t, codes.Canceled, statusCode) + }) + 
t.Run("AccountDataStream", func(t *testing.T) { + t.Parallel() + + tt := []struct { + name string + req *pb.AccountDataStreamRequest + err string + }{ + { + name: "missing filter", + req: &pb.AccountDataStreamRequest{}, + err: "`Filter` must be provided", + }, + { + name: "empty filter", + req: &pb.AccountDataStreamRequest{ + Filter: &pb.AccountDataFilter{}, + }, + err: "`Filter.AccountId` must be provided", + }, + { + name: "missing address", + req: &pb.AccountDataStreamRequest{ + Filter: &pb.AccountDataFilter{ + AccountDataFlags: uint32( + pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD | + pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT), + }, + }, + err: "`Filter.AccountId` must be provided", + }, + { + name: "filter with zero flags", + req: &pb.AccountDataStreamRequest{ + Filter: &pb.AccountDataFilter{ + AccountId: &pb.AccountId{Address: addr1.String()}, + AccountDataFlags: uint32(0), + }, + }, + err: "`Filter.AccountDataFlags` must set at least one bitfield", + }, + } + + for _, tc := range tt { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + // there should be no error opening the stream + stream, err := c.AccountDataStream(ctx, tc.req) + require.NoError(t, err, "unexpected error opening stream") + + // sending a request should generate an error + _, err = stream.Recv() + require.Error(t, err, "expected an error") + require.ErrorContains(t, err, tc.err, "received unexpected error") + statusCode := status.Code(err) + require.Equal(t, codes.InvalidArgument, statusCode, "expected InvalidArgument error") + }) + } + }) + t.Run("GlobalStateStream", func(t *testing.T) { + t.Parallel() + + t.Run("nonzero flags", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + // Just try opening and immediately closing the stream + ctx, cancel := context.WithCancel(ctx) + stream, err := c.GlobalStateStream(ctx, &pb.GlobalStateStreamRequest{ + GlobalStateDataFlags: 
uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_ACCOUNT), + }) + require.NoError(t, err, "unexpected error opening stream") + + cancel() + _, err = stream.Recv() + statusCode := status.Code(err) + require.Equal(t, codes.Canceled, statusCode) + }) + t.Run("zero flags", func(t *testing.T) { + t.Parallel() + c, ctx := setupGlobalStateService(t) + + // there should be no error opening the stream + stream, err := c.GlobalStateStream(ctx, &pb.GlobalStateStreamRequest{GlobalStateDataFlags: uint32(0)}) + require.NoError(t, err, "unexpected error opening stream") + + // sending a request should generate an error + _, err = stream.Recv() + require.Error(t, err, "expected an error") + require.ErrorContains(t, err, "`GlobalStateDataFlags` must set at least one bitfield", "received unexpected error") + statusCode := status.Code(err) + require.Equal(t, codes.InvalidArgument, statusCode, "expected InvalidArgument error") + }) + }) +} diff --git a/api/grpcserver/grpc.go b/api/grpcserver/grpc.go index 01a38f81207..9528d4dde67 100644 --- a/api/grpcserver/grpc.go +++ b/api/grpcserver/grpc.go @@ -1,26 +1,39 @@ package grpcserver import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" "net" + "os" "time" + grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + grpctags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" - - "github.com/spacemeshos/go-spacemesh/log" ) // ServiceAPI allows individual grpc services to register the grpc server. 
type ServiceAPI interface { - RegisterService(*Server) + RegisterService(*grpc.Server) + RegisterHandlerService(*runtime.ServeMux) error + String() string } // Server is a very basic grpc server. type Server struct { listener string - logger log.Logger + logger *zap.Logger // BoundAddress contains the address that the server bound to, useful if // the server uses a dynamic port. It is set during startup and can be // safely accessed after Start has completed (I.E. the returned channel has @@ -30,21 +43,114 @@ type Server struct { grp errgroup.Group } -// New creates and returns a new Server with port and interface. -func New(listener string, lg log.Logger, opts ...grpc.ServerOption) *Server { - opts = append(opts, ServerOptions...) +func unaryGrpcLogStart(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + ctxzap.Info(ctx, "started unary call") + return handler(ctx, req) +} + +func streamingGrpcLogStart(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctxzap.Info(stream.Context(), "started streaming call") + return handler(srv, stream) +} + +// NewPublic creates a new Server listening on the PublicListener address with the given logger and config. +// Services passed in the svc slice are registered with the server. +func NewPublic(logger *zap.Logger, config Config, svc []ServiceAPI) (*Server, error) { + if len(svc) == 0 { + return nil, errors.New("no services to register") + } + + server := New(config.PublicListener, logger, config) + for _, s := range svc { + s.RegisterService(server.GrpcServer) + } + return server, nil +} + +// NewPrivate creates new Server listening on the PrivateListener address with the given logger and config. +// Services passed in the svc slice are registered with the server. 
+func NewPrivate(logger *zap.Logger, config Config, svc []ServiceAPI) (*Server, error) { + if len(svc) == 0 { + return nil, errors.New("no services to register") + } + + server := New(config.PrivateListener, logger, config) + for _, s := range svc { + s.RegisterService(server.GrpcServer) + } + return server, nil +} + +// NewTLS creates a new Server listening on the TLSListener address with the given logger and config. +// Services passed in the svc slice are registered with the server. +func NewTLS(logger *zap.Logger, config Config, svc []ServiceAPI) (*Server, error) { + if len(svc) == 0 { + return nil, errors.New("no services to register") + } + + serverCert, err := tls.LoadX509KeyPair(config.TLSCert, config.TLSKey) + if err != nil { + return nil, fmt.Errorf("load server certificate: %w", err) + } + caCert, err := os.ReadFile(config.TLSCACert) + if err != nil { + return nil, fmt.Errorf("load ca certificate: %w", err) + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("setup CA certificate") + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{serverCert}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: certPool, + } + + server := New(config.TLSListener, logger, config, grpc.Creds(credentials.NewTLS(tlsConfig))) + for _, s := range svc { + s.RegisterService(server.GrpcServer) + } + return server, nil +} + +// New creates and returns a new Server listening on the given address. +// The server is configured with the given logger and config. Additional grpc options can be passed. 
+func New(listener string, logger *zap.Logger, config Config, grpcOpts ...grpc.ServerOption) *Server { + opts := []grpc.ServerOption{ + grpc.ChainStreamInterceptor(grpctags.StreamServerInterceptor(), grpczap.StreamServerInterceptor(logger), streamingGrpcLogStart), + grpc.ChainUnaryInterceptor(grpctags.UnaryServerInterceptor(), grpczap.UnaryServerInterceptor(logger), unaryGrpcLogStart), + grpc.MaxSendMsgSize(config.GrpcSendMsgSize), + grpc.MaxRecvMsgSize(config.GrpcRecvMsgSize), + } + + // this is done to prevent routers from cleaning up our connections (e.g aws load balances..) + // TODO: these parameters work for now but we might need to revisit or add them as configuration + // TODO: Configure maxconns, maxconcurrentcons .. + opts = append(opts, + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: time.Minute * 120, + MaxConnectionAge: time.Minute * 180, + MaxConnectionAgeGrace: time.Minute * 10, + Time: time.Minute, + Timeout: time.Minute * 3, + }), + ) + + opts = append(opts, grpcOpts...) return &Server{ listener: listener, - logger: lg, + logger: logger, GrpcServer: grpc.NewServer(opts...), } } // Start starts the server. 
func (s *Server) Start() error { - s.logger.With().Info("starting grpc server", - log.String("address", s.listener), - log.Array("services", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error { + s.logger.Info("starting grpc server", + zap.String("address", s.listener), + zap.Array("services", zapcore.ArrayMarshalerFunc(func(encoder zapcore.ArrayEncoder) error { for svc := range s.GrpcServer.GetServiceInfo() { encoder.AppendString(svc) } @@ -53,14 +159,14 @@ func (s *Server) Start() error { ) lis, err := net.Listen("tcp", s.listener) if err != nil { - s.logger.Error("error listening: %v", err) + s.logger.Error("start listen server", zap.Error(err)) return err } s.BoundAddress = lis.Addr().String() reflection.Register(s.GrpcServer) s.grp.Go(func() error { if err := s.GrpcServer.Serve(lis); err != nil { - s.logger.Error("error serving grpc server: %v", err) + s.logger.Error("serving grpc server", zap.Error(err)) return err } return nil @@ -84,17 +190,3 @@ func (s *Server) Close() error { s.GrpcServer.Stop() return s.grp.Wait() } - -// ServerOptions are shared by all grpc servers. -var ServerOptions = []grpc.ServerOption{ - // XXX: this is done to prevent routers from cleaning up our connections (e.g aws load balances..) - // TODO: these parameters work for now but we might need to revisit or add them as configuration - // TODO: Configure maxconns, maxconcurrentcons .. 
- grpc.KeepaliveParams(keepalive.ServerParameters{ - MaxConnectionIdle: time.Minute * 120, - MaxConnectionAge: time.Minute * 180, - MaxConnectionAgeGrace: time.Minute * 10, - Time: time.Minute, - Timeout: time.Minute * 3, - }), -} diff --git a/api/grpcserver/grpcserver_test.go b/api/grpcserver/grpcserver_test.go index 61d65c717e8..116c3434b35 100644 --- a/api/grpcserver/grpcserver_test.go +++ b/api/grpcserver/grpcserver_test.go @@ -10,7 +10,6 @@ import ( "math" "math/big" "net" - "net/http" "os" "path/filepath" "strconv" @@ -24,14 +23,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" "golang.org/x/sync/errgroup" "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" "github.com/spacemeshos/go-spacemesh/activation" @@ -89,9 +87,6 @@ var ( challenge = newChallenge(1, prevAtxID, prevAtxID, postGenesisEpoch) globalAtx *types.VerifiedActivationTx globalAtx2 *types.VerifiedActivationTx - signer *signing.EdSigner - signer1 *signing.EdSigner - signer2 *signing.EdSigner globalTx *types.Transaction globalTx2 *types.Transaction ballot1 = genLayerBallot(types.LayerID(11)) @@ -131,10 +126,10 @@ func genLayerBlock(layerID types.LayerID, txs []types.TransactionID) *types.Bloc return b } -func dialGrpc(ctx context.Context, tb testing.TB, address string) *grpc.ClientConn { +func dialGrpc(ctx context.Context, tb testing.TB, cfg Config) *grpc.ClientConn { tb.Helper() conn, err := grpc.DialContext(ctx, - address, + cfg.PublicListener, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) @@ -143,33 +138,21 @@ func dialGrpc(ctx context.Context, tb testing.TB, address string) *grpc.ClientCo return conn } -func 
newEdSigner(t *testing.T) *signing.EdSigner { - t.Helper() - signer, err := signing.NewEdSigner() - require.NoError(t, err) - return signer -} - -func newAddress(t *testing.T) types.Address { - t.Helper() - return wallet.Address(newEdSigner(t).PublicKey().Bytes()) -} - func TestMain(m *testing.M) { types.SetLayersPerEpoch(layersPerEpoch) var err error - signer, err = signing.NewEdSigner() + signer, err := signing.NewEdSigner() if err != nil { log.Println("failed to create signer:", err) os.Exit(1) } - signer1, err = signing.NewEdSigner() + signer1, err := signing.NewEdSigner() if err != nil { log.Println("failed to create signer:", err) os.Exit(1) } - signer2, err = signing.NewEdSigner() + signer2, err := signing.NewEdSigner() if err != nil { log.Println("failed to create signer:", err) os.Exit(1) @@ -432,49 +415,20 @@ func newChallenge(sequence uint64, prevAtxID, posAtxID types.ATXID, epoch types. } } -func marshalProto(t *testing.T, msg proto.Message) []byte { - buf, err := protojson.Marshal(msg) - require.NoError(t, err) - return buf -} - func launchServer(tb testing.TB, services ...ServiceAPI) (Config, func()) { cfg := DefaultTestConfig() + cfg.PublicListener = "127.0.0.1:0" // run on random port - // run on random ports - grpcService := New("127.0.0.1:0", logtest.New(tb).Named("grpc")) - jsonService := NewJSONHTTPServer("127.0.0.1:0", logtest.New(tb).WithName("grpc.JSON")) - - // attach services - for _, svc := range services { - svc.RegisterService(grpcService) - } + grpcService, err := NewPublic(zaptest.NewLogger(tb).Named("grpc"), cfg, services) + require.NoError(tb, err) - // start gRPC and json servers + // start gRPC server require.NoError(tb, grpcService.Start()) - if len(services) > 0 { - require.NoError(tb, jsonService.StartService(context.Background(), services...)) - } // update config with bound addresses cfg.PublicListener = grpcService.BoundAddress - cfg.JSONListener = jsonService.BoundAddress - - return cfg, func() { - assert.NoError(tb, 
grpcService.Close()) - assert.NoError(tb, jsonService.Shutdown(context.Background())) - } -} -func callEndpoint(t *testing.T, url string, payload []byte) ([]byte, int) { - resp, err := http.Post(url, "application/json", bytes.NewReader(payload)) - require.NoError(t, err) - require.Equal(t, "application/json", resp.Header.Get("Content-Type")) - buf, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - - return buf, resp.StatusCode + return cfg, func() { assert.NoError(tb, grpcService.Close()) } } func getFreePort(optionalPort int) (int, error) { @@ -496,382 +450,13 @@ func TestNewServersConfig(t *testing.T) { port2, err := getFreePort(0) require.NoError(t, err, "Should be able to establish a connection on a port") - grpcService := New(fmt.Sprintf(":%d", port1), logtest.New(t).Named("grpc")) - jsonService := NewJSONHTTPServer(fmt.Sprintf(":%d", port2), logtest.New(t).WithName("grpc.JSON")) + grpcService := New(fmt.Sprintf(":%d", port1), zaptest.NewLogger(t).Named("grpc"), DefaultTestConfig()) + jsonService := NewJSONHTTPServer(fmt.Sprintf(":%d", port2), zaptest.NewLogger(t).Named("grpc.JSON")) require.Contains(t, grpcService.listener, strconv.Itoa(port1), "Expected same port") require.Contains(t, jsonService.listener, strconv.Itoa(port2), "Expected same port") } -func TestNodeService(t *testing.T) { - ctrl := gomock.NewController(t) - syncer := NewMocksyncer(ctrl) - syncer.EXPECT().IsSynced(gomock.Any()).Return(false).AnyTimes() - peerCounter := NewMockpeerCounter(ctrl) - peerCounter.EXPECT().PeerCount().Return(uint64(0)).AnyTimes() - genTime := NewMockgenesisTimeAPI(ctrl) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - version := "v0.0.0" - build := "cafebabe" - grpcService := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, version, build) - cfg, cleanup := launchServer(t, grpcService) - t.Cleanup(cleanup) - - conn := dialGrpc(ctx, t, cfg.PublicListener) - c := 
pb.NewNodeServiceClient(conn) - - // Construct an array of test cases to test each endpoint in turn - testCases := []struct { - name string - run func(t *testing.T) - }{ - {"Echo", func(t *testing.T) { - const message = "Hello World" - res, err := c.Echo(context.Background(), &pb.EchoRequest{ - Msg: &pb.SimpleString{Value: message}, - }) - require.NoError(t, err) - require.Equal(t, message, res.Msg.Value) - - // now try sending bad payloads - _, err = c.Echo(context.Background(), &pb.EchoRequest{Msg: nil}) - require.Error(t, err) - grpcStatus, ok := status.FromError(err) - require.True(t, ok) - require.Equal(t, codes.InvalidArgument, grpcStatus.Code()) - require.Equal(t, "Must include `Msg`", grpcStatus.Message()) - - _, err = c.Echo(context.Background(), &pb.EchoRequest{}) - require.Error(t, err) - grpcStatus, ok = status.FromError(err) - require.True(t, ok) - require.Equal(t, codes.InvalidArgument, grpcStatus.Code()) - require.Equal(t, "Must include `Msg`", grpcStatus.Message()) - }}, - {"Version", func(t *testing.T) { - res, err := c.Version(context.Background(), &emptypb.Empty{}) - require.NoError(t, err) - require.Equal(t, version, res.VersionString.Value) - }}, - {"Build", func(t *testing.T) { - res, err := c.Build(context.Background(), &emptypb.Empty{}) - require.NoError(t, err) - require.Equal(t, build, res.BuildString.Value) - }}, - {"Status", func(t *testing.T) { - // First do a mock checking during a genesis layer - // During genesis all layers should be set to current layer - - layerCurrent := types.LayerID(layersPerEpoch) // end of first epoch - genTime.EXPECT().CurrentLayer().Return(layerCurrent) - req := &pb.StatusRequest{} - res, err := c.Status(context.Background(), req) - require.NoError(t, err) - require.Equal(t, uint64(0), res.Status.ConnectedPeers) - require.Equal(t, false, res.Status.IsSynced) - require.Equal(t, layerLatest.Uint32(), res.Status.SyncedLayer.Number) - require.Equal(t, layerCurrent.Uint32(), res.Status.TopLayer.Number) - 
require.Equal(t, layerLatest.Uint32(), res.Status.VerifiedLayer.Number) - - // Now do a mock check post-genesis - layerCurrent = types.LayerID(12) - genTime.EXPECT().CurrentLayer().Return(layerCurrent) - res, err = c.Status(context.Background(), req) - require.NoError(t, err) - require.Equal(t, uint64(0), res.Status.ConnectedPeers) - require.Equal(t, false, res.Status.IsSynced) - require.Equal(t, layerLatest.Uint32(), res.Status.SyncedLayer.Number) - require.Equal(t, layerCurrent.Uint32(), res.Status.TopLayer.Number) - require.Equal(t, layerVerified.Uint32(), res.Status.VerifiedLayer.Number) - }}, - {"NodeInfo", func(t *testing.T) { - resp, err := c.NodeInfo(ctx, &emptypb.Empty{}) - require.NoError(t, err) - require.Equal(t, resp.Hrp, types.NetworkHRP()) - require.Equal(t, resp.FirstGenesis, types.FirstEffectiveGenesis().Uint32()) - require.Equal(t, resp.EffectiveGenesis, types.GetEffectiveGenesis().Uint32()) - require.Equal(t, resp.EpochSize, types.GetLayersPerEpoch()) - }}, - // NOTE: ErrorStream and StatusStream have comprehensive, E2E tests in cmd/node/node_test.go. 
- } - - for _, tc := range testCases { - t.Run(tc.name, tc.run) - } -} - -func TestGlobalStateService(t *testing.T) { - svc := NewGlobalStateService(meshAPIMock, conStateAPI) - cfg, cleanup := launchServer(t, svc) - t.Cleanup(cleanup) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) - c := pb.NewGlobalStateServiceClient(conn) - - // Construct an array of test cases to test each endpoint in turn - testCases := []struct { - name string - run func(*testing.T) - }{ - {"GlobalStateHash", func(t *testing.T) { - res, err := c.GlobalStateHash(context.Background(), &pb.GlobalStateHashRequest{}) - require.NoError(t, err) - require.Equal(t, layerVerified.Uint32(), res.Response.Layer.Number) - require.Equal(t, stateRoot.Bytes(), res.Response.RootHash) - }}, - {"Account", func(t *testing.T) { - res, err := c.Account(context.Background(), &pb.AccountRequest{ - AccountId: &pb.AccountId{Address: addr1.String()}, - }) - require.NoError(t, err) - require.Equal(t, addr1.String(), res.AccountWrapper.AccountId.Address) - require.Equal(t, uint64(accountBalance), res.AccountWrapper.StateCurrent.Balance.Value) - require.Equal(t, uint64(accountCounter), res.AccountWrapper.StateCurrent.Counter) - require.Equal(t, uint64(accountBalance+1), res.AccountWrapper.StateProjected.Balance.Value) - require.Equal(t, uint64(accountCounter+1), res.AccountWrapper.StateProjected.Counter) - }}, - {"AccountDataQuery_MissingFilter", func(t *testing.T) { - _, err := c.AccountDataQuery(context.Background(), &pb.AccountDataQueryRequest{}) - require.Error(t, err) - require.Contains(t, err.Error(), "`Filter` must be provided") - }}, - {"AccountDataQuery_MissingFlags", func(t *testing.T) { - _, err := c.AccountDataQuery(context.Background(), &pb.AccountDataQueryRequest{ - Filter: &pb.AccountDataFilter{ - AccountId: &pb.AccountId{Address: addr1.String()}, - }, - }) - require.Error(t, err) - require.Contains(t, err.Error(), 
"`Filter.AccountMeshDataFlags` must set at least one") - }}, - {"AccountDataQuery_BadOffset", func(t *testing.T) { - res, err := c.AccountDataQuery(context.Background(), &pb.AccountDataQueryRequest{ - MaxResults: uint32(1), - Offset: math.MaxUint32, - Filter: &pb.AccountDataFilter{ - AccountId: &pb.AccountId{Address: addr1.String()}, - AccountDataFlags: uint32(pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT | - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD), - }, - }) - // huge offset is not an error, we just expect no results - require.NoError(t, err) - require.Equal(t, uint32(0), res.TotalResults) - require.Equal(t, 0, len(res.AccountItem)) - }}, - {"AccountDataQuery_ZeroMaxResults", func(t *testing.T) { - res, err := c.AccountDataQuery(context.Background(), &pb.AccountDataQueryRequest{ - MaxResults: uint32(0), - Filter: &pb.AccountDataFilter{ - AccountId: &pb.AccountId{Address: addr1.String()}, - AccountDataFlags: uint32(pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT | - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD), - }, - }) - // zero maxresults means return everything - require.NoError(t, err) - require.Equal(t, uint32(2), res.TotalResults) - require.Equal(t, 2, len(res.AccountItem)) - }}, - {"AccountDataQuery_OneResult", func(t *testing.T) { - res, err := c.AccountDataQuery(context.Background(), &pb.AccountDataQueryRequest{ - MaxResults: uint32(1), - Filter: &pb.AccountDataFilter{ - AccountId: &pb.AccountId{Address: addr1.String()}, - AccountDataFlags: uint32(pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT | - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD), - }, - }) - require.NoError(t, err) - require.Equal(t, uint32(2), res.TotalResults) - require.Equal(t, 1, len(res.AccountItem)) - checkAccountDataQueryItemReward(t, res.AccountItem[0].Datum) - }}, - {"AccountDataQuery", func(t *testing.T) { - res, err := c.AccountDataQuery(context.Background(), &pb.AccountDataQueryRequest{ - Filter: &pb.AccountDataFilter{ - AccountId: &pb.AccountId{Address: addr1.String()}, - 
AccountDataFlags: uint32(pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT | - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD), - }, - }) - require.NoError(t, err) - require.Equal(t, uint32(2), res.TotalResults) - require.Equal(t, 2, len(res.AccountItem)) - checkAccountDataQueryItemReward(t, res.AccountItem[0].Datum) - checkAccountDataQueryItemAccount(t, res.AccountItem[1].Datum) - }}, - {"AppEventStream", func(t *testing.T) { - stream, err := c.AppEventStream(context.Background(), &pb.AppEventStreamRequest{}) - // We expect to be able to open the stream but for it to fail upon the first request - require.NoError(t, err) - _, err = stream.Recv() - statusCode := status.Code(err) - require.Equal(t, codes.Unimplemented, statusCode) - }}, - {name: "AccountDataStream", run: func(t *testing.T) { - // common testing framework - generateRunFn := func(req *pb.AccountDataStreamRequest) func(*testing.T) { - return func(*testing.T) { - // Just try opening and immediately closing the stream - stream, err := c.AccountDataStream(context.Background(), req) - require.NoError(t, err, "unexpected error opening stream") - - // Do we need this? It doesn't seem to cause any harm - stream.Context().Done() - } - } - generateRunFnError := func(msg string, req *pb.AccountDataStreamRequest) func(*testing.T) { - return func(t *testing.T) { - // there should be no error opening the stream - stream, err := c.AccountDataStream(context.Background(), req) - require.NoError(t, err, "unexpected error opening stream") - - // sending a request should generate an error - _, err = stream.Recv() - require.Error(t, err, "expected an error") - require.Contains(t, err.Error(), msg, "received unexpected error") - statusCode := status.Code(err) - require.Equal(t, codes.InvalidArgument, statusCode, "expected InvalidArgument error") - - // Do we need this? 
It doesn't seem to cause any harm - stream.Context().Done() - } - } - subtests := []struct { - name string - run func(*testing.T) - }{ - // ERROR INPUTS - // We expect these to produce errors - { - name: "missing filter", - run: generateRunFnError("`Filter` must be provided", &pb.AccountDataStreamRequest{}), - }, - { - name: "empty filter", - run: generateRunFnError("`Filter.AccountId` must be provided", &pb.AccountDataStreamRequest{ - Filter: &pb.AccountDataFilter{}, - }), - }, - { - name: "missing address", - run: generateRunFnError("`Filter.AccountId` must be provided", &pb.AccountDataStreamRequest{ - Filter: &pb.AccountDataFilter{ - AccountDataFlags: uint32( - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD | - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT), - }, - }), - }, - { - name: "filter with zero flags", - run: generateRunFnError("`Filter.AccountDataFlags` must set at least one bitfield", &pb.AccountDataStreamRequest{ - Filter: &pb.AccountDataFilter{ - AccountId: &pb.AccountId{Address: addr1.String()}, - AccountDataFlags: uint32(0), - }, - }), - }, - - // SUCCESS - { - name: "empty address", - run: generateRunFn(&pb.AccountDataStreamRequest{ - Filter: &pb.AccountDataFilter{ - AccountId: &pb.AccountId{}, - AccountDataFlags: uint32( - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD | - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT), - }, - }), - }, - { - name: "invalid address", - run: generateRunFn(&pb.AccountDataStreamRequest{ - Filter: &pb.AccountDataFilter{ - AccountId: &pb.AccountId{Address: types.GenerateAddress([]byte{'A'}).String()}, - AccountDataFlags: uint32( - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_REWARD | - pb.AccountDataFlag_ACCOUNT_DATA_FLAG_ACCOUNT), - }, - }), - }, - } - - // Run sub-subtests - for _, r := range subtests { - t.Run(r.name, r.run) - } - }}, - {name: "GlobalStateStream", run: func(t *testing.T) { - // common testing framework - generateRunFn := func(req *pb.GlobalStateStreamRequest) func(*testing.T) { - return func(*testing.T) { - // 
Just try opening and immediately closing the stream - stream, err := c.GlobalStateStream(context.Background(), req) - require.NoError(t, err, "unexpected error opening stream") - - // Do we need this? It doesn't seem to cause any harm - stream.Context().Done() - } - } - generateRunFnError := func(msg string, req *pb.GlobalStateStreamRequest) func(*testing.T) { - return func(t *testing.T) { - // there should be no error opening the stream - stream, err := c.GlobalStateStream(context.Background(), req) - require.NoError(t, err, "unexpected error opening stream") - - // sending a request should generate an error - _, err = stream.Recv() - require.Error(t, err, "expected an error") - require.Contains(t, err.Error(), msg, "received unexpected error") - statusCode := status.Code(err) - require.Equal(t, codes.InvalidArgument, statusCode, "expected InvalidArgument error") - - // Do we need this? It doesn't seem to cause any harm - stream.Context().Done() - } - } - subtests := []struct { - name string - run func(*testing.T) - }{ - // ERROR INPUTS - // We expect these to produce errors - { - name: "zero flags", - run: generateRunFnError("`GlobalStateDataFlags` must set at least one bitfield", - &pb.GlobalStateStreamRequest{GlobalStateDataFlags: uint32(0)}), - }, - - // SUCCESS - { - name: "nonzero flags", - run: generateRunFn(&pb.GlobalStateStreamRequest{ - GlobalStateDataFlags: uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_ACCOUNT), - }), - }, - } - - // Run sub-subtests - for _, r := range subtests { - t.Run(r.name, r.run) - } - }}, - } - - // Run subtests - for _, tc := range testCases { - t.Run(tc.name, tc.run) - } -} - type smesherServiceConn struct { pb.SmesherServiceClient @@ -891,7 +476,7 @@ func setupSmesherService(t *testing.T) (*smesherServiceConn, context.Context) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) client := 
pb.NewSmesherServiceClient(conn) return &smesherServiceConn{ @@ -953,10 +538,11 @@ func TestSmesherService(t *testing.T) { t.Run("SmesherID", func(t *testing.T) { t.Parallel() c, ctx := setupSmesherService(t) - c.smeshingProvider.EXPECT().SmesherID().Return(signer.NodeID()) + nodeId := types.RandomNodeID() + c.smeshingProvider.EXPECT().SmesherID().Return(nodeId) res, err := c.SmesherID(ctx, &emptypb.Empty{}) require.NoError(t, err) - require.Equal(t, signer.NodeID().Bytes(), res.PublicKey) + require.Equal(t, nodeId.Bytes(), res.PublicKey) }) t.Run("SetCoinbaseMissingArgs", func(t *testing.T) { @@ -1063,14 +649,14 @@ func TestMeshService(t *testing.T) { genTime.EXPECT().GenesisTime().Return(genesis) genTime.EXPECT().CurrentLayer().Return(layerCurrent).AnyTimes() db := datastore.NewCachedDB(sql.InMemory(), logtest.New(t)) - grpcService := NewMeshService(db, meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) + svc := NewMeshService(db, meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) require.NoError(t, activesets.Add(db, ballot1.EpochData.ActiveSetHash, &types.EpochActiveSet{Set: types.ATXIDList{globalAtx.ID(), globalAtx2.ID()}})) - cfg, cleanup := launchServer(t, grpcService) + cfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewMeshServiceClient(conn) // Construct an array of test cases to test each endpoint in turn @@ -1575,13 +1161,13 @@ func TestTransactionServiceSubmitUnsync(t *testing.T) { txHandler := NewMocktxValidator(ctrl) txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(nil) - grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler) - cfg, cleanup := launchServer(t, grpcService) + svc 
:= NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler) + cfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewTransactionServiceClient(conn) serializedTx, err := codec.Encode(globalTx) @@ -1620,7 +1206,7 @@ func TestTransactionServiceSubmitInvalidTx(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewTransactionServiceClient(conn) serializedTx, err := codec.Encode(globalTx) @@ -1653,7 +1239,7 @@ func TestTransactionService_SubmitNoConcurrency(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewTransactionServiceClient(conn) for i := 0; i < numTxs; i++ { res, err := c.SubmitTransaction(ctx, &pb.SubmitTransactionRequest{ @@ -1681,7 +1267,7 @@ func TestTransactionService(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewTransactionServiceClient(conn) // Construct an array of test cases to test each endpoint in turn @@ -2021,7 +1607,7 @@ func TestAccountMeshDataStream_comprehensive(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewMeshServiceClient(conn) // set up the grpc listener stream @@ -2077,7 +1663,7 @@ func TestAccountDataStream_comprehensive(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, 
cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewGlobalStateServiceClient(conn) // set up the grpc listener stream @@ -2136,7 +1722,7 @@ func TestGlobalStateStream_comprehensive(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewGlobalStateServiceClient(conn) // set up the grpc listener stream @@ -2200,7 +1786,7 @@ func TestLayerStream_comprehensive(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) // set up the grpc listener stream c := pb.NewMeshServiceClient(conn) @@ -2341,8 +1927,8 @@ func TestMultiService(t *testing.T) { cfg, shutDown := launchServer(t, svc1, svc2) t.Cleanup(shutDown) - c1 := pb.NewNodeServiceClient(dialGrpc(ctx, t, cfg.PublicListener)) - c2 := pb.NewMeshServiceClient(dialGrpc(ctx, t, cfg.PublicListener)) + c1 := pb.NewNodeServiceClient(dialGrpc(ctx, t, cfg)) + c2 := pb.NewMeshServiceClient(dialGrpc(ctx, t, cfg)) // call endpoints and validate results const message = "Hello World" @@ -2371,50 +1957,6 @@ func TestMultiService(t *testing.T) { require.Contains(t, err2.Error(), "rpc error: code = Unavailable") } -func TestJsonApi(t *testing.T) { - const message = "hello world!" 
- - // we cannot start the gateway service without enabling at least one service - cfg, shutDown := launchServer(t) - t.Cleanup(shutDown) - time.Sleep(time.Second) - - payload := marshalProto(t, &pb.EchoRequest{Msg: &pb.SimpleString{Value: message}}) - url := fmt.Sprintf("http://%s/%s", cfg.JSONListener, "v1/node/echo") - _, err := http.Post(url, "application/json", bytes.NewReader(payload)) - require.Error(t, err) - shutDown() - - // enable services and try again - ctrl := gomock.NewController(t) - syncer := NewMocksyncer(ctrl) - syncer.EXPECT().IsSynced(gomock.Any()).Return(false).AnyTimes() - peerCounter := NewMockpeerCounter(ctrl) - genTime := NewMockgenesisTimeAPI(ctrl) - genesis := time.Unix(genTimeUnix, 0) - genTime.EXPECT().GenesisTime().Return(genesis) - svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe") - svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) - cfg, cleanup := launchServer(t, svc1, svc2) - t.Cleanup(cleanup) - time.Sleep(time.Second) - - // generate request payload (api input params) - payload = marshalProto(t, &pb.EchoRequest{Msg: &pb.SimpleString{Value: message}}) - respBody, respStatus := callEndpoint(t, fmt.Sprintf("http://%s/v1/node/echo", cfg.JSONListener), payload) - require.Equal(t, http.StatusOK, respStatus) - var msg pb.EchoResponse - require.NoError(t, protojson.Unmarshal(respBody, &msg)) - require.Equal(t, message, msg.Msg.Value) - - // Test MeshService - respBody2, respStatus2 := callEndpoint(t, fmt.Sprintf("http://%s/v1/mesh/genesistime", cfg.JSONListener), nil) - require.Equal(t, http.StatusOK, respStatus2) - var msg2 pb.GenesisTimeResponse - require.NoError(t, protojson.Unmarshal(respBody2, &msg2)) - require.Equal(t, uint64(genesis.Unix()), msg2.Unixtime.Value) -} - func TestDebugService(t *testing.T) { ctrl := gomock.NewController(t) identity := 
NewMocknetworkIdentity(ctrl) @@ -2426,7 +1968,7 @@ func TestDebugService(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) c := pb.NewDebugServiceClient(conn) t.Run("Accounts", func(t *testing.T) { @@ -2531,8 +2073,8 @@ func TestEventsReceived(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn1 := dialGrpc(ctx, t, cfg.PublicListener) - conn2 := dialGrpc(ctx, t, cfg.PublicListener) + conn1 := dialGrpc(ctx, t, cfg) + conn2 := dialGrpc(ctx, t, cfg) txClient := pb.NewTransactionServiceClient(conn1) accountClient := pb.NewGlobalStateServiceClient(conn2) @@ -2608,9 +2150,9 @@ func TestTransactionsRewards(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - client := pb.NewGlobalStateServiceClient(dialGrpc(ctx, t, cfg.PublicListener)) + client := pb.NewGlobalStateServiceClient(dialGrpc(ctx, t, cfg)) - address := newAddress(t) + address := wallet.Address(types.RandomNodeID().Bytes()) weight := new(big.Rat).SetFloat64(18.7) rewards := []types.CoinbaseReward{{Coinbase: address, Weight: types.RatNumFromBigRat(weight)}} @@ -2698,7 +2240,7 @@ func TestVMAccountUpdates(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - client := pb.NewGlobalStateServiceClient(dialGrpc(ctx, t, cfg.PublicListener)) + client := pb.NewGlobalStateServiceClient(dialGrpc(ctx, t, cfg)) eg, ctx := errgroup.WithContext(ctx) states := make(chan *pb.AccountState, len(accounts)) for _, account := range accounts { @@ -2785,7 +2327,7 @@ func TestMeshService_EpochStream(t *testing.T) { } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) client := pb.NewMeshServiceClient(conn) stream, err := 
client.EpochStream(ctx, &pb.EpochStreamRequest{Epoch: epoch.Uint32()}) diff --git a/api/grpcserver/grpcserver_tls_test.go b/api/grpcserver/grpcserver_tls_test.go new file mode 100644 index 00000000000..2faa27e3546 --- /dev/null +++ b/api/grpcserver/grpcserver_tls_test.go @@ -0,0 +1,46 @@ +package grpcserver + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +const ( + caCert = "testdata/ca.crt" + serverCert = "testdata/server.crt" + serverKey = "testdata/server.key" + clientCert = "testdata/client.crt" + clientKey = "testdata/client.key" +) + +func launchTLSServer(tb testing.TB, services ...ServiceAPI) (Config, func()) { + pwd, err := os.Getwd() + require.NoError(tb, err) + caCert := filepath.Join(pwd, caCert) + require.FileExists(tb, caCert) + serverCert := filepath.Join(pwd, serverCert) + require.FileExists(tb, serverCert) + serverKey := filepath.Join(pwd, serverKey) + require.FileExists(tb, serverKey) + + cfg := DefaultTestConfig() + cfg.TLSCACert = caCert + cfg.TLSCert = serverCert + cfg.TLSKey = serverKey + + grpcService, err := NewTLS(zaptest.NewLogger(tb).Named("grpc.TLS"), cfg, services) + require.NoError(tb, err) + + // start gRPC server + require.NoError(tb, grpcService.Start()) + + // update config with bound addresses + cfg.TLSListener = grpcService.BoundAddress + + return cfg, func() { assert.NoError(tb, grpcService.Close()) } +} diff --git a/api/grpcserver/http_server.go b/api/grpcserver/http_server.go index 0d026f224a6..1dc58e23b66 100644 --- a/api/grpcserver/http_server.go +++ b/api/grpcserver/http_server.go @@ -8,17 +8,15 @@ import ( "net/http" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "go.uber.org/zap" "golang.org/x/sync/errgroup" - - "github.com/spacemeshos/go-spacemesh/log" ) // JSONHTTPServer is a JSON http server providing the Spacemesh API. 
// It is implemented using a grpc-gateway. See https://github.com/grpc-ecosystem/grpc-gateway . type JSONHTTPServer struct { listener string - logger log.Logger + logger *zap.Logger // BoundAddress contains the address that the server bound to, useful if // the server uses a dynamic port. It is set during startup and can be @@ -26,11 +24,11 @@ type JSONHTTPServer struct { // been waited on) BoundAddress string server *http.Server - grp errgroup.Group + eg errgroup.Group } // NewJSONHTTPServer creates a new json http server. -func NewJSONHTTPServer(listener string, lg log.Logger) *JSONHTTPServer { +func NewJSONHTTPServer(listener string, lg *zap.Logger) *JSONHTTPServer { return &JSONHTTPServer{ logger: lg, listener: listener, @@ -49,7 +47,7 @@ func (s *JSONHTTPServer) Shutdown(ctx context.Context) error { return fmt.Errorf("shutdown: %w", err) } } - err := s.grp.Wait() + err := s.eg.Wait() if errors.Is(err, http.ErrServerClosed) { return nil } @@ -66,52 +64,27 @@ func (s *JSONHTTPServer) StartService( s.logger.Error("not starting grpc gateway service; at least one service must be enabled") return errors.New("no services provided") } - ctx, cancel := context.WithCancel(ctx) - - // This will close all downstream connections when the server closes - defer cancel() - - mux := runtime.NewServeMux() // register each individual, enabled service - serviceCount := 0 + mux := runtime.NewServeMux() for _, svc := range services { - var err error - switch typed := svc.(type) { - case *GlobalStateService: - err = pb.RegisterGlobalStateServiceHandlerServer(ctx, mux, typed) - case *MeshService: - err = pb.RegisterMeshServiceHandlerServer(ctx, mux, typed) - case *NodeService: - err = pb.RegisterNodeServiceHandlerServer(ctx, mux, typed) - case *SmesherService: - err = pb.RegisterSmesherServiceHandlerServer(ctx, mux, typed) - case *TransactionService: - err = pb.RegisterTransactionServiceHandlerServer(ctx, mux, typed) - case *DebugService: - err = 
pb.RegisterDebugServiceHandlerServer(ctx, mux, typed) - } - if err != nil { - s.logger.Error("registering %T with grpc gateway failed with %v", svc, err) - return err + if err := svc.RegisterHandlerService(mux); err != nil { + return fmt.Errorf("registering service %s with grpc gateway failed: %w", svc, err) } - serviceCount++ } - s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener)) - + s.logger.Info("starting grpc gateway server", zap.String("address", s.listener)) lis, err := net.Listen("tcp", s.listener) if err != nil { - s.logger.Error("error listening: %v", err) - return err + return fmt.Errorf("listening on %s: %w", s.listener, err) } s.BoundAddress = lis.Addr().String() s.server = &http.Server{ Handler: mux, } - s.grp.Go(func() error { + s.eg.Go(func() error { if err := s.server.Serve(lis); err != nil { - s.logger.Error("error from grpc http server: %v", err) + s.logger.Error("serving grpc server", zap.Error(err)) return nil } return nil diff --git a/api/grpcserver/http_server_test.go b/api/grpcserver/http_server_test.go new file mode 100644 index 00000000000..4448b20451d --- /dev/null +++ b/api/grpcserver/http_server_test.go @@ -0,0 +1,91 @@ +package grpcserver + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "testing" + "time" + + pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/datastore" + "github.com/spacemeshos/go-spacemesh/log/logtest" + "github.com/spacemeshos/go-spacemesh/sql" +) + +func launchJsonServer(tb testing.TB, services ...ServiceAPI) (Config, func()) { + cfg := DefaultTestConfig() + + // run on random port + jsonService := NewJSONHTTPServer("127.0.0.1:0", zaptest.NewLogger(tb).Named("grpc.JSON")) + + // start 
json server + require.NoError(tb, jsonService.StartService(context.Background(), services...)) + + // update config with bound address + cfg.JSONListener = jsonService.BoundAddress + + return cfg, func() { assert.NoError(tb, jsonService.Shutdown(context.Background())) } +} + +func callEndpoint(ctx context.Context, tb testing.TB, url string, body []byte) ([]byte, int) { + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + require.NoError(tb, err) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(tb, err) + require.Equal(tb, "application/json", resp.Header.Get("Content-Type")) + buf, err := io.ReadAll(resp.Body) + require.NoError(tb, err) + require.NoError(tb, resp.Body.Close()) + return buf, resp.StatusCode +} + +func TestJsonApi(t *testing.T) { + const layerDuration = 10 * time.Second + const layerAvgSize = 10 + const txsPerProposal = 99 + const version = "v0.0.0" + const build = "cafebabe" + + ctrl, ctx := gomock.WithContext(context.Background(), t) + peerCounter := NewMockpeerCounter(ctrl) + meshAPIMock := NewMockmeshAPI(ctrl) + genTime := NewMockgenesisTimeAPI(ctrl) + syncer := NewMocksyncer(ctrl) + conStateAPI := NewMockconservativeState(ctrl) + svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, version, build) + svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, 5, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) + cfg, cleanup := launchJsonServer(t, svc1, svc2) + t.Cleanup(cleanup) + time.Sleep(time.Second) + + // generate request payload (api input params) + const message = "hello world!" 
+ payload, err := protojson.Marshal(&pb.EchoRequest{Msg: &pb.SimpleString{Value: message}}) + require.NoError(t, err) + respBody, respStatus := callEndpoint(ctx, t, fmt.Sprintf("http://%s/v1/node/echo", cfg.JSONListener), payload) + require.Equal(t, http.StatusOK, respStatus) + var msg pb.EchoResponse + require.NoError(t, protojson.Unmarshal(respBody, &msg)) + require.Equal(t, message, msg.Msg.Value) + + // Test MeshService + now := time.Now() + genTime.EXPECT().GenesisTime().Return(now) + respBody2, respStatus2 := callEndpoint(ctx, t, fmt.Sprintf("http://%s/v1/mesh/genesistime", cfg.JSONListener), nil) + require.Equal(t, http.StatusOK, respStatus2) + var msg2 pb.GenesisTimeResponse + require.NoError(t, protojson.Unmarshal(respBody2, &msg2)) + require.Equal(t, uint64(now.Unix()), msg2.Unixtime.Value) +} diff --git a/api/grpcserver/mesh_service.go b/api/grpcserver/mesh_service.go index c8a9a8aa814..fdb4d53d661 100644 --- a/api/grpcserver/mesh_service.go +++ b/api/grpcserver/mesh_service.go @@ -8,8 +8,10 @@ import ( "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -35,8 +37,17 @@ type MeshService struct { } // RegisterService registers this service with a grpc server instance. -func (s MeshService) RegisterService(server *Server) { - pb.RegisterMeshServiceServer(server.GrpcServer, s) +func (s MeshService) RegisterService(server *grpc.Server) { + pb.RegisterMeshServiceServer(server, s) +} + +func (s MeshService) RegisterHandlerService(mux *runtime.ServeMux) error { + return pb.RegisterMeshServiceHandlerServer(context.Background(), mux, s) +} + +// String returns the name of this service. 
+func (s MeshService) String() string { + return "MeshService" } // NewMeshService creates a new service using config data. diff --git a/api/grpcserver/mesh_service_test.go b/api/grpcserver/mesh_service_test.go index a68a6902ff8..16044eb339e 100644 --- a/api/grpcserver/mesh_service_test.go +++ b/api/grpcserver/mesh_service_test.go @@ -147,7 +147,7 @@ func TestMeshService_MalfeasanceQuery(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) client := pb.NewMeshServiceClient(conn) nodeID, proof := BallotMalfeasance(t, db) @@ -190,7 +190,7 @@ func TestMeshService_MalfeasanceStream(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn := dialGrpc(ctx, t, cfg.PublicListener) + conn := dialGrpc(ctx, t, cfg) client := pb.NewMeshServiceClient(conn) for i := 0; i < 10; i++ { diff --git a/api/grpcserver/node_service.go b/api/grpcserver/node_service.go index 58c3b693403..c3fb61e4c26 100644 --- a/api/grpcserver/node_service.go +++ b/api/grpcserver/node_service.go @@ -5,8 +5,10 @@ import ( "fmt" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "go.uber.org/zap/zapcore" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -29,8 +31,17 @@ type NodeService struct { } // RegisterService registers this service with a grpc server instance. 
-func (s NodeService) RegisterService(server *Server) { - pb.RegisterNodeServiceServer(server.GrpcServer, s) +func (s NodeService) RegisterService(server *grpc.Server) { + pb.RegisterNodeServiceServer(server, s) +} + +func (s NodeService) RegisterHandlerService(mux *runtime.ServeMux) error { + return pb.RegisterNodeServiceHandlerServer(context.Background(), mux, s) +} + +// String returns the name of this service. +func (s NodeService) String() string { + return "NodeService" } // NewNodeService creates a new grpc service using config data. diff --git a/api/grpcserver/node_service_test.go b/api/grpcserver/node_service_test.go new file mode 100644 index 00000000000..752635c3ab0 --- /dev/null +++ b/api/grpcserver/node_service_test.go @@ -0,0 +1,153 @@ +package grpcserver + +import ( + "context" + "testing" + "time" + + pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/spacemeshos/go-spacemesh/common/types" +) + +type nodeServiceConn struct { + pb.NodeServiceClient + + meshAPI *MockmeshAPI + genTime *MockgenesisTimeAPI + peerCounter *MockpeerCounter + syncer *Mocksyncer +} + +func setupNodeService(t *testing.T) (*nodeServiceConn, context.Context) { + ctrl, mockCtx := gomock.WithContext(context.Background(), t) + peerCounter := NewMockpeerCounter(ctrl) + meshAPI := NewMockmeshAPI(ctrl) + genTime := NewMockgenesisTimeAPI(ctrl) + syncer := NewMocksyncer(ctrl) + + version := "v0.0.0" + build := "cafebabe" + grpcService := NewNodeService(peerCounter, meshAPI, genTime, syncer, version, build) + cfg, cleanup := launchServer(t, grpcService) + t.Cleanup(cleanup) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + conn := dialGrpc(ctx, t, cfg) + client := pb.NewNodeServiceClient(conn) + + return &nodeServiceConn{ + NodeServiceClient: 
client, + + meshAPI: meshAPI, + genTime: genTime, + peerCounter: peerCounter, + syncer: syncer, + }, mockCtx +} + +func TestNodeService(t *testing.T) { + t.Run("Echo", func(t *testing.T) { + t.Parallel() + c, ctx := setupNodeService(t) + + const message = "Hello World" + res, err := c.Echo(ctx, &pb.EchoRequest{ + Msg: &pb.SimpleString{Value: message}, + }) + require.NoError(t, err) + require.Equal(t, message, res.Msg.Value) + + // now try sending bad payloads + _, err = c.Echo(ctx, &pb.EchoRequest{Msg: nil}) + require.Error(t, err) + grpcStatus, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, codes.InvalidArgument, grpcStatus.Code()) + require.Equal(t, "Must include `Msg`", grpcStatus.Message()) + + _, err = c.Echo(ctx, &pb.EchoRequest{}) + require.Error(t, err) + grpcStatus, ok = status.FromError(err) + require.True(t, ok) + require.Equal(t, codes.InvalidArgument, grpcStatus.Code()) + require.Equal(t, "Must include `Msg`", grpcStatus.Message()) + }) + t.Run("Version", func(t *testing.T) { + t.Parallel() + c, ctx := setupNodeService(t) + + res, err := c.Version(ctx, &emptypb.Empty{}) + require.NoError(t, err) + require.Equal(t, "v0.0.0", res.VersionString.Value) + }) + t.Run("Build", func(t *testing.T) { + t.Parallel() + c, ctx := setupNodeService(t) + + res, err := c.Build(ctx, &emptypb.Empty{}) + require.NoError(t, err) + require.Equal(t, "cafebabe", res.BuildString.Value) + }) + t.Run("Status genesis", func(t *testing.T) { + t.Parallel() + c, ctx := setupNodeService(t) + + // First do a mock checking during a genesis layer + // During genesis all layers should be set to current layer + layerLatest := types.LayerID(10) + c.meshAPI.EXPECT().LatestLayer().Return(layerLatest) + layerCurrent := types.LayerID(layersPerEpoch) // end of first epoch + c.genTime.EXPECT().CurrentLayer().Return(layerCurrent) + c.peerCounter.EXPECT().PeerCount().Return(0) + c.syncer.EXPECT().IsSynced(gomock.Any()).Return(false) + + res, err := c.Status(ctx, 
&pb.StatusRequest{}) + require.NoError(t, err) + require.Equal(t, uint64(0), res.Status.ConnectedPeers) + require.Equal(t, false, res.Status.IsSynced) + require.Equal(t, layerLatest.Uint32(), res.Status.SyncedLayer.Number) + require.Equal(t, layerCurrent.Uint32(), res.Status.TopLayer.Number) + require.Equal(t, layerLatest.Uint32(), res.Status.VerifiedLayer.Number) + }) + t.Run("Status post-genesis", func(t *testing.T) { + t.Parallel() + c, ctx := setupNodeService(t) + + // Now do a mock check post-genesis + layerLatest := types.LayerID(10) + c.meshAPI.EXPECT().LatestLayer().Return(layerLatest) + layerCurrent := types.LayerID(12) + c.genTime.EXPECT().CurrentLayer().Return(layerCurrent) + layerVerified := types.LayerID(8) + c.meshAPI.EXPECT().LatestLayerInState().Return(layerVerified) + c.peerCounter.EXPECT().PeerCount().Return(100) + c.syncer.EXPECT().IsSynced(gomock.Any()).Return(false) + + res, err := c.Status(ctx, &pb.StatusRequest{}) + require.NoError(t, err) + require.Equal(t, uint64(100), res.Status.ConnectedPeers) + require.Equal(t, false, res.Status.IsSynced) + require.Equal(t, layerLatest.Uint32(), res.Status.SyncedLayer.Number) + require.Equal(t, layerCurrent.Uint32(), res.Status.TopLayer.Number) + require.Equal(t, layerVerified.Uint32(), res.Status.VerifiedLayer.Number) + }) + t.Run("NodeInfo", func(t *testing.T) { + t.Parallel() + c, ctx := setupNodeService(t) + + resp, err := c.NodeInfo(ctx, &emptypb.Empty{}) + require.NoError(t, err) + require.Equal(t, resp.Hrp, types.NetworkHRP()) + require.Equal(t, resp.FirstGenesis, types.FirstEffectiveGenesis().Uint32()) + require.Equal(t, resp.EffectiveGenesis, types.GetEffectiveGenesis().Uint32()) + require.Equal(t, resp.EpochSize, types.GetLayersPerEpoch()) + }) + // NOTE: ErrorStream and StatusStream have comprehensive, E2E tests in cmd/node/node_test.go. 
+} diff --git a/api/grpcserver/post_client.go b/api/grpcserver/post_client.go index 2c6781d25c3..525ac04b4b4 100644 --- a/api/grpcserver/post_client.go +++ b/api/grpcserver/post_client.go @@ -75,19 +75,28 @@ func (pc *postClient) Proof(ctx context.Context, challenge []byte) (*types.Post, } } - meta := proofResp.GetMetadata() - if !bytes.Equal(meta.GetChallenge(), challenge) { - return nil, nil, fmt.Errorf("unexpected challenge: %x", meta.GetChallenge()) + proof := proofResp.GetProof() + proofMeta := proofResp.GetMetadata() + if proofMeta == nil { + return nil, nil, fmt.Errorf("proof metadata is nil") + } + + if !bytes.Equal(proofMeta.GetChallenge(), challenge) { + return nil, nil, fmt.Errorf("unexpected challenge: %x", proofMeta.GetChallenge()) + } + + postMeta := proofMeta.GetMeta() + if postMeta == nil { + return nil, nil, fmt.Errorf("post metadata is nil") } - proof := proofResp.GetProof() return &types.Post{ Nonce: proof.GetNonce(), Indices: proof.GetIndices(), Pow: proof.GetPow(), }, &types.PostMetadata{ - Challenge: meta.GetChallenge(), - LabelsPerUnit: meta.GetLabelsPerUnit(), + Challenge: proofMeta.GetChallenge(), + LabelsPerUnit: postMeta.GetLabelsPerUnit(), }, nil case pb.GenProofStatus_GEN_PROOF_STATUS_ERROR: return nil, nil, fmt.Errorf("error generating proof: %s", proofResp) diff --git a/api/grpcserver/post_service.go b/api/grpcserver/post_service.go index 89adf7a6031..7512629b150 100644 --- a/api/grpcserver/post_service.go +++ b/api/grpcserver/post_service.go @@ -1,11 +1,14 @@ package grpcserver import ( + "context" "fmt" "sync" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "go.uber.org/zap" + "google.golang.org/grpc" "github.com/spacemeshos/go-spacemesh/activation" "github.com/spacemeshos/go-spacemesh/common/types" @@ -27,8 +30,17 @@ type postCommand struct { } // RegisterService registers this service with a grpc server instance. 
-func (s *PostService) RegisterService(server *Server) { - pb.RegisterPostServiceServer(server.GrpcServer, s) +func (s *PostService) RegisterService(server *grpc.Server) { + pb.RegisterPostServiceServer(server, s) +} + +func (s *PostService) RegisterHandlerService(mux *runtime.ServeMux) error { + return pb.RegisterPostServiceHandlerServer(context.Background(), mux, s) +} + +// String returns the name of this service. +func (s *PostService) String() string { + return "PostService" } // NewPostService creates a new grpc service using config data. diff --git a/api/grpcserver/post_service_test.go b/api/grpcserver/post_service_test.go index d960c62f09b..f687ba760cc 100644 --- a/api/grpcserver/post_service_test.go +++ b/api/grpcserver/post_service_test.go @@ -3,6 +3,8 @@ package grpcserver import ( "context" "fmt" + "os" + "path/filepath" "testing" "time" @@ -84,6 +86,32 @@ func launchPostSupervisor(tb testing.TB, log *zap.Logger, cfg Config, postOpts a return func() { assert.NoError(tb, ps.Stop()) } } +func launchPostSupervisorTLS(tb testing.TB, log *zap.Logger, cfg Config, postOpts activation.PostSetupOpts) func() { + pwd, err := os.Getwd() + require.NoError(tb, err) + caCert := filepath.Join(pwd, caCert) + require.FileExists(tb, caCert) + clientCert := filepath.Join(pwd, clientCert) + require.FileExists(tb, clientCert) + clientKey := filepath.Join(pwd, clientKey) + require.FileExists(tb, clientKey) + + cmdCfg := activation.DefaultTestPostServiceConfig() + cmdCfg.CACert = caCert + cmdCfg.Cert = clientCert + cmdCfg.Key = clientKey + cmdCfg.NodeAddress = fmt.Sprintf("https://%s", cfg.TLSListener) + postCfg := activation.DefaultPostConfig() + provingOpts := activation.DefaultPostProvingOpts() + provingOpts.RandomXMode = activation.PostRandomXModeLight + + ps, err := activation.NewPostSupervisor(log, cmdCfg, postCfg, postOpts, provingOpts) + require.NoError(tb, err) + require.NotNil(tb, ps) + require.NoError(tb, ps.Start()) + return func() { assert.NoError(tb, ps.Stop()) } 
+} + func Test_GenerateProof(t *testing.T) { log := zaptest.NewLogger(t) svc := NewPostService(log) @@ -127,6 +155,49 @@ func Test_GenerateProof(t *testing.T) { require.Nil(t, meta) } +func Test_GenerateProof_TLS(t *testing.T) { + log := zaptest.NewLogger(t) + svc := NewPostService(log) + cfg, cleanup := launchTLSServer(t, svc) + t.Cleanup(cleanup) + + opts := activation.DefaultPostSetupOpts() + opts.DataDir = t.TempDir() + opts.ProviderID.SetInt64(int64(initialization.CPUProviderID())) + opts.Scrypt.N = 2 // Speedup initialization in tests. + id := initPost(t, log.Named("post"), opts) + postCleanup := launchPostSupervisorTLS(t, log.Named("supervisor"), cfg, opts) + t.Cleanup(postCleanup) + + var client activation.PostClient + require.Eventually(t, func() bool { + var err error + client, err = svc.Client(id) + return err == nil + }, 10*time.Second, 100*time.Millisecond, "timed out waiting for connection") + + challenge := make([]byte, 32) + for i := range challenge { + challenge[i] = byte(0xca) + } + + proof, meta, err := client.Proof(context.Background(), challenge) + require.NoError(t, err) + require.NotNil(t, proof) + require.NotNil(t, meta) + + // drop connection + postCleanup() + require.Eventually(t, func() bool { + proof, meta, err = client.Proof(context.Background(), challenge) + return err != nil + }, 5*time.Second, 100*time.Millisecond) + + require.ErrorContains(t, err, "post client closed") + require.Nil(t, proof) + require.Nil(t, meta) +} + func Test_Cancel_GenerateProof(t *testing.T) { log := zaptest.NewLogger(t) svc := NewPostService(log) diff --git a/api/grpcserver/smesher_service.go b/api/grpcserver/smesher_service.go index 3bc2b724aa2..bb89c205b13 100644 --- a/api/grpcserver/smesher_service.go +++ b/api/grpcserver/smesher_service.go @@ -7,11 +7,13 @@ import ( "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" 
"github.com/spacemeshos/post/config" "go.uber.org/zap" "google.golang.org/genproto/googleapis/rpc/code" rpcstatus "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -31,8 +33,17 @@ type SmesherService struct { } // RegisterService registers this service with a grpc server instance. -func (s SmesherService) RegisterService(server *Server) { - pb.RegisterSmesherServiceServer(server.GrpcServer, s) +func (s SmesherService) RegisterService(server *grpc.Server) { + pb.RegisterSmesherServiceServer(server, s) +} + +func (s SmesherService) RegisterHandlerService(mux *runtime.ServeMux) error { + return pb.RegisterSmesherServiceHandlerServer(context.Background(), mux, s) +} + +// String returns the name of this service. +func (s SmesherService) String() string { + return "SmesherService" } // NewSmesherService creates a new grpc service using config data. diff --git a/api/grpcserver/testdata/README.md b/api/grpcserver/testdata/README.md new file mode 100644 index 00000000000..0deb71a48c7 --- /dev/null +++ b/api/grpcserver/testdata/README.md @@ -0,0 +1,38 @@ +# Test data + +The files in this folder are only intended to be used for testing. They are not intended to be used in production. + +## Certificates + +The certificate files are used for testing TLS connections. 
They have been generated using the following commands: + +```bash +# create certificate extensions to allow using them for localhost +cat > domains.ext < 0 { - app.grpcPublicService = app.newGrpc(logger, app.Config.API.PublicListener) - } - if len(app.Config.API.PrivateServices) > 0 { - app.grpcPrivateService = app.newGrpc(logger, app.Config.API.PrivateListener) - } + + // check services for uniques across all endpoints for _, svc := range app.Config.API.PublicServices { if _, exists := unique[svc]; exists { return fmt.Errorf("can't start more than one %s", svc) @@ -1375,7 +1332,6 @@ func (app *App) startAPIServices(ctx context.Context) error { return err } logger.Info("registering public service %s", svc) - gsvc.RegisterService(app.grpcPublicService) public = append(public, gsvc) unique[svc] = struct{}{} } @@ -1388,48 +1344,84 @@ func (app *App) startAPIServices(ctx context.Context) error { return err } logger.Info("registering private service %s", svc) - gsvc.RegisterService(app.grpcPrivateService) + private = append(private, gsvc) unique[svc] = struct{}{} } - if len(app.Config.API.JSONListener) > 0 { - if len(public) == 0 { - return fmt.Errorf("can't start json server without public services") + for _, svc := range app.Config.API.TLSServices { + if _, exists := unique[svc]; exists { + return fmt.Errorf("can't start more than one %s", svc) } - app.jsonAPIService = grpcserver.NewJSONHTTPServer( - app.Config.API.JSONListener, - logger.WithName("JSON"), - ) - app.jsonAPIService.StartService(ctx, public...) 
+ gsvc, err := app.initService(ctx, svc) + if err != nil { + return err + } + logger.Info("registering authenticated service %s", svc) + authenticated = append(authenticated, gsvc) + unique[svc] = struct{}{} } - if app.grpcPublicService != nil { - if err := app.grpcPublicService.Start(); err != nil { + + // start servers if at least one endpoint is defined for them + if len(public) > 0 { + var err error + app.grpcPublicServer, err = grpcserver.NewPublic(logger.Zap(), app.Config.API, public) + if err != nil { + return err + } + if err := app.grpcPublicServer.Start(); err != nil { return err } } - if app.grpcPrivateService != nil { - if err := app.grpcPrivateService.Start(); err != nil { + if len(private) > 0 { + var err error + app.grpcPrivateServer, err = grpcserver.NewPrivate(logger.Zap(), app.Config.API, private) + if err != nil { + return err + } + if err := app.grpcPrivateServer.Start(); err != nil { return err } } + if len(authenticated) > 0 { + var err error + app.grpcTLSServer, err = grpcserver.NewTLS(logger.Zap(), app.Config.API, authenticated) + if err != nil { + return err + } + } + + if len(app.Config.API.JSONListener) > 0 { + if len(public) == 0 { + return fmt.Errorf("start json server without public services") + } + app.jsonAPIServer = grpcserver.NewJSONHTTPServer( + app.Config.API.JSONListener, + logger.Zap().Named("JSON"), + ) + if err := app.jsonAPIServer.StartService(ctx, public...); err != nil { + return fmt.Errorf("start listen server: %w", err) + } + } return nil } func (app *App) stopServices(ctx context.Context) { - if app.jsonAPIService != nil { - if err := app.jsonAPIService.Shutdown(ctx); err != nil { + if app.jsonAPIServer != nil { + if err := app.jsonAPIServer.Shutdown(ctx); err != nil { app.log.With().Error("error stopping json gateway server", log.Err(err)) } } - if app.grpcPublicService != nil { + if app.grpcPublicServer != nil { app.log.Info("stopping public grpc service") - // does not return any errors - _ = 
app.grpcPublicService.Close() + app.grpcPublicServer.Close() // err is always nil } - if app.grpcPrivateService != nil { + if app.grpcPrivateServer != nil { app.log.Info("stopping private grpc service") - // does not return any errors - _ = app.grpcPrivateService.Close() + app.grpcPrivateServer.Close() // err is always nil + } + if app.grpcTLSServer != nil { + app.log.Info("stopping tls grpc service") + app.grpcTLSServer.Close() // err is always nil } if app.updater != nil { diff --git a/node/node_test.go b/node/node_test.go index 50dded11a62..3fac5f276a1 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -450,6 +450,7 @@ func TestSpacemeshApp_NodeService(t *testing.T) { app := New(WithLog(logger)) app.Config = getTestDefaultConfig(t) + types.SetNetworkHRP(app.Config.NetworkHRP) // ensure that the correct HRP is set when generating the address below app.Config.SMESHING.CoinbaseAccount = types.GenerateAddress([]byte{1}).String() app.Config.SMESHING.Opts.DataDir = t.TempDir() app.Config.SMESHING.Opts.Scrypt.N = 2 @@ -1179,7 +1180,7 @@ func TestAdminEvents(t *testing.T) { app.eg.Wait() // https://github.com/spacemeshos/go-spacemesh/issues/4653 return nil }) - t.Cleanup(func() { eg.Wait() }) + t.Cleanup(func() { assert.NoError(t, eg.Wait()) }) grpcCtx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() diff --git a/node/util_test.go b/node/test_network.go similarity index 97% rename from node/util_test.go rename to node/test_network.go index aa55b35990a..00eefa91caa 100644 --- a/node/util_test.go +++ b/node/test_network.go @@ -64,7 +64,7 @@ func NewTestNetwork(t *testing.T, conf config.Config, l log.Log, size int) []*Te ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn, err := grpc.DialContext(ctx, app.grpcPublicService.BoundAddress, + conn, err := grpc.DialContext(ctx, app.grpcPublicServer.BoundAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), )