Skip to content

Commit

Permalink
chore: Graceful shutdown for API Server (#18642) (#21224) (#21229)
Browse files Browse the repository at this point in the history
* fix: Graceful shutdown for the API server (#18642) (#20981)

* fix: Graceful shutdown for the API server (#18642)

Closes #18642

Implements a graceful shutdown the the API server. Without this, ArgoCD API server will eventually return 502 during rolling update. However, healthcheck would return 503 if the server is terminating.





* Init server only once, but keep re-initializing listeners



* Check error for SetParamInSettingConfigMap as needed after fresh master



* Prevent a data race



* Remove unused variable, don't pass lock when not necessary



* Try overriding URL instead of additional URLs



* Use a more specific url



---------





* Use a custom signal for graceful restart



* Re-run tests



---------

Signed-off-by: Andrii Korotkov <[email protected]>
Co-authored-by: Andrii Korotkov <[email protected]>
Co-authored-by: Leonardo Luz Almeida <[email protected]>
Co-authored-by: Michael Crenshaw <[email protected]>
  • Loading branch information
4 people authored Dec 18, 2024
1 parent a89d012 commit 018014c
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func NewCommand() *cobra.Command {
hydratorEnabled,
)
errors.CheckError(err)
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer(), nil)

stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion cmd/argocd-repo-server/commands/argocd_repo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewCommand() *cobra.Command {

askPassServer := askpass.NewServer(askpass.SocketPath)
metricsServer := metrics.NewMetricsServer()
cacheutil.CollectMetrics(redisClient, metricsServer)
cacheutil.CollectMetrics(redisClient, metricsServer, nil)
server, err := reposerver.NewServer(metricsServer, cache, tlsConfigCustomizer, repository.RepoServerInitConstants{
ParallelismLimit: parallelismLimit,
PauseGenerationAfterFailedGenerationAttempts: pauseGenerationAfterFailedGenerationAttempts,
Expand Down
15 changes: 9 additions & 6 deletions cmd/argocd-server/commands/argocd_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,22 +260,25 @@ func NewCommand() *cobra.Command {
stats.RegisterHeapDumper("memprofile")
argocd := server.NewServer(ctx, argoCDOpts, appsetOpts)
argocd.Init(ctx)
lns, err := argocd.Listen()
errors.CheckError(err)
for {
var closer func()
ctx, cancel := context.WithCancel(ctx)
serverCtx, cancel := context.WithCancel(ctx)
lns, err := argocd.Listen()
errors.CheckError(err)
if otlpAddress != "" {
closer, err = traceutil.InitTracer(ctx, "argocd-server", otlpAddress, otlpInsecure, otlpHeaders, otlpAttrs)
closer, err = traceutil.InitTracer(serverCtx, "argocd-server", otlpAddress, otlpInsecure, otlpHeaders, otlpAttrs)
if err != nil {
log.Fatalf("failed to initialize tracing: %v", err)
}
}
argocd.Run(ctx, lns)
cancel()
argocd.Run(serverCtx, lns)
if closer != nil {
closer()
}
cancel()
if argocd.TerminateRequested() {
break
}
}
},
Example: templates.Examples(`
Expand Down
187 changes: 154 additions & 33 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
"net/url"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"reflect"
"regexp"
go_runtime "runtime"
"runtime/debug"
"strings"
gosync "sync"
"sync/atomic"
"syscall"
"time"

// nolint:staticcheck
Expand Down Expand Up @@ -187,17 +191,20 @@ type ArgoCDServer struct {
db db.ArgoDB

// stopCh is the channel which when closed, will shutdown the Argo CD server
stopCh chan struct{}
userStateStorage util_session.UserStateStorage
indexDataInit gosync.Once
indexData []byte
indexDataErr error
staticAssets http.FileSystem
apiFactory api.Factory
secretInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
serviceSet *ArgoCDServiceSet
extensionManager *extension.Manager
stopCh chan os.Signal
userStateStorage util_session.UserStateStorage
indexDataInit gosync.Once
indexData []byte
indexDataErr error
staticAssets http.FileSystem
apiFactory api.Factory
secretInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
serviceSet *ArgoCDServiceSet
extensionManager *extension.Manager
shutdown func()
terminateRequested atomic.Bool
available atomic.Bool
}

type ArgoCDServerOpts struct {
Expand Down Expand Up @@ -241,6 +248,9 @@ type ApplicationSetOpts struct {
EnableScmProviders bool
}

// GracefulRestartSignal implements a signal to be used for a graceful restart trigger.
type GracefulRestartSignal struct{}

// HTTPMetricsRegistry exposes operations to update http metrics in the Argo CD
// API server.
type HTTPMetricsRegistry interface {
Expand All @@ -253,6 +263,14 @@ type HTTPMetricsRegistry interface {
ObserveExtensionRequestDuration(extension string, duration time.Duration)
}

// String is a part of os.Signal interface to represent a signal as a string.
func (g GracefulRestartSignal) String() string {
return "GracefulRestartSignal"
}

// Signal is a part of os.Signal interface doing nothing.
func (g GracefulRestartSignal) Signal() {}

// initializeDefaultProject creates the default project if it does not already exist
func initializeDefaultProject(opts ArgoCDServerOpts) error {
defaultProj := &v1alpha1.AppProject{
Expand Down Expand Up @@ -330,6 +348,9 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio
pg := extension.NewDefaultProjectGetter(projLister, dbInstance)
ug := extension.NewDefaultUserGetter(policyEnf)
em := extension.NewManager(logger, opts.Namespace, sg, ag, pg, enf, ug)
noopShutdown := func() {
log.Error("API Server Shutdown function called but server is not started yet.")
}

a := &ArgoCDServer{
ArgoCDServerOpts: opts,
Expand All @@ -353,6 +374,8 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio
secretInformer: secretInformer,
configMapInformer: configMapInformer,
extensionManager: em,
shutdown: noopShutdown,
stopCh: make(chan os.Signal, 1),
}

err = a.logInClusterWarnings()
Expand All @@ -370,6 +393,12 @@ const (
)

func (a *ArgoCDServer) healthCheck(r *http.Request) error {
if a.terminateRequested.Load() {
return errors.New("API Server is terminating and unable to serve requests.")
}
if !a.available.Load() {
return errors.New("API Server is not available. It either hasn't started or is restarting.")
}
if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" {
argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset)
_, err := argoDB.ListClusters(r.Context())
Expand Down Expand Up @@ -516,11 +545,19 @@ func (a *ArgoCDServer) Init(ctx context.Context) {
// k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of
// golang/protobuf).
func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) {
defer func() {
if r := recover(); r != nil {
log.WithField("trace", string(debug.Stack())).Error("Recovered from panic: ", r)
a.terminateRequested.Store(true)
a.shutdown()
}
}()

a.userStateStorage.Init(ctx)

metricsServ := metrics.NewMetricsServer(a.MetricsHost, a.MetricsPort)
if a.RedisClient != nil {
cacheutil.CollectMetrics(a.RedisClient, metricsServ)
cacheutil.CollectMetrics(a.RedisClient, metricsServ, a.userStateStorage.GetLockObject())
}

svcSet := newArgoCDServiceSet(a)
Expand Down Expand Up @@ -602,35 +639,118 @@ func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) {
log.Fatal("Timed out waiting for project cache to sync")
}

a.stopCh = make(chan struct{})
<-a.stopCh
shutdownFunc := func() {
log.Info("API Server shutdown initiated. Shutting down servers...")
a.available.Store(false)
shutdownCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
var wg gosync.WaitGroup

// Shutdown http server
wg.Add(1)
go func() {
defer wg.Done()
err := httpS.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down http server: %s", err)
}
}()

if a.useTLS() {
// Shutdown https server
wg.Add(1)
go func() {
defer wg.Done()
err := httpsS.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down https server: %s", err)
}
}()
}

// Shutdown gRPC server
wg.Add(1)
go func() {
defer wg.Done()
grpcS.GracefulStop()
}()

// Shutdown metrics server
wg.Add(1)
go func() {
defer wg.Done()
err := metricsServ.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down metrics server: %s", err)
}
}()

if a.useTLS() {
// Shutdown tls server
wg.Add(1)
go func() {
defer wg.Done()
tlsm.Close()
}()
}

// Shutdown tcp server
wg.Add(1)
go func() {
defer wg.Done()
tcpm.Close()
}()

c := make(chan struct{})
// This goroutine will wait for all servers to conclude the shutdown
// process
go func() {
defer close(c)
wg.Wait()
}()

select {
case <-c:
log.Info("All servers were gracefully shutdown. Exiting...")
case <-shutdownCtx.Done():
log.Warn("Graceful shutdown timeout. Exiting...")
}
}
a.shutdown = shutdownFunc
signal.Notify(a.stopCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
a.available.Store(true)

select {
case signal := <-a.stopCh:
log.Infof("API Server received signal: %s", signal.String())
gracefulRestartSignal := GracefulRestartSignal{}
if signal != gracefulRestartSignal {
a.terminateRequested.Store(true)
}
a.shutdown()
case <-ctx.Done():
log.Infof("API Server: %s", ctx.Err())
a.terminateRequested.Store(true)
a.shutdown()
}
}

func (a *ArgoCDServer) Initialized() bool {
return a.projInformer.HasSynced() && a.appInformer.HasSynced()
}

// TerminateRequested returns whether a shutdown was initiated by a signal or context cancel
// as opposed to a watch.
func (a *ArgoCDServer) TerminateRequested() bool {
return a.terminateRequested.Load()
}

// checkServeErr checks the error from a .Serve() call to decide if it was a graceful shutdown
func (a *ArgoCDServer) checkServeErr(name string, err error) {
if err != nil {
if a.stopCh == nil {
// a nil stopCh indicates a graceful shutdown
log.Infof("graceful shutdown %s: %v", name, err)
} else {
log.Fatalf("%s: %v", name, err)
}
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Errorf("Error received from server %s: %v", name, err)
} else {
log.Infof("graceful shutdown %s", name)
}
}

// Shutdown stops the Argo CD server
func (a *ArgoCDServer) Shutdown() {
log.Info("Shut down requested")
stopCh := a.stopCh
a.stopCh = nil
if stopCh != nil {
close(stopCh)
log.Infof("Graceful shutdown of %s initiated", name)
}
}

Expand Down Expand Up @@ -735,9 +855,10 @@ func (a *ArgoCDServer) watchSettings() {
}
}
log.Info("shutting down settings watch")
a.Shutdown()
a.settingsMgr.Unsubscribe(updateCh)
close(updateCh)
// Triggers server restart
a.stopCh <- GracefulRestartSignal{}
}

func (a *ArgoCDServer) rbacPolicyLoader(ctx context.Context) {
Expand Down
Loading

0 comments on commit 018014c

Please sign in to comment.