diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go
index 4774b75fdf44..bdc26015fe8d 100644
--- a/pkg/applier/manager.go
+++ b/pkg/applier/manager.go
@@ -18,6 +18,7 @@ package applier
 
 import (
     "context"
+    "errors"
     "fmt"
     "path"
     "time"
@@ -29,6 +30,8 @@ import (
     "github.com/k0sproject/k0s/pkg/constant"
     kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
 
+    "k8s.io/apimachinery/pkg/util/wait"
+
     "github.com/fsnotify/fsnotify"
     "github.com/sirupsen/logrus"
 )
@@ -38,12 +41,9 @@ type Manager struct {
     K0sVars           *config.CfgVars
     KubeClientFactory kubeutil.ClientFactoryInterface
 
-    // client kubernetes.Interface
-    applier       Applier
-    bundlePath    string
-    cancelWatcher context.CancelFunc
-    log           *logrus.Entry
-    stacks        map[string]stack
+    bundleDir string
+    stop      func(reason string)
+    log       *logrus.Entry
 
     LeaderElector leaderelector.Interface
 }
@@ -51,7 +51,8 @@ type Manager struct {
 var _ manager.Component = (*Manager)(nil)
 
 type stack = struct {
-    context.CancelFunc
+    cancel  context.CancelCauseFunc
+    stopped <-chan struct{}
     *StackApplier
 }
 
@@ -62,21 +63,21 @@ func (m *Manager) Init(ctx context.Context) error {
         return fmt.Errorf("failed to create manifest bundle dir %s: %w", m.K0sVars.ManifestsDir, err)
     }
     m.log = logrus.WithField("component", constant.ApplierManagerComponentName)
-    m.stacks = make(map[string]stack)
-    m.bundlePath = m.K0sVars.ManifestsDir
-
-    m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)
+    m.bundleDir = m.K0sVars.ManifestsDir
 
     m.LeaderElector.AddAcquiredLeaseCallback(func() {
-        watcherCtx, cancel := context.WithCancel(ctx)
-        m.cancelWatcher = cancel
+        ctx, cancel := context.WithCancelCause(ctx)
+        stopped := make(chan struct{})
+
+        m.stop = func(reason string) { cancel(errors.New(reason)); <-stopped }
         go func() {
-            _ = m.runWatchers(watcherCtx)
+            defer close(stopped)
+            wait.UntilWithContext(ctx, m.runWatchers, 1*time.Minute)
         }()
     })
     m.LeaderElector.AddLostLeaseCallback(func() {
-        if m.cancelWatcher != nil {
-            m.cancelWatcher()
+        if m.stop != nil {
+            m.stop("lost leadership")
         }
     })
 
@@ -90,98 +91,105 @@ func (m *Manager) Start(_ context.Context) error {
 
 // Stop stops the Manager
 func (m *Manager) Stop() error {
-    if m.cancelWatcher != nil {
-        m.cancelWatcher()
+    if m.stop != nil {
+        m.stop("applier manager is stopping")
     }
     return nil
 }
 
-func (m *Manager) runWatchers(ctx context.Context) error {
-    log := logrus.WithField("component", constant.ApplierManagerComponentName)
-
+func (m *Manager) runWatchers(ctx context.Context) {
     watcher, err := fsnotify.NewWatcher()
     if err != nil {
-        log.WithError(err).Error("failed to create watcher")
-        return err
+        m.log.WithError(err).Error("Failed to create watcher")
+        return
     }
-    defer watcher.Close()
+    defer func() {
+        if err := watcher.Close(); err != nil {
+            m.log.WithError(err).Error("Failed to close watcher")
+        }
+    }()
 
-    err = watcher.Add(m.bundlePath)
+    err = watcher.Add(m.bundleDir)
     if err != nil {
-        log.Warnf("Failed to start watcher: %s", err.Error())
+        m.log.WithError(err).Error("Failed to watch bundle directory")
+        return
     }
 
+    m.log.Info("Starting watch loop")
+
     // Add all directories after the bundle dir has been added to the watcher.
     // Doing it the other way round introduces a race condition when directories
     // get created after the initial listing but before the watch starts.
-    dirs, err := dir.GetAll(m.bundlePath)
+    dirs, err := dir.GetAll(m.bundleDir)
     if err != nil {
-        return err
+        m.log.WithError(err).Error("Failed to read bundle directory")
+        return
     }
 
+    ctx, cancel := context.WithCancelCause(ctx)
+    stacks := make(map[string]stack, len(dirs))
+
     for _, dir := range dirs {
-        m.createStack(ctx, path.Join(m.bundlePath, dir))
+        m.createStack(ctx, stacks, path.Join(m.bundleDir, dir))
     }
 
     for {
         select {
-        case err, ok := <-watcher.Errors:
-            if !ok {
-                return err
-            }
+        case err := <-watcher.Errors:
+            m.log.WithError(err).Error("Watch error")
+            cancel(err)
 
-            log.Warnf("watch error: %s", err.Error())
-        case event, ok := <-watcher.Events:
-            if !ok {
-                return nil
-            }
+        case event := <-watcher.Events:
             switch event.Op {
             case fsnotify.Create:
                 if dir.IsDirectory(event.Name) {
-                    m.createStack(ctx, event.Name)
+                    m.createStack(ctx, stacks, event.Name)
                 }
             case fsnotify.Remove:
-                m.removeStack(ctx, event.Name)
+                m.removeStack(ctx, stacks, event.Name)
             }
+
         case <-ctx.Done():
-            log.Info("manifest watcher done")
-            return nil
+            m.log.Infof("Watch loop done (%v)", context.Cause(ctx))
+            for _, stack := range stacks {
+                <-stack.stopped
+            }
+
+            return
         }
     }
 }
 
-func (m *Manager) createStack(ctx context.Context, name string) {
+func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name string) {
     // safeguard in case the fswatcher would trigger an event for an already existing stack
-    if _, ok := m.stacks[name]; ok {
+    if _, ok := stacks[name]; ok {
         return
     }
 
-    stackCtx, cancelStack := context.WithCancel(ctx)
-    stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)}
-    m.stacks[name] = stack
+    ctx, cancel := context.WithCancelCause(ctx)
+    stopped := make(chan struct{})
+
+    stack := stack{cancel, stopped, NewStackApplier(name, m.KubeClientFactory)}
+    stacks[name] = stack
 
     go func() {
+        defer close(stopped)
         log := m.log.WithField("stack", name)
-        for {
+
+        wait.UntilWithContext(ctx, func(ctx context.Context) {
             log.Info("Running stack")
-            if err := stack.Run(stackCtx); err != nil {
+            if err := stack.Run(ctx); err != nil {
                 log.WithError(err).Error("Failed to run stack")
             }
+        }, 1*time.Minute)
 
-            select {
-            case <-time.After(10 * time.Second):
-                continue
-            case <-stackCtx.Done():
-                log.Info("Stack done")
-                return
-            }
-        }
+        log.Infof("Stack done (%v)", context.Cause(ctx))
     }()
 }
 
-func (m *Manager) removeStack(ctx context.Context, name string) {
-    stack, ok := m.stacks[name]
+func (m *Manager) removeStack(ctx context.Context, stacks map[string]stack, name string) {
+    stack, ok := stacks[name]
     if !ok {
         m.log.
             WithField("path", name).
@@ -189,8 +197,9 @@ func (m *Manager) removeStack(ctx context.Context, name string) {
         return
     }
 
-    delete(m.stacks, name)
-    stack.CancelFunc()
+    delete(stacks, name)
+    stack.cancel(errors.New("stack removed"))
+    <-stack.stopped
 
     log := m.log.WithField("stack", name)
     if err := stack.DeleteStack(ctx); err != nil {
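Aside, not part of the patch: the heart of the manager.go change is the stop/restart wiring, a cancel-with-cause context paired with a stopped channel, with wait.UntilWithContext restarting the watch loop a minute after each return. A minimal, self-contained sketch of that pattern follows; runOnce is a hypothetical stand-in for Manager.runWatchers.

// Illustrative sketch only; mirrors the pattern used in Init/Stop above.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func runOnce(ctx context.Context) {
    // Stand-in for one pass of the watch loop; it returns when the context
    // is cancelled or when the loop gives up on an unrecoverable error.
    <-ctx.Done()
}

func main() {
    ctx, cancel := context.WithCancelCause(context.Background())
    stopped := make(chan struct{})

    // stop records why the loop is shutting down and blocks until the
    // goroutine below has fully exited.
    stop := func(reason string) { cancel(errors.New(reason)); <-stopped }

    go func() {
        defer close(stopped)
        // Re-run runOnce a minute after each return until the context is
        // cancelled, so a failed watch loop does not stay down.
        wait.UntilWithContext(ctx, runOnce, 1*time.Minute)
    }()

    stop("lost leadership")
    fmt.Println("stopped because:", context.Cause(ctx))
}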
diff --git a/pkg/applier/manager_test.go b/pkg/applier/manager_test.go
new file mode 100644
index 000000000000..2eb2b9799fb4
--- /dev/null
+++ b/pkg/applier/manager_test.go
@@ -0,0 +1,254 @@
+/*
+Copyright 2024 k0s authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package applier
+
+import (
+    "context"
+    "embed"
+    "os"
+    "path"
+    "path/filepath"
+    "sync"
+    "testing"
+    "time"
+
+    kubeutil "github.com/k0sproject/k0s/internal/testutil"
+    "github.com/k0sproject/k0s/pkg/config"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/util/wait"
+    yaml "sigs.k8s.io/yaml/goyaml.v2"
+)
+
+//go:embed testdata/manager_test/*
+var managerTestData embed.FS
+
+func TestManager(t *testing.T) {
+    ctx := context.Background()
+
+    dir := t.TempDir()
+
+    cfg := &config.CfgVars{
+        ManifestsDir: dir,
+    }
+
+    fakes := kubeutil.NewFakeClientFactory()
+
+    le := new(mockLeaderElector)
+
+    manager := &Manager{
+        K0sVars:           cfg,
+        KubeClientFactory: fakes,
+        LeaderElector:     le,
+    }
+
+    writeStack(t, dir, "testdata/manager_test/stack1")
+
+    err := manager.Init(ctx)
+    require.NoError(t, err)
+
+    err = manager.Start(ctx)
+    require.NoError(t, err)
+
+    le.activate()
+
+    // validate that the stack that already exists is applied
+
+    cmgv, _ := schema.ParseResourceArg("configmaps.v1.")
+    podgv, _ := schema.ParseResourceArg("pods.v1.")
+
+    waitForResource(t, fakes, *cmgv, "kube-system", "applier-test")
+    waitForResource(t, fakes, *podgv, "kube-system", "applier-test")
+
+    r, err := getResource(fakes, *cmgv, "kube-system", "applier-test")
+    if assert.NoError(t, err) {
+        assert.Equal(t, "applier", r.GetLabels()["component"])
+    }
+    r, err = getResource(fakes, *podgv, "kube-system", "applier-test")
+    if assert.NoError(t, err) {
+        assert.Equal(t, "Pod", r.GetKind())
+        assert.Equal(t, "applier", r.GetLabels()["component"])
+    }
+
+    // update the stack and verify the changes are applied
+
+    writeLabel(t, filepath.Join(dir, "stack1/pod.yaml"), "custom1", "test")
+
+    t.Log("waiting for pod to be updated")
+    waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
+        r, err := getResource(fakes, *podgv, "kube-system", "applier-test")
+        if err != nil {
+            return false, nil
+        }
+        return r.GetLabels()["custom1"] == "test", nil
+    })
+
+    // lose and re-acquire leadership
+    le.deactivate()
+    le.activate()
+
+    // validate that a newly added stack is applied
+
+    writeStack(t, dir, "testdata/manager_test/stack2")
+
+    deployGV, _ := schema.ParseResourceArg("deployments.v1.apps")
+
+    waitForResource(t, fakes, *deployGV, "kube-system", "nginx")
+
+    r, err = getResource(fakes, *deployGV, "kube-system", "nginx")
+    if assert.NoError(t, err) {
+        assert.Equal(t, "Deployment", r.GetKind())
+        assert.Equal(t, "applier", r.GetLabels()["component"])
+    }
+
+    // update the stack after the lease is re-acquired and verify the changes are applied
+
+    writeLabel(t, filepath.Join(dir, "stack1/pod.yaml"), "custom2", "test")
+
+    t.Log("waiting for pod to be updated")
+    waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
+        r, err := getResource(fakes, *podgv, "kube-system", "applier-test")
+        if err != nil {
+            return false, nil
+        }
+        return r.GetLabels()["custom2"] == "test", nil
+    })
+
+    // delete the stack and verify the resources are deleted
+
+    err = os.RemoveAll(filepath.Join(dir, "stack1"))
+    require.NoError(t, err)
+
+    t.Log("waiting for pod to be deleted")
+    waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
+        _, err := getResource(fakes, *podgv, "kube-system", "applier-test")
+        if errors.IsNotFound(err) {
+            return true, nil
+        }
+        return false, nil
+    })
+}
+
+func writeLabel(t *testing.T, file string, key string, value string) {
+    t.Helper()
+    contents, err := os.ReadFile(file)
+    require.NoError(t, err)
+    unst := map[interface{}]interface{}{}
+    err = yaml.Unmarshal(contents, &unst)
+    require.NoError(t, err)
+    unst["metadata"].(map[interface{}]interface{})["labels"].(map[interface{}]interface{})[key] = value
+    data, err := yaml.Marshal(unst)
+    require.NoError(t, err)
+    err = os.WriteFile(file, data, 0400)
+    require.NoError(t, err)
+}
+
+func waitForResource(t *testing.T, fakes *kubeutil.FakeClientFactory, gv schema.GroupVersionResource, namespace string, name string) {
+    t.Logf("waiting for resource %s/%s", gv.Resource, name)
+    waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
+        _, err := getResource(fakes, gv, namespace, name)
+        if errors.IsNotFound(err) {
+            return false, nil
+        } else if err != nil {
+            return false, err
+        }
+        return true, nil
+    })
+}
+
+func getResource(fakes *kubeutil.FakeClientFactory, gv schema.GroupVersionResource, namespace string, name string) (*unstructured.Unstructured, error) {
+    return fakes.DynamicClient.Resource(gv).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
+}
+
+func waitFor(t *testing.T, interval, timeout time.Duration, fn wait.ConditionWithContextFunc) {
+    t.Helper()
+    err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, fn)
+    require.NoError(t, err)
+}
+
+func writeStack(t *testing.T, dst string, src string) {
+    dstStackDir := filepath.Join(dst, path.Base(src))
+    err := os.MkdirAll(dstStackDir, 0755)
+    require.NoError(t, err)
+    entries, err := managerTestData.ReadDir(src)
+    require.NoError(t, err)
+    for _, entry := range entries {
+        data, err := managerTestData.ReadFile(path.Join(src, entry.Name()))
+        require.NoError(t, err)
+        dst := filepath.Join(dstStackDir, entry.Name())
+        t.Logf("writing file %s", dst)
+        err = os.WriteFile(dst, data, 0644)
+        require.NoError(t, err)
+    }
+}
+
+type mockLeaderElector struct {
+    mu       sync.Mutex
+    leader   bool
+    acquired []func()
+    lost     []func()
+}
+
+func (e *mockLeaderElector) activate() {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    if !e.leader {
+        e.leader = true
+        for _, fn := range e.acquired {
+            fn()
+        }
+    }
+}
+
+func (e *mockLeaderElector) deactivate() {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    if e.leader {
+        e.leader = false
+        for _, fn := range e.lost {
+            fn()
+        }
+    }
+}
+
+func (e *mockLeaderElector) IsLeader() bool {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    return e.leader
+}
+
+func (e *mockLeaderElector) AddAcquiredLeaseCallback(fn func()) {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    e.acquired = append(e.acquired, fn)
+    if e.leader {
+        fn()
+    }
+}
+
+func (e *mockLeaderElector) AddLostLeaseCallback(fn func()) {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    e.lost = append(e.lost, fn)
+    if e.leader {
+        fn()
+    }
+}
diff --git a/pkg/applier/testdata/manager_test/stack1/configmap.yaml b/pkg/applier/testdata/manager_test/stack1/configmap.yaml
new file mode 100644
index 000000000000..e8662b63e3c7
--- /dev/null
+++ b/pkg/applier/testdata/manager_test/stack1/configmap.yaml
@@ -0,0 +1,9 @@
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: applier-test
+  namespace: kube-system
+  labels:
+    component: applier
+data:
+  foo: bar
diff --git a/pkg/applier/testdata/manager_test/stack1/pod.yaml b/pkg/applier/testdata/manager_test/stack1/pod.yaml
new file mode 100644
index 000000000000..08957e15baf2
--- /dev/null
+++ b/pkg/applier/testdata/manager_test/stack1/pod.yaml
@@ -0,0 +1,11 @@
+kind: Pod
+apiVersion: v1
+metadata:
+  name: applier-test
+  namespace: kube-system
+  labels:
+    component: applier
+spec:
+  containers:
+  - name: nginx
+    image: nginx:1.15
diff --git a/pkg/applier/testdata/manager_test/stack2/deploy.yaml b/pkg/applier/testdata/manager_test/stack2/deploy.yaml
new file mode 100644
index 000000000000..48eabe53140f
--- /dev/null
+++ b/pkg/applier/testdata/manager_test/stack2/deploy.yaml
@@ -0,0 +1,25 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: nginx
+  namespace: kube-system
+  labels:
+    component: applier
+spec:
+  selector:
+    matchLabels:
+      app: nginx
+  template:
+    metadata:
+      labels:
+        app: nginx
+    spec:
+      containers:
+      - name: nginx
+        image: docker.io/nginx:1-alpine
+        resources:
+          limits:
+            memory: "64Mi"
+            cpu: "100m"
+        ports:
+        - containerPort: 80
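Aside, not part of the patch: the watch loop in manager.go reacts to fsnotify events on the manifests directory, treating a created subdirectory as a new stack (such as the testdata stacks above) and a removal as a deleted stack. A minimal sketch of that event handling follows; the watched path and the os.Stat check are stand-ins for m.bundleDir and dir.IsDirectory.

// Illustrative sketch only; not the manager's actual implementation.
package main

import (
    "log"
    "os"

    "github.com/fsnotify/fsnotify"
)

func main() {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        log.Fatal(err)
    }
    defer watcher.Close()

    // Watch the manifests bundle directory (example path).
    if err := watcher.Add("/var/lib/k0s/manifests"); err != nil {
        log.Fatal(err)
    }

    for {
        select {
        case err := <-watcher.Errors:
            log.Println("watch error:", err)
        case event := <-watcher.Events:
            switch event.Op {
            case fsnotify.Create:
                // A newly created subdirectory corresponds to a new manifest stack.
                if info, err := os.Stat(event.Name); err == nil && info.IsDir() {
                    log.Println("stack created:", event.Name)
                }
            case fsnotify.Remove:
                log.Println("stack removed:", event.Name)
            }
        }
    }
}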