Skip to content

Commit

Permalink
Merge pull request #15509 from fuweid/fix-15495
Browse files Browse the repository at this point in the history
server/embed: fix data race when start insecure grpc
  • Loading branch information
serathius authored Mar 19, 2023
2 parents 946918e + a9988e2 commit 30abf17
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 14 deletions.
46 changes: 32 additions & 14 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,29 @@ func (sctx *serveCtx) serve(
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)

var gs *grpc.Server
defer func() {
if err != nil && gs != nil {
sctx.lg.Warn("stopping grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped grpc server due to error", zap.Error(err))
}
}()

// Make sure serversC is closed even if we prematurely exit the function.
defer close(sctx.serversC)

if sctx.insecure {
gs = v3rpc.Server(s, nil, nil, gopts...)
gs := v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}

defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
}
}(gs)

grpcl := m.Match(cmux.HTTP2())
go func() { errHandler(gs.Serve(grpcl)) }()
go func(gs *grpc.Server, grpcLis net.Listener) {
errHandler(gs.Serve(grpcLis))
}(gs, grpcl)

var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
Expand All @@ -156,7 +158,10 @@ func (sctx *serveCtx) serve(
return err
}
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()

go func(srvhttp *http.Server, httpLis net.Listener) {
errHandler(srvhttp.Serve(httpLis))
}(srvhttp, httpl)

sctx.serversC <- &servers{grpc: gs, http: srvhttp}
sctx.lg.Info(
Expand All @@ -170,12 +175,22 @@ func (sctx *serveCtx) serve(
if tlsErr != nil {
return tlsErr
}
gs = v3rpc.Server(s, tlscfg, nil, gopts...)

gs := v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}

defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
}
}(gs)

handler = grpcHandlerFunc(gs, handler)

var gwmux *gw.ServeMux
Expand Down Expand Up @@ -208,7 +223,10 @@ func (sctx *serveCtx) serve(
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
go func() { errHandler(srv.Serve(tlsl)) }()

go func(srvhttp *http.Server, tlsLis net.Listener) {
errHandler(srvhttp.Serve(tlsLis))
}(srv, tlsl)

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
sctx.lg.Info(
Expand Down
3 changes: 3 additions & 0 deletions tests/e2e/cmux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func fetchGrpcGateway(endpoint string, httpVersion string, connType e2e.ClientCo
}
req := e2e.CURLReq{Endpoint: "/v3/kv/range", Value: string(rangeData), Timeout: 5, HttpVersion: httpVersion}
respData, err := curl(endpoint, "POST", req, connType)
if err != nil {
return err
}
return validateGrpcgatewayRangeReponse([]byte(respData))
}

Expand Down

0 comments on commit 30abf17

Please sign in to comment.