From 05a768b0e8d88e072753ca3e0bd737c56971c716 Mon Sep 17 00:00:00 2001
From: Tom Wieczorek
Date: Sun, 1 Sep 2024 13:32:52 +0200
Subject: [PATCH 1/6] The stacks don't need to be stored in the manager struct

The map is only ever used in the loop to create and remove stacks, so
it doesn't need to be stored in the struct. This ensures that there
can't be any racy concurrent accesses to it.

Signed-off-by: Tom Wieczorek
(cherry picked from commit ba547ede5f660f015cc40d4cecc2514088922166)
(cherry picked from commit c6dba063e0426515ec91144f5e2f6a6ef7fcbabe)
---
 pkg/applier/manager.go | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go
index 4774b75fdf44..170c5bdcaf06 100644
--- a/pkg/applier/manager.go
+++ b/pkg/applier/manager.go
@@ -38,12 +38,10 @@ type Manager struct {
 	K0sVars           *config.CfgVars
 	KubeClientFactory kubeutil.ClientFactoryInterface
 
-	// client               kubernetes.Interface
 	applier       Applier
 	bundlePath    string
 	cancelWatcher context.CancelFunc
 	log           *logrus.Entry
-	stacks        map[string]stack
 
 	LeaderElector leaderelector.Interface
 }
@@ -62,7 +60,6 @@ func (m *Manager) Init(ctx context.Context) error {
 		return fmt.Errorf("failed to create manifest bundle dir %s: %w", m.K0sVars.ManifestsDir, err)
 	}
 	m.log = logrus.WithField("component", constant.ApplierManagerComponentName)
-	m.stacks = make(map[string]stack)
 	m.bundlePath = m.K0sVars.ManifestsDir
 
 	m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)
@@ -120,8 +117,10 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 		return err
 	}
 
+	stacks := make(map[string]stack, len(dirs))
+
 	for _, dir := range dirs {
-		m.createStack(ctx, path.Join(m.bundlePath, dir))
+		m.createStack(ctx, stacks, path.Join(m.bundlePath, dir))
 	}
 
 	for {
@@ -139,11 +138,12 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 			switch event.Op {
 			case fsnotify.Create:
 				if dir.IsDirectory(event.Name) {
-					m.createStack(ctx, event.Name)
+					m.createStack(ctx, stacks, event.Name)
 				}
 			case fsnotify.Remove:
-				m.removeStack(ctx, event.Name)
+				m.removeStack(ctx, stacks, event.Name)
 			}
+
 		case <-ctx.Done():
 			log.Info("manifest watcher done")
 			return nil
@@ -151,15 +151,15 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 	}
 }
 
-func (m *Manager) createStack(ctx context.Context, name string) {
+func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name string) {
 	// safeguard in case the fswatcher would trigger an event for an already existing stack
-	if _, ok := m.stacks[name]; ok {
+	if _, ok := stacks[name]; ok {
 		return
 	}
 
 	stackCtx, cancelStack := context.WithCancel(ctx)
 	stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)}
-	m.stacks[name] = stack
+	stacks[name] = stack
 
 	go func() {
 		log := m.log.WithField("stack", name)
@@ -180,8 +180,8 @@ func (m *Manager) createStack(ctx context.Context, name string) {
 	}()
 }
 
-func (m *Manager) removeStack(ctx context.Context, name string) {
-	stack, ok := m.stacks[name]
+func (m *Manager) removeStack(ctx context.Context, stacks map[string]stack, name string) {
+	stack, ok := stacks[name]
 	if !ok {
 		m.log.
 			WithField("path", name).
@@ -189,7 +189,7 @@ func (m *Manager) removeStack(ctx context.Context, name string) {
 		return
 	}
 
-	delete(m.stacks, name)
+	delete(stacks, name)
 	stack.CancelFunc()
 
 	log := m.log.WithField("stack", name)
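
The reasoning in this message is the usual Go alternative to locking: confine mutable state to the single goroutine that uses it. A minimal standalone sketch of the pattern (not part of the patch; all names are illustrative):

    // The map lives inside the goroutine that owns it. No other goroutine
    // can reach it, so no mutex is needed and no data race is possible.
    package main

    import "fmt"

    func main() {
        events := make(chan string)
        done := make(chan struct{})

        go func() {
            defer close(done)
            stacks := make(map[string]struct{}) // confined to this goroutine
            for name := range events {
                stacks[name] = struct{}{}
                fmt.Println("tracking", name, "- total:", len(stacks))
            }
        }()

        events <- "stack-a"
        events <- "stack-b"
        close(events)
        <-done
    }
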
From b9fd9bd56048fcbcb33718625652ad2e6f5a5fd4 Mon Sep 17 00:00:00 2001
From: Tom Wieczorek
Date: Sun, 1 Sep 2024 13:46:25 +0200
Subject: [PATCH 2/6] Don't check for closed watch channels

The only reason these channels get closed is if the watcher itself gets
closed. This happens only when the method returns, which in turn only
happens when the context is done. In this case, the loop has already
exited without a select on a potentially closed channel. So the
branches that checked for closed channels were effectively unreachable
during runtime.

Signed-off-by: Tom Wieczorek
(cherry picked from commit db5e0d231d628ed606b113d753af0b8591db885b)
(cherry picked from commit 102b7e3d89b3f44e50356cc3b98fc74afa9e4247)
---
 pkg/applier/manager.go | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go
index 170c5bdcaf06..6f772a211551 100644
--- a/pkg/applier/manager.go
+++ b/pkg/applier/manager.go
@@ -125,16 +125,10 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 
 	for {
 		select {
-		case err, ok := <-watcher.Errors:
-			if !ok {
-				return err
-			}
+		case err := <-watcher.Errors:
+			log.WithError(err).Error("Watch error")
 
-			log.Warnf("watch error: %s", err.Error())
-		case event, ok := <-watcher.Events:
-			if !ok {
-				return nil
-			}
+		case event := <-watcher.Events:
 			switch event.Op {
 			case fsnotify.Create:
 				if dir.IsDirectory(event.Name) {
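
For context on why those branches were dead code: a receive from a closed channel never blocks and yields the zero value with ok == false, and fsnotify only closes Errors and Events once the watcher itself is closed, which here can only happen after the loop has already returned. A minimal standalone sketch of the channel semantics (not part of the patch):

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        ch := make(chan error, 1)
        ch <- errors.New("boom")
        close(ch)

        err, ok := <-ch
        fmt.Println(err, ok) // boom true: buffered values still drain after close

        err, ok = <-ch
        fmt.Println(err, ok) // <nil> false: closed and empty, receive doesn't block
    }
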
From 95e5ec079ccaa6318cc9c5041d9593865a411977 Mon Sep 17 00:00:00 2001
From: Tom Wieczorek
Date: Tue, 3 Sep 2024 08:09:37 +0200
Subject: [PATCH 3/6] Wait for goroutines to exit in applier manager

Rename cancelWatcher to stop and wait until the newly added stopped
channel is closed. Also, add a stopped channel to each stack to do the
same for each stack-specific goroutine.

Signed-off-by: Tom Wieczorek
(cherry picked from commit 402c72804fc7d0fb83a074055f867aee642c8f8c)
(cherry picked from commit dbc286cefbcc5e1114cf909e25852edfb7808397)
---
 pkg/applier/manager.go | 46 +++++++++++++++++++++++++++++-----------------
 1 file changed, 29 insertions(+), 17 deletions(-)

diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go
index 6f772a211551..33758b5d05fc 100644
--- a/pkg/applier/manager.go
+++ b/pkg/applier/manager.go
@@ -38,10 +38,10 @@ type Manager struct {
 	K0sVars           *config.CfgVars
 	KubeClientFactory kubeutil.ClientFactoryInterface
 
-	applier       Applier
-	bundlePath    string
-	cancelWatcher context.CancelFunc
-	log           *logrus.Entry
+	applier    Applier
+	bundlePath string
+	stop       func()
+	log        *logrus.Entry
 
 	LeaderElector leaderelector.Interface
 }
@@ -49,7 +49,8 @@ type Manager struct {
 var _ manager.Component = (*Manager)(nil)
 
 type stack = struct {
-	context.CancelFunc
+	cancel  context.CancelFunc
+	stopped <-chan struct{}
 	*StackApplier
 }
 
@@ -65,15 +66,18 @@ func (m *Manager) Init(ctx context.Context) error {
 	m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)
 
 	m.LeaderElector.AddAcquiredLeaseCallback(func() {
-		watcherCtx, cancel := context.WithCancel(ctx)
-		m.cancelWatcher = cancel
+		ctx, cancel := context.WithCancel(ctx)
+		stopped := make(chan struct{})
+
+		m.stop = func() { cancel(); <-stopped }
 		go func() {
-			_ = m.runWatchers(watcherCtx)
+			defer close(stopped)
+			_ = m.runWatchers(ctx)
 		}()
 	})
 	m.LeaderElector.AddLostLeaseCallback(func() {
-		if m.cancelWatcher != nil {
-			m.cancelWatcher()
+		if m.stop != nil {
+			m.stop()
 		}
 	})
 
@@ -87,8 +91,8 @@ func (m *Manager) Start(_ context.Context) error {
 
 // Stop stops the Manager
 func (m *Manager) Stop() error {
-	if m.cancelWatcher != nil {
-		m.cancelWatcher()
+	if m.stop != nil {
+		m.stop()
 	}
 	return nil
 }
@@ -140,6 +144,10 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 
 		case <-ctx.Done():
 			log.Info("manifest watcher done")
+			for _, stack := range stacks {
+				<-stack.stopped
+			}
+
 			return nil
 		}
 	}
@@ -151,22 +159,25 @@ func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name
 		return
 	}
 
-	stackCtx, cancelStack := context.WithCancel(ctx)
-	stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)}
+	ctx, cancel := context.WithCancel(ctx)
+	stopped := make(chan struct{})
+
+	stack := stack{cancel, stopped, NewStackApplier(name, m.KubeClientFactory)}
 	stacks[name] = stack
 
 	go func() {
+		defer close(stopped)
 		log := m.log.WithField("stack", name)
 		for {
 			log.Info("Running stack")
-			if err := stack.Run(stackCtx); err != nil {
+			if err := stack.Run(ctx); err != nil {
 				log.WithError(err).Error("Failed to run stack")
 			}
 
 			select {
 			case <-time.After(10 * time.Second):
 				continue
-			case <-stackCtx.Done():
+			case <-ctx.Done():
 				log.Info("Stack done")
 				return
 			}
@@ -184,7 +195,8 @@ func (m *Manager) removeStack(ctx context.Context, stacks map[string]stack, name
 	}
 
 	delete(stacks, name)
-	stack.CancelFunc()
+	stack.cancel()
+	<-stack.stopped
 
 	log := m.log.WithField("stack", name)
 	if err := stack.DeleteStack(ctx); err != nil {
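
The stop/stopped pairing introduced here is worth seeing in isolation: cancelling the context merely signals the goroutine, while the stopped channel is what lets the caller block until the goroutine has actually exited. A minimal standalone sketch (not part of the patch; names are illustrative):

    package main

    import (
        "context"
        "fmt"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        stopped := make(chan struct{})

        go func() {
            defer close(stopped) // signals exit on every return path
            <-ctx.Done()         // stands in for the real watch loop
        }()

        stop := func() { cancel(); <-stopped } // the shape of m.stop
        stop()
        fmt.Println("worker has fully exited")
    }
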
From 394198b08ce94158ac595d3e515d9e1a2af8cc4e Mon Sep 17 00:00:00 2001
From: Tom Wieczorek
Date: Tue, 3 Sep 2024 08:43:57 +0200
Subject: [PATCH 4/6] Improve logging in applier manager

Cancel the contexts with a cause. Add this cause to the log statements
when exiting loops. Rename bundlePath to bundleDir to reflect the fact
that it is a directory, not a file.

Signed-off-by: Tom Wieczorek
(cherry picked from commit edb105c7e354fae00455e9f2e5f6f867a16f8171)
(cherry picked from commit a22902b5c4ae56133022ea1d8801717d593fee82)
---
 pkg/applier/manager.go | 60 ++++++++++++++++++++++++++++--------------------------
 1 file changed, 32 insertions(+), 28 deletions(-)

diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go
index 33758b5d05fc..f9611a20217a 100644
--- a/pkg/applier/manager.go
+++ b/pkg/applier/manager.go
@@ -18,6 +18,7 @@ package applier
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"path"
 	"time"
@@ -38,10 +39,10 @@ type Manager struct {
 	K0sVars           *config.CfgVars
 	KubeClientFactory kubeutil.ClientFactoryInterface
 
-	applier    Applier
-	bundlePath string
-	stop       func()
-	log        *logrus.Entry
+	applier   Applier
+	bundleDir string
+	stop      func(reason string)
+	log       *logrus.Entry
 
 	LeaderElector leaderelector.Interface
 }
@@ -49,7 +50,7 @@ type Manager struct {
 var _ manager.Component = (*Manager)(nil)
 
 type stack = struct {
-	cancel  context.CancelFunc
+	cancel  context.CancelCauseFunc
 	stopped <-chan struct{}
 	*StackApplier
 }
@@ -61,23 +62,23 @@ func (m *Manager) Init(ctx context.Context) error {
 		return fmt.Errorf("failed to create manifest bundle dir %s: %w", m.K0sVars.ManifestsDir, err)
 	}
 	m.log = logrus.WithField("component", constant.ApplierManagerComponentName)
-	m.bundlePath = m.K0sVars.ManifestsDir
+	m.bundleDir = m.K0sVars.ManifestsDir
 
 	m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)
 
 	m.LeaderElector.AddAcquiredLeaseCallback(func() {
-		ctx, cancel := context.WithCancel(ctx)
+		ctx, cancel := context.WithCancelCause(ctx)
 		stopped := make(chan struct{})
 
-		m.stop = func() { cancel(); <-stopped }
+		m.stop = func(reason string) { cancel(errors.New(reason)); <-stopped }
 		go func() {
 			defer close(stopped)
-			_ = m.runWatchers(ctx)
+			m.runWatchers(ctx)
 		}()
 	})
 	m.LeaderElector.AddLostLeaseCallback(func() {
 		if m.stop != nil {
-			m.stop()
+			m.stop("lost leadership")
 		}
 	})
 
@@ -92,45 +93,48 @@ func (m *Manager) Start(_ context.Context) error {
 
 // Stop stops the Manager
 func (m *Manager) Stop() error {
 	if m.stop != nil {
-		m.stop()
+		m.stop("applier manager is stopping")
 	}
 	return nil
 }
 
-func (m *Manager) runWatchers(ctx context.Context) error {
-	log := logrus.WithField("component", constant.ApplierManagerComponentName)
-
+func (m *Manager) runWatchers(ctx context.Context) {
 	watcher, err := fsnotify.NewWatcher()
 	if err != nil {
-		log.WithError(err).Error("failed to create watcher")
-		return err
+		m.log.WithError(err).Error("Failed to create watcher")
+		return
 	}
-	defer watcher.Close()
+	defer func() {
+		if err := watcher.Close(); err != nil {
+			m.log.WithError(err).Error("Failed to close watcher")
+		}
+	}()
 
-	err = watcher.Add(m.bundlePath)
+	err = watcher.Add(m.bundleDir)
 	if err != nil {
-		log.Warnf("Failed to start watcher: %s", err.Error())
+		m.log.WithError(err).Error("Failed to watch bundle directory")
 	}
 
 	// Add all directories after the bundle dir has been added to the watcher.
 	// Doing it the other way round introduces a race condition when directories
 	// get created after the initial listing but before the watch starts.
-	dirs, err := dir.GetAll(m.bundlePath)
+	dirs, err := dir.GetAll(m.bundleDir)
 	if err != nil {
-		return err
+		m.log.WithError(err).Error("Failed to read bundle directory")
+		return
 	}
 
 	stacks := make(map[string]stack, len(dirs))
 
 	for _, dir := range dirs {
-		m.createStack(ctx, stacks, path.Join(m.bundlePath, dir))
+		m.createStack(ctx, stacks, path.Join(m.bundleDir, dir))
 	}
 
 	for {
 		select {
 		case err := <-watcher.Errors:
-			log.WithError(err).Error("Watch error")
+			m.log.WithError(err).Error("Watch error")
 
 		case event := <-watcher.Events:
 			switch event.Op {
@@ -143,12 +147,12 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 			}
 
 		case <-ctx.Done():
-			log.Info("manifest watcher done")
+			m.log.Infof("Watch loop done (%v)", context.Cause(ctx))
 			for _, stack := range stacks {
 				<-stack.stopped
 			}
 
-			return nil
+			return
 		}
 	}
 }
@@ -159,7 +163,7 @@ func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name
 		return
 	}
 
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancelCause(ctx)
 	stopped := make(chan struct{})
 
 	stack := stack{cancel, stopped, NewStackApplier(name, m.KubeClientFactory)}
@@ -178,7 +182,7 @@ func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name
 			case <-time.After(10 * time.Second):
 				continue
 			case <-ctx.Done():
-				log.Info("Stack done")
+				log.Infof("Stack done (%v)", context.Cause(ctx))
 				return
 			}
 		}
@@ -195,7 +199,7 @@ func (m *Manager) removeStack(ctx context.Context, stacks map[string]stack, name
 	}
 
 	delete(stacks, name)
-	stack.cancel()
+	stack.cancel(errors.New("stack removed"))
 	<-stack.stopped
 
 	log := m.log.WithField("stack", name)
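
This patch leans on context.WithCancelCause and context.Cause (available since Go 1.20): the cancel function accepts an error, and context.Cause(ctx) returns it, so log lines such as "Watch loop done (%v)" can report why the loop exited instead of the generic context.Canceled. A minimal standalone sketch (not part of the patch):

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    func main() {
        ctx, cancel := context.WithCancelCause(context.Background())
        cancel(errors.New("lost leadership"))

        <-ctx.Done()
        fmt.Println(ctx.Err())          // context canceled
        fmt.Println(context.Cause(ctx)) // lost leadership
    }
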
From 87ce481b453112f21c2294a714f680ca971273fa Mon Sep 17 00:00:00 2001
From: Tom Wieczorek
Date: Tue, 3 Sep 2024 08:55:13 +0200
Subject: [PATCH 5/6] Restart applier manager watch loop on errors

Exit the loop on error and restart it after a one-minute delay to allow
it to recover in a new run. Also replace the bespoke retry loop for
stacks with the Kubernetes client's wait package.

Signed-off-by: Tom Wieczorek
(cherry picked from commit 404c6cf5c820a583bbd50dff05b35d3073037567)
(cherry picked from commit 305846061314f7ba7a1227a34b3a6d045870c596)
---
 pkg/applier/manager.go | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go
index f9611a20217a..27f1ecc48d79 100644
--- a/pkg/applier/manager.go
+++ b/pkg/applier/manager.go
@@ -30,6 +30,8 @@ import (
 	"github.com/k0sproject/k0s/pkg/constant"
 	kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
 
+	"k8s.io/apimachinery/pkg/util/wait"
+
 	"github.com/fsnotify/fsnotify"
 	"github.com/sirupsen/logrus"
 )
@@ -73,7 +75,7 @@ func (m *Manager) Init(ctx context.Context) error {
 		m.stop = func(reason string) { cancel(errors.New(reason)); <-stopped }
 		go func() {
 			defer close(stopped)
-			m.runWatchers(ctx)
+			wait.UntilWithContext(ctx, m.runWatchers, 1*time.Minute)
 		}()
 	})
 	m.LeaderElector.AddLostLeaseCallback(func() {
@@ -113,8 +115,11 @@ func (m *Manager) runWatchers(ctx context.Context) {
 	err = watcher.Add(m.bundleDir)
 	if err != nil {
 		m.log.WithError(err).Error("Failed to watch bundle directory")
+		return
 	}
 
+	m.log.Info("Starting watch loop")
+
 	// Add all directories after the bundle dir has been added to the watcher.
 	// Doing it the other way round introduces a race condition when directories
 	// get created after the initial listing but before the watch starts.
@@ -125,6 +130,7 @@ func (m *Manager) runWatchers(ctx context.Context) {
 		return
 	}
 
+	ctx, cancel := context.WithCancelCause(ctx)
 	stacks := make(map[string]stack, len(dirs))
 
 	for _, dir := range dirs {
@@ -135,6 +141,7 @@ func (m *Manager) runWatchers(ctx context.Context) {
 		select {
 		case err := <-watcher.Errors:
 			m.log.WithError(err).Error("Watch error")
+			cancel(err)
 
 		case event := <-watcher.Events:
 			switch event.Op {
@@ -172,20 +179,15 @@ func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name
 	go func() {
 		defer close(stopped)
 		log := m.log.WithField("stack", name)
-		for {
+
+		wait.UntilWithContext(ctx, func(ctx context.Context) {
 			log.Info("Running stack")
 			if err := stack.Run(ctx); err != nil {
 				log.WithError(err).Error("Failed to run stack")
 			}
+		}, 1*time.Minute)
 
-			select {
-			case <-time.After(10 * time.Second):
-				continue
-			case <-ctx.Done():
-				log.Infof("Stack done (%v)", context.Cause(ctx))
-				return
-			}
-		}
+		log.Infof("Stack done (%v)", context.Cause(ctx))
 	}()
 }
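
wait.UntilWithContext from k8s.io/apimachinery/pkg/util/wait runs the given function, sleeps for the period, and repeats until the context is done, so a run that fails simply ends and the next attempt starts after the delay. A minimal standalone sketch of that restart behaviour (not part of the patch; the timings are illustrative):

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
        defer cancel()

        runs := 0
        wait.UntilWithContext(ctx, func(context.Context) {
            runs++
            fmt.Println("run", runs) // a failed run just returns; the next starts after the period
        }, 100*time.Millisecond)

        fmt.Println("loop exited:", ctx.Err())
    }
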
From 04ba246511a007c88495a66850881a8589893e68 Mon Sep 17 00:00:00 2001
From: Tom Wieczorek
Date: Tue, 3 Sep 2024 09:03:26 +0200
Subject: [PATCH 6/6] Remove unused applier field from applier manager

Seems to be a remnant from the past.

Signed-off-by: Tom Wieczorek
(cherry picked from commit c2beea7658e47c06fd4898239e08e752d0d518ca)
(cherry picked from commit 4b2efbe7bac49476d8028c7ddcd00343c7c5e848)
---
 pkg/applier/manager.go | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go
index 27f1ecc48d79..bdc26015fe8d 100644
--- a/pkg/applier/manager.go
+++ b/pkg/applier/manager.go
@@ -41,7 +41,6 @@ type Manager struct {
 	K0sVars           *config.CfgVars
 	KubeClientFactory kubeutil.ClientFactoryInterface
 
-	applier   Applier
 	bundleDir string
 	stop      func(reason string)
 	log       *logrus.Entry
@@ -66,8 +65,6 @@ func (m *Manager) Init(ctx context.Context) error {
 	m.log = logrus.WithField("component", constant.ApplierManagerComponentName)
 	m.bundleDir = m.K0sVars.ManifestsDir
 
-	m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)
-
 	m.LeaderElector.AddAcquiredLeaseCallback(func() {
 		ctx, cancel := context.WithCancelCause(ctx)
 		stopped := make(chan struct{})