From d3ed5fa8312fe2fa0b32a7cf1e34482604b35dd4 Mon Sep 17 00:00:00 2001 From: Alexey Makhov Date: Mon, 23 Dec 2024 11:24:25 +0200 Subject: [PATCH] Updating control nodes one by one. WIP Signed-off-by: Alexey Makhov --- .../k0s_controlplane_controller.go | 169 +++++++++++------- inttest/Makefile | 4 +- ...api_docker_machine_change_template_test.go | 75 ++++---- ...r_machine_template_update_recreate_test.go | 53 +++--- 4 files changed, 163 insertions(+), 138 deletions(-) diff --git a/internal/controller/controlplane/k0s_controlplane_controller.go b/internal/controller/controlplane/k0s_controlplane_controller.go index dad25e934..1a0836572 100644 --- a/internal/controller/controlplane/k0s_controlplane_controller.go +++ b/internal/controller/controlplane/k0s_controlplane_controller.go @@ -314,15 +314,18 @@ func (c *K0sController) reconcileMachines(ctx context.Context, cluster *clusterv desiredMachineNames := make(map[string]bool) var clusterIsUpdating bool + var clusterIsMutating bool for _, m := range machines.SortedByCreationTimestamp() { if m.Spec.Version == nil || (!versionMatches(m, kcp.Spec.Version)) { clusterIsUpdating = true + clusterIsMutating = true if kcp.Spec.UpdateStrategy == cpv1beta1.UpdateInPlace { desiredMachineNames[m.Name] = true } else { machineNamesToDelete[m.Name] = true } } else if !matchesTemplateClonedFrom(infraMachines, kcp, m) { + clusterIsMutating = true machineNamesToDelete[m.Name] = true } else if machines.Len() > int(kcp.Spec.Replicas)+len(machineNamesToDelete) { machineNamesToDelete[m.Name] = true @@ -356,94 +359,106 @@ func (c *K0sController) reconcileMachines(ctx context.Context, cluster *clusterv } } - i := 0 - for len(desiredMachineNames) < int(kcp.Spec.Replicas) { - name := machineName(kcp.Name, i) - log.Log.Info("desire machine", "name", len(desiredMachineNames)) - _, ok := machineNamesToDelete[name] - if !ok { - _, exists := machines[name] - desiredMachineNames[name] = exists + if len(machineNamesToDelete)+len(desiredMachineNames) > 
int(kcp.Spec.Replicas) { + //for m := range machines { + // if machineNamesToDelete[m] { + // continue + // } + // + // err := c.checkMachineIsReady(ctx, m, cluster) + // if err != nil { + // logger.Error(err, "Error checking machine left", "machine", m) + // return err + // } + //} + m := machines.Newest().Name + err := c.checkMachineIsReady(ctx, m, cluster) + if err != nil { + logger.Error(err, "Error checking machine left", "machine", m) + return err } - i++ - } - log.Log.Info("Desired machines", "count", len(desiredMachineNames)) - - for name, exists := range desiredMachineNames { - if !exists || kcp.Spec.UpdateStrategy == cpv1beta1.UpdateInPlace { - - // Wait for the previous machine to be created to avoid etcd issues if cluster if updating - // OR - // Wait for the first controller to start before creating the next one - // Some providers don't publish failure domains immediately, so wait for the first machine to be ready - // It's not slowing down the process overall, as we wait to the first machine anyway to create join tokens - if clusterIsUpdating || (machines.Len() == 1 && kcp.Spec.Replicas > 1) { - err := c.checkMachineIsReady(ctx, machines.Newest().Name, cluster) - if err != nil { - return err - } - } - - machineFromTemplate, err := c.createMachineFromTemplate(ctx, name, cluster, kcp) - if err != nil { - return fmt.Errorf("error creating machine from template: %w", err) - } - infraRef := corev1.ObjectReference{ - APIVersion: machineFromTemplate.GetAPIVersion(), - Kind: machineFromTemplate.GetKind(), - Name: machineFromTemplate.GetName(), - Namespace: kcp.Namespace, - } + logger.Info("Found machines to delete", "count", len(machineNamesToDelete)) - selectedFailureDomain := failuredomains.PickFewest(ctx, cluster.Status.FailureDomains.FilterControlPlane(), machines) - machine, err := c.createMachine(ctx, name, cluster, kcp, infraRef, selectedFailureDomain) - if err != nil { - return fmt.Errorf("error creating machine: %w", err) - } - 
machines[machine.Name] = machine

+		// Remove the oldest machine and wait for the machine to be deleted to avoid etcd issues
+		machineToDelete := machines.Filter(func(m *clusterv1.Machine) bool {
+			return machineNamesToDelete[m.Name]
+		}).Oldest()
+		logger.Info("Found oldest machine to delete", "machine", machineToDelete.Name)
+		if machineToDelete.Status.Phase == string(clusterv1.MachinePhaseDeleting) {
+			logger.Info("Machine is being deleted, waiting for it to be deleted", "machine", machineToDelete.Name)
+			return fmt.Errorf("waiting for previous machine to be deleted")
 		}

-		err = c.createBootstrapConfig(ctx, name, cluster, kcp, machines[name])
+		err = c.runMachineDeletionSequence(ctx, logger, cluster, kcp, machineToDelete)
 		if err != nil {
-			return fmt.Errorf("error creating bootstrap config: %w", err)
+			return err
 		}
+
+		logger.Info("Deleted machine", "machine", machineToDelete.Name)
 	}

-	if len(machineNamesToDelete) > 0 {
-		for m := range machines {
-			if machineNamesToDelete[m] {
-				continue
-			}
+	//i := 0
+	if len(desiredMachineNames) < int(kcp.Spec.Replicas) {
+		//name := names.SimpleNameGenerator.GenerateName(kcp.Name + "-")

-			err := c.checkMachineIsReady(ctx, m, cluster)
+		name := machineName(kcp, machineNamesToDelete, desiredMachineNames)
+		log.Log.Info("desire machine", "name", len(desiredMachineNames))
+		//_, ok := machineNamesToDelete[name]
+		//if !ok {
+		//	_, exists := machines[name]
+		//	desiredMachineNames[name] = exists
+		//}
+		//i++
+		//}
+		//log.Log.Info("Desired machines", "count", len(desiredMachineNames))
+		//
+		//for name, exists := range desiredMachineNames {
+		//	if !exists || kcp.Spec.UpdateStrategy == cpv1beta1.UpdateInPlace {
+
+		// Wait for the previous machine to be created to avoid etcd issues if cluster is updating
+		// OR
+		// Wait for the first controller to start before creating the next one
+		// Some providers don't publish failure domains immediately, so wait for the first machine to be ready
+		// It's not slowing down the process 
overall, as we wait to the first machine anyway to create join tokens + if clusterIsMutating || (machines.Len() == 1 && kcp.Spec.Replicas > 1) { + err := c.checkMachineIsReady(ctx, machines.Newest().Name, cluster) if err != nil { - logger.Error(err, "Error checking machine left", "machine", m) return err } } - } - if len(machineNamesToDelete) > 0 { - logger.Info("Found machines to delete", "count", len(machineNamesToDelete)) + machineFromTemplate, err := c.createMachineFromTemplate(ctx, name, cluster, kcp) + if err != nil { + return fmt.Errorf("error creating machine from template: %w", err) + } - // Remove the oldest machine abd wait for the machine to be deleted to avoid etcd issues - machineToDelete := machines.Filter(func(m *clusterv1.Machine) bool { - return machineNamesToDelete[m.Name] - }).Oldest() - logger.Info("Found oldest machine to delete", "machine", machineToDelete.Name) - if machineToDelete.Status.Phase == string(clusterv1.MachinePhaseDeleting) { - logger.Info("Machine is being deleted, waiting for it to be deleted", "machine", machineToDelete.Name) - return fmt.Errorf("waiting for previous machine to be deleted") + infraRef := corev1.ObjectReference{ + APIVersion: machineFromTemplate.GetAPIVersion(), + Kind: machineFromTemplate.GetKind(), + Name: machineFromTemplate.GetName(), + Namespace: kcp.Namespace, } - err := c.runMachineDeletionSequence(ctx, logger, cluster, kcp, machineToDelete) + selectedFailureDomain := failuredomains.PickFewest(ctx, cluster.Status.FailureDomains.FilterControlPlane(), machines) + machine, err := c.createMachine(ctx, name, cluster, kcp, infraRef, selectedFailureDomain) if err != nil { - return err + return fmt.Errorf("error creating machine: %w", err) } + machines[machine.Name] = machine + desiredMachineNames[machine.Name] = true + //} - logger.Info("Deleted machine", "machine", machineToDelete.Name) + err = c.createBootstrapConfig(ctx, name, cluster, kcp, machines[name]) + if err != nil { + return fmt.Errorf("error 
creating bootstrap config: %w", err) + } } + + if len(desiredMachineNames) < int(kcp.Spec.Replicas) { + return ErrNewMachinesNotReady + } + return nil } @@ -833,8 +848,26 @@ func (c *K0sController) createFRPToken(ctx context.Context, cluster *clusterv1.C }) } -func machineName(base string, i int) string { - return fmt.Sprintf("%s-%d", base, i) +func machineName(kcp *cpv1beta1.K0sControlPlane, machineToDelete, desiredMachines map[string]bool) string { + if len(machineToDelete) == 0 { + return fmt.Sprintf("%s-%d", kcp.Name, len(desiredMachines)) + } + + f := false + for i := 0; i < int(kcp.Spec.Replicas); i++ { + name := fmt.Sprintf("%s-%d", kcp.Name, i) + _, ok := machineToDelete[name] + if ok { + f = true + } + } + + if f { + return fmt.Sprintf("%s-%d", kcp.Name, len(desiredMachines)+int(kcp.Spec.Replicas)) + } + return fmt.Sprintf("%s-%d", kcp.Name, len(desiredMachines)) + + //return fmt.Sprintf("%s-0", kcp.Name) } // SetupWithManager sets up the controller with the Manager. diff --git a/inttest/Makefile b/inttest/Makefile index 126a26f6e..331fd70c3 100644 --- a/inttest/Makefile +++ b/inttest/Makefile @@ -47,7 +47,7 @@ check-capi-remote-machine: TIMEOUT=12m check-capi-remote-machine-template: TIMEOUT=12m check-capi-remote-machine-template-update: TIMEOUT=10m check-capi-docker-machine-template-update: TIMEOUT=15m -check-capi-docker-machine-template-update-recreate: TIMEOUT=15m -check-capi-docker-machine-change-template: TIMEOUT=15m +check-capi-docker-machine-template-update-recreate: TIMEOUT=25m +check-capi-docker-machine-change-template: TIMEOUT=20m check-capi-remote-machine-job-provision: TIMEOUT=10m check-upgrade: TIMEOUT=20m diff --git a/inttest/capi-docker-machine-change-template/capi_docker_machine_change_template_test.go b/inttest/capi-docker-machine-change-template/capi_docker_machine_change_template_test.go index fdb7f4d4e..f47e2979f 100644 --- a/inttest/capi-docker-machine-change-template/capi_docker_machine_change_template_test.go +++ 
b/inttest/capi-docker-machine-change-template/capi_docker_machine_change_template_test.go @@ -132,9 +132,12 @@ func (s *CAPIDockerMachineChangeTemplate) TestCAPIControlPlaneDockerDownScaling( }) s.Require().NoError(err) - for i := 0; i < 3; i++ { + nodes, err := util.GetControlPlaneNodesIDs("docker-test-") + s.Require().NoError(err) + + for _, node := range nodes { err = wait.PollUntilContextCancel(s.ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) { - output, err := exec.Command("docker", "exec", fmt.Sprintf("docker-test-%d", i), "k0s", "status").Output() + output, err := exec.Command("docker", "exec", node, "k0s", "status").Output() if err != nil { return false, nil } @@ -151,66 +154,57 @@ func (s *CAPIDockerMachineChangeTemplate) TestCAPIControlPlaneDockerDownScaling( s.updateClusterObjects() err = wait.PollUntilContextCancel(s.ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { - newNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-") + var obj unstructured.UnstructuredList + err := s.client.RESTClient(). + Get(). + AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines"). + Do(s.ctx). + Into(&obj) if err != nil { return false, nil } - return len(newNodeIDs) == 6, nil - }) - s.Require().NoError(err) - - err = wait.PollUntilContextCancel(s.ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) { - veryNewNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-") - if err != nil { - return false, nil + for _, item := range obj.Items { + if strings.Contains(item.GetName(), "worker") { + continue + } + if item.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation] != "docker-test-cp-template-new" { + return false, nil + } } - return len(veryNewNodeIDs) == 3, nil + return true, nil }) s.Require().NoError(err) - var obj unstructured.Unstructured - err = s.client.RESTClient(). - Get(). 
- AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines/docker-test-4"). - Do(s.ctx). - Into(&obj) - s.Require().NoError(err) - s.Require().Equal("docker-test-cp-template-new", obj.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation]) - - time.Sleep(time.Minute) s.T().Log("updating cluster objects again") s.updateClusterObjectsAgain() err = wait.PollUntilContextCancel(s.ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { - newNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-") + var obj unstructured.UnstructuredList + err := s.client.RESTClient(). + Get(). + AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines"). + Do(s.ctx). + Into(&obj) if err != nil { return false, nil } - return len(newNodeIDs) == 6, nil - }) - s.Require().NoError(err) + for _, item := range obj.Items { + if strings.Contains(item.GetName(), "worker") { + continue + } - err = wait.PollUntilContextCancel(s.ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) { - veryNewNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-") - if err != nil { - return false, nil + if item.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation] != "docker-test-cp-template-new-2" { + return false, nil + } } - return len(veryNewNodeIDs) == 3, nil + return true, nil }) s.Require().NoError(err) - err = s.client.RESTClient(). - Get(). - AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines/docker-test-2"). - Do(s.ctx). - Into(&obj) - s.Require().NoError(err) - s.Require().Equal("docker-test-cp-template-new-2", obj.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation]) - err = wait.PollUntilContextCancel(s.ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) { b, _ := s.client.RESTClient(). Get(). 
@@ -242,7 +236,8 @@ func (s *CAPIDockerMachineChangeTemplate) updateClusterObjectsAgain() { func (s *CAPIDockerMachineChangeTemplate) deleteCluster() { // Exec via kubectl - out, err := exec.Command("kubectl", "delete", "-f", s.clusterYamlsPath).CombinedOutput() + out, err := exec.Command("kubectl", "delete", "cluster", "docker-test").CombinedOutput() + //out, err := exec.Command("kubectl", "delete", "-f", s.clusterYamlsPath).CombinedOutput() s.Require().NoError(err, "failed to delete cluster objects: %s", string(out)) } diff --git a/inttest/capi-docker-machine-template-update-recreate/capi_docker_machine_template_update_recreate_test.go b/inttest/capi-docker-machine-template-update-recreate/capi_docker_machine_template_update_recreate_test.go index 9cfba00a4..4d2efcbe0 100644 --- a/inttest/capi-docker-machine-template-update-recreate/capi_docker_machine_template_update_recreate_test.go +++ b/inttest/capi-docker-machine-template-update-recreate/capi_docker_machine_template_update_recreate_test.go @@ -19,6 +19,7 @@ package capidockermachinetemplateupdaterecreate import ( "context" "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "os" "os/exec" "strconv" @@ -150,36 +151,32 @@ func (s *CAPIDockerMachineTemplateUpdateRecreate) TestCAPIControlPlaneDockerDown s.T().Log("updating cluster objects") s.updateClusterObjects() - // nolint:staticcheck - err = wait.PollImmediateUntilWithContext(s.ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) { - newNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-") - if err != nil { - return false, nil - } - - return len(newNodeIDs) == 6, nil - }) - s.Require().NoError(err) - - // nolint:staticcheck - err = wait.PollImmediateUntilWithContext(s.ctx, 1*time.Second, func(ctx context.Context) (bool, error) { - veryNewNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-") + err = wait.PollUntilContextCancel(s.ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { + var obj 
unstructured.UnstructuredList + err := s.client.RESTClient(). + Get(). + AbsPath("/apis/cluster.x-k8s.io/v1beta1/namespaces/default/machines"). + Do(s.ctx). + Into(&obj) if err != nil { return false, nil } - return len(veryNewNodeIDs) == 3, nil - }) - s.Require().NoError(err) + for _, item := range obj.Items { + if strings.Contains(item.GetName(), "worker") { + continue + } - // nolint:staticcheck - err = wait.PollImmediateUntilWithContext(s.ctx, 1*time.Second, func(ctx context.Context) (bool, error) { - output, err := exec.Command("docker", "exec", "docker-test-4", "k0s", "status").CombinedOutput() - if err != nil { - return false, nil + v, _, err := unstructured.NestedString(item.Object, "spec", "version") + if err != nil { + return false, nil + } + if v != "v1.31.2+k0s.0" { + return false, nil + } } - return strings.Contains(string(output), "Version: v1.28"), nil + return true, nil }) s.Require().NoError(err) } @@ -198,7 +195,7 @@ func (s *CAPIDockerMachineTemplateUpdateRecreate) updateClusterObjects() { func (s *CAPIDockerMachineTemplateUpdateRecreate) deleteCluster() { // Exec via kubectl - out, err := exec.Command("kubectl", "delete", "-f", s.clusterYamlsPath).CombinedOutput() + out, err := exec.Command("kubectl", "delete", "cluster", "docker-test").CombinedOutput() s.Require().NoError(err, "failed to delete cluster objects: %s", string(out)) } @@ -257,7 +254,7 @@ metadata: name: docker-test spec: replicas: 3 - version: v1.27.1+k0s.0 + version: v1.30.3+k0s.0 updateStrategy: Recreate k0sConfigSpec: k0s: @@ -291,7 +288,7 @@ metadata: name: docker-test-worker-0 namespace: default spec: - version: v1.27.1 + version: v1.30.3 clusterName: docker-test bootstrap: configRef: @@ -310,7 +307,7 @@ metadata: namespace: default spec: # version is deliberately different to be able to verify we actually pick it up :) - version: v1.27.1+k0s.0 + version: v1.30.3+k0s.0 --- apiVersion: infrastructure.cluster.x-k8s.io/v1beta1 kind: DockerMachine @@ -328,7 +325,7 @@ metadata: 
name: docker-test spec: replicas: 3 - version: v1.28.7+k0s.0 + version: v1.31.2+k0s.0 updateStrategy: Recreate k0sConfigSpec: k0s: