
Updating control nodes one by one. WIP
Signed-off-by: Alexey Makhov <[email protected]>
makhov committed Dec 23, 2024
1 parent c962a22 commit 2553319
Showing 4 changed files with 163 additions and 138 deletions.
169 changes: 101 additions & 68 deletions internal/controller/controlplane/k0s_controlplane_controller.go
@@ -314,15 +314,18 @@ func (c *K0sController) reconcileMachines(ctx context.Context, cluster *clusterv
desiredMachineNames := make(map[string]bool)

var clusterIsUpdating bool
var clusterIsMutating bool
for _, m := range machines.SortedByCreationTimestamp() {
if m.Spec.Version == nil || (!versionMatches(m, kcp.Spec.Version)) {
clusterIsUpdating = true
clusterIsMutating = true
if kcp.Spec.UpdateStrategy == cpv1beta1.UpdateInPlace {
desiredMachineNames[m.Name] = true
} else {
machineNamesToDelete[m.Name] = true
}
} else if !matchesTemplateClonedFrom(infraMachines, kcp, m) {
clusterIsMutating = true
machineNamesToDelete[m.Name] = true
} else if machines.Len() > int(kcp.Spec.Replicas)+len(machineNamesToDelete) {
machineNamesToDelete[m.Name] = true
@@ -356,94 +359,106 @@ func (c *K0sController) reconcileMachines(ctx context.Context, cluster *clusterv
}
}

i := 0
for len(desiredMachineNames) < int(kcp.Spec.Replicas) {
name := machineName(kcp.Name, i)
log.Log.Info("desire machine", "name", len(desiredMachineNames))
_, ok := machineNamesToDelete[name]
if !ok {
_, exists := machines[name]
desiredMachineNames[name] = exists
if len(machineNamesToDelete)+len(desiredMachineNames) > int(kcp.Spec.Replicas) {
//for m := range machines {
// if machineNamesToDelete[m] {
// continue
// }
//
// err := c.checkMachineIsReady(ctx, m, cluster)
// if err != nil {
// logger.Error(err, "Error checking machine left", "machine", m)
// return err
// }
//}
m := machines.Newest().Name
err := c.checkMachineIsReady(ctx, m, cluster)
if err != nil {
logger.Error(err, "Error checking machine left", "machine", m)
return err
}
i++
}
log.Log.Info("Desired machines", "count", len(desiredMachineNames))

for name, exists := range desiredMachineNames {
if !exists || kcp.Spec.UpdateStrategy == cpv1beta1.UpdateInPlace {

// Wait for the previous machine to be created to avoid etcd issues if the cluster is updating
// OR
// Wait for the first controller to start before creating the next one
// Some providers don't publish failure domains immediately, so wait for the first machine to be ready
// It's not slowing down the process overall, as we wait for the first machine anyway to create join tokens
if clusterIsUpdating || (machines.Len() == 1 && kcp.Spec.Replicas > 1) {
err := c.checkMachineIsReady(ctx, machines.Newest().Name, cluster)
if err != nil {
return err
}
}

machineFromTemplate, err := c.createMachineFromTemplate(ctx, name, cluster, kcp)
if err != nil {
return fmt.Errorf("error creating machine from template: %w", err)
}

infraRef := corev1.ObjectReference{
APIVersion: machineFromTemplate.GetAPIVersion(),
Kind: machineFromTemplate.GetKind(),
Name: machineFromTemplate.GetName(),
Namespace: kcp.Namespace,
}
logger.Info("Found machines to delete", "count", len(machineNamesToDelete))

selectedFailureDomain := failuredomains.PickFewest(ctx, cluster.Status.FailureDomains.FilterControlPlane(), machines)
machine, err := c.createMachine(ctx, name, cluster, kcp, infraRef, selectedFailureDomain)
if err != nil {
return fmt.Errorf("error creating machine: %w", err)
}
machines[machine.Name] = machine
// Remove the oldest machine and wait for the machine to be deleted to avoid etcd issues
machineToDelete := machines.Filter(func(m *clusterv1.Machine) bool {
return machineNamesToDelete[m.Name]
}).Oldest()
logger.Info("Found oldest machine to delete", "machine", machineToDelete.Name)
if machineToDelete.Status.Phase == string(clusterv1.MachinePhaseDeleting) {
logger.Info("Machine is being deleted, waiting for it to be deleted", "machine", machineToDelete.Name)
return fmt.Errorf("waiting for previous machine to be deleted")
}

err = c.createBootstrapConfig(ctx, name, cluster, kcp, machines[name])
err = c.runMachineDeletionSequence(ctx, logger, cluster, kcp, machineToDelete)
if err != nil {
return fmt.Errorf("error creating bootstrap config: %w", err)
return err
}

logger.Info("Deleted machine", "machine", machineToDelete.Name)
}

if len(machineNamesToDelete) > 0 {
for m := range machines {
if machineNamesToDelete[m] {
continue
}
//i := 0
if len(desiredMachineNames) < int(kcp.Spec.Replicas) {
//name := names.SimpleNameGenerator.GenerateName(kcp.Name + "-")

err := c.checkMachineIsReady(ctx, m, cluster)
name := machineName(kcp, machineNamesToDelete, desiredMachineNames)
log.Log.Info("desire machine", "name", len(desiredMachineNames))
_, ok := machineNamesToDelete[name]
if !ok {
_, exists := machines[name]
desiredMachineNames[name] = exists
}
//i++
//}
//log.Log.Info("Desired machines", "count", len(desiredMachineNames))
//
//for name, exists := range desiredMachineNames {
// if !exists || kcp.Spec.UpdateStrategy == cpv1beta1.UpdateInPlace {

// Wait for the previous machine to be created to avoid etcd issues if the cluster is updating
// OR
// Wait for the first controller to start before creating the next one
// Some providers don't publish failure domains immediately, so wait for the first machine to be ready
// It's not slowing down the process overall, as we wait for the first machine anyway to create join tokens
if clusterIsMutating || (machines.Len() == 1 && kcp.Spec.Replicas > 1) {
err := c.checkMachineIsReady(ctx, machines.Newest().Name, cluster)
if err != nil {
logger.Error(err, "Error checking machine left", "machine", m)
return err
}
}
}

if len(machineNamesToDelete) > 0 {
logger.Info("Found machines to delete", "count", len(machineNamesToDelete))
machineFromTemplate, err := c.createMachineFromTemplate(ctx, name, cluster, kcp)
if err != nil {
return fmt.Errorf("error creating machine from template: %w", err)
}

// Remove the oldest machine and wait for the machine to be deleted to avoid etcd issues
machineToDelete := machines.Filter(func(m *clusterv1.Machine) bool {
return machineNamesToDelete[m.Name]
}).Oldest()
logger.Info("Found oldest machine to delete", "machine", machineToDelete.Name)
if machineToDelete.Status.Phase == string(clusterv1.MachinePhaseDeleting) {
logger.Info("Machine is being deleted, waiting for it to be deleted", "machine", machineToDelete.Name)
return fmt.Errorf("waiting for previous machine to be deleted")
infraRef := corev1.ObjectReference{
APIVersion: machineFromTemplate.GetAPIVersion(),
Kind: machineFromTemplate.GetKind(),
Name: machineFromTemplate.GetName(),
Namespace: kcp.Namespace,
}

err := c.runMachineDeletionSequence(ctx, logger, cluster, kcp, machineToDelete)
selectedFailureDomain := failuredomains.PickFewest(ctx, cluster.Status.FailureDomains.FilterControlPlane(), machines)
machine, err := c.createMachine(ctx, name, cluster, kcp, infraRef, selectedFailureDomain)
if err != nil {
return err
return fmt.Errorf("error creating machine: %w", err)
}
machines[machine.Name] = machine
desiredMachineNames[machine.Name] = true
//}

logger.Info("Deleted machine", "machine", machineToDelete.Name)
err = c.createBootstrapConfig(ctx, name, cluster, kcp, machines[name])
if err != nil {
return fmt.Errorf("error creating bootstrap config: %w", err)
}
}

if len(desiredMachineNames) < int(kcp.Spec.Replicas) {
return ErrNewMachinesNotReady
}

return nil
}

@@ -833,8 +848,26 @@ func (c *K0sController) createFRPToken(ctx context.Context, cluster *clusterv1.C
})
}

func machineName(base string, i int) string {
return fmt.Sprintf("%s-%d", base, i)
func machineName(kcp *cpv1beta1.K0sControlPlane, machineToDelete, desiredMachines map[string]bool) string {
if len(machineToDelete) == 0 {
return fmt.Sprintf("%s-%d", kcp.Name, len(desiredMachines))
}

f := false
for i := 0; i < int(kcp.Spec.Replicas); i++ {
name := fmt.Sprintf("%s-%d", kcp.Name, i)
_, ok := machineToDelete[name]
if ok {
f = true
}
}

if f {
return fmt.Sprintf("%s-%d", kcp.Name, len(desiredMachines)+int(kcp.Spec.Replicas))
}
return fmt.Sprintf("%s-%d", kcp.Name, len(desiredMachines))

//return fmt.Sprintf("%s-0", kcp.Name)
}

// SetupWithManager sets up the controller with the Manager.
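For readers skimming the diff above, the net effect of this change is that control-plane machines are now replaced one at a time: an outdated machine is only removed once its replacement exists and the newest machine is ready, and at most one new machine is created per reconciliation, with the controller requeueing in between. The standalone Go sketch below illustrates that ordering only; the machine type, the reconcileOnce helper, and the cp-N names are hypothetical stand-ins, not the k0smotron controller's actual API.

    package main

    import (
    	"errors"
    	"fmt"
    )

    // Hypothetical, simplified stand-ins for the real CAPI Machine objects; none
    // of the identifiers below come from the k0smotron code base.
    type machine struct {
    	name  string
    	ready bool
    }

    var errRequeue = errors.New("waiting for the control plane to converge")

    // reconcileOnce mirrors the shape of the updated reconcileMachines loop: per
    // pass it removes at most one outdated machine and creates at most one
    // replacement, so control nodes are swapped one by one across reconciliations.
    func reconcileOnce(current []*machine, outdated map[string]bool, replicas int) ([]*machine, error) {
    	desired := len(current) - len(outdated) // machines already matching the spec

    	// Remove one outdated machine, but only once a replacement exists
    	// (total exceeds the replica count) and the newest machine is ready,
    	// so etcd never loses more than one member at a time.
    	if len(outdated)+desired > replicas {
    		if newest := current[len(current)-1]; !newest.ready {
    			return current, errRequeue
    		}
    		for i, m := range current { // current is ordered oldest first
    			if outdated[m.name] {
    				current = append(current[:i], current[i+1:]...)
    				delete(outdated, m.name)
    				break
    			}
    		}
    	}

    	// Create at most one replacement if the desired set is still short.
    	if desired < replicas {
    		// While old index-based names are pending deletion, new names are
    		// offset past the replica range (compare machineName in the diff).
    		current = append(current, &machine{name: fmt.Sprintf("cp-%d", desired+replicas), ready: true})
    		return current, errRequeue
    	}

    	if len(outdated) > 0 {
    		return current, errRequeue
    	}
    	return current, nil // converged
    }

    func main() {
    	machines := []*machine{{"cp-0", true}, {"cp-1", true}, {"cp-2", true}}
    	outdated := map[string]bool{"cp-0": true, "cp-1": true, "cp-2": true}

    	for pass := 1; ; pass++ {
    		var err error
    		machines, err = reconcileOnce(machines, outdated, 3)
    		names := make([]string, 0, len(machines))
    		for _, m := range machines {
    			names = append(names, m.name)
    		}
    		fmt.Printf("pass %d: %v\n", pass, names)
    		if err == nil {
    			break
    		}
    	}
    }

Running the sketch prints one line per reconciliation (create cp-3, then swap cp-0 for cp-4, and so on), which also shows why the new machineName offsets fresh names past the replica range while the old index-based names are still pending deletion.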
4 changes: 2 additions & 2 deletions inttest/Makefile
@@ -47,7 +47,7 @@ check-capi-remote-machine: TIMEOUT=12m
check-capi-remote-machine-template: TIMEOUT=12m
check-capi-remote-machine-template-update: TIMEOUT=10m
check-capi-docker-machine-template-update: TIMEOUT=15m
check-capi-docker-machine-template-update-recreate: TIMEOUT=15m
check-capi-docker-machine-change-template: TIMEOUT=15m
check-capi-docker-machine-template-update-recreate: TIMEOUT=20m
check-capi-docker-machine-change-template: TIMEOUT=20m
check-capi-remote-machine-job-provision: TIMEOUT=10m
check-upgrade: TIMEOUT=20m
@@ -132,9 +132,12 @@ func (s *CAPIDockerMachineChangeTemplate) TestCAPIControlPlaneDockerDownScaling(
})
s.Require().NoError(err)

for i := 0; i < 3; i++ {
nodes, err := util.GetControlPlaneNodesIDs("docker-test-")
s.Require().NoError(err)

for _, node := range nodes {
err = wait.PollUntilContextCancel(s.ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
output, err := exec.Command("docker", "exec", fmt.Sprintf("docker-test-%d", i), "k0s", "status").Output()
output, err := exec.Command("docker", "exec", node, "k0s", "status").Output()
if err != nil {
return false, nil
}
@@ -151,66 +154,57 @@ func (s *CAPIDockerMachineChangeTemplate) TestCAPIControlPlaneDockerDownScaling(
s.updateClusterObjects()

err = wait.PollUntilContextCancel(s.ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
newNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-")
var obj unstructured.UnstructuredList
err := s.client.RESTClient().
Get().
AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines").
Do(s.ctx).
Into(&obj)
if err != nil {
return false, nil
}

return len(newNodeIDs) == 6, nil
})
s.Require().NoError(err)

err = wait.PollUntilContextCancel(s.ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
veryNewNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-")
if err != nil {
return false, nil
for _, item := range obj.Items {
if strings.Contains(item.GetName(), "worker") {
continue
}
if item.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation] != "docker-test-cp-template-new" {
return false, nil
}
}

return len(veryNewNodeIDs) == 3, nil
return true, nil
})
s.Require().NoError(err)

var obj unstructured.Unstructured
err = s.client.RESTClient().
Get().
AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines/docker-test-4").
Do(s.ctx).
Into(&obj)
s.Require().NoError(err)
s.Require().Equal("docker-test-cp-template-new", obj.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation])

time.Sleep(time.Minute)
s.T().Log("updating cluster objects again")
s.updateClusterObjectsAgain()

err = wait.PollUntilContextCancel(s.ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
newNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-")
var obj unstructured.UnstructuredList
err := s.client.RESTClient().
Get().
AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines").
Do(s.ctx).
Into(&obj)
if err != nil {
return false, nil
}

return len(newNodeIDs) == 6, nil
})
s.Require().NoError(err)
for _, item := range obj.Items {
if strings.Contains(item.GetName(), "worker") {
continue
}

err = wait.PollUntilContextCancel(s.ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
veryNewNodeIDs, err := util.GetControlPlaneNodesIDs("docker-test-")
if err != nil {
return false, nil
if item.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation] != "docker-test-cp-template-new-2" {
return false, nil
}
}

return len(veryNewNodeIDs) == 3, nil
return true, nil
})
s.Require().NoError(err)

err = s.client.RESTClient().
Get().
AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines/docker-test-2").
Do(s.ctx).
Into(&obj)
s.Require().NoError(err)
s.Require().Equal("docker-test-cp-template-new-2", obj.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation])

err = wait.PollUntilContextCancel(s.ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
b, _ := s.client.RESTClient().
Get().
@@ -242,7 +236,8 @@ func (s *CAPIDockerMachineChangeTemplate) updateClusterObjectsAgain() {

func (s *CAPIDockerMachineChangeTemplate) deleteCluster() {
// Exec via kubectl
out, err := exec.Command("kubectl", "delete", "-f", s.clusterYamlsPath).CombinedOutput()
out, err := exec.Command("kubectl", "delete", "cluster", "docker-test").CombinedOutput()
//out, err := exec.Command("kubectl", "delete", "-f", s.clusterYamlsPath).CombinedOutput()
s.Require().NoError(err, "failed to delete cluster objects: %s", string(out))
}

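The reworked test assertions no longer poke fixed container names such as docker-test-4; instead they poll until every non-worker DockerMachine reports that it was cloned from the expected template. A rough standalone version of that polling check is sketched below. The REST path and the literal annotation key (written out here in place of clusterv1.TemplateClonedFromNameAnnotation) follow the test above; the kubeconfig-based client setup and the helper name waitForControlPlaneTemplate are assumptions added only to make the sketch self-contained.

    package main

    import (
    	"context"
    	"strings"
    	"time"

    	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    	"k8s.io/apimachinery/pkg/util/wait"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/clientcmd"
    )

    // Cluster API "cloned from" marker checked by the test.
    const clonedFromNameAnnotation = "cluster.x-k8s.io/cloned-from-name"

    // waitForControlPlaneTemplate polls until every non-worker DockerMachine in
    // the default namespace reports that it was cloned from wantTemplate.
    func waitForControlPlaneTemplate(ctx context.Context, client kubernetes.Interface, wantTemplate string) error {
    	return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
    		var list unstructured.UnstructuredList
    		err := client.Discovery().RESTClient().
    			Get().
    			AbsPath("/apis/infrastructure.cluster.x-k8s.io/v1beta1/namespaces/default/dockermachines").
    			Do(ctx).
    			Into(&list)
    		if err != nil {
    			return false, nil // transient API errors: keep polling
    		}
    		for _, item := range list.Items {
    			if strings.Contains(item.GetName(), "worker") {
    				continue
    			}
    			if item.GetAnnotations()[clonedFromNameAnnotation] != wantTemplate {
    				return false, nil
    			}
    		}
    		return true, nil
    	})
    }

    func main() {
    	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		panic(err)
    	}
    	client, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		panic(err)
    	}
    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    	defer cancel()
    	if err := waitForControlPlaneTemplate(ctx, client, "docker-test-cp-template-new"); err != nil {
    		panic(err)
    	}
    }

In the actual suite the client comes from the test harness (s.client), so the kubeconfig plumbing in main above is only needed to run the sketch outside the suite.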
