Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Remediate unhealthy MachinePool machines #11392

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions config/crd/bases/cluster.x-k8s.io_machinepools.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions exp/api/v1beta1/machinepool_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type MachinePoolSpec struct {
// failureDomains is the list of failure domains this MachinePool should be attached to.
// +optional
FailureDomains []string `json:"failureDomains,omitempty"`

// The strategy for replacing existing machines with
// new ones.
// +optional
Strategy *MachinePoolStrategy `json:"strategy,omitempty"`
}

// ANCHOR_END: MachinePoolSpec
Expand Down Expand Up @@ -161,6 +166,21 @@ type MachinePoolV1Beta2Status struct {

// ANCHOR_END: MachinePoolStatus

// ANCHOR: MachinePoolStrategy

// MachinePoolStrategy defines how to replace existing machines
// with new ones.
// MachinePoolStrategy defines how to replace existing machines
// with new ones.
// Currently it only carries the remediation policy; rollout-related
// strategy fields may be added here in the future.
type MachinePoolStrategy struct {
	// Remediation controls the strategy of remediating unhealthy machines
	// as marked by a MachineHealthCheck. This only applies to infrastructure
	// providers supporting "MachinePool Machines". For other providers,
	// no remediation is done.
	// +optional
	Remediation *clusterv1.RemediationStrategy `json:"remediation,omitempty"`
}

// ANCHOR_END: MachinePoolStrategy

// MachinePoolPhase is a string representation of a MachinePool Phase.
//
// This type is a high-level indicator of the status of the MachinePool as it is provisioned,
Expand Down
25 changes: 25 additions & 0 deletions exp/api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

154 changes: 142 additions & 12 deletions exp/internal/controllers/machinepool_controller_phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ import (
"context"
"fmt"
"reflect"
"sort"
"time"

"github.com/pkg/errors"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand All @@ -44,6 +47,7 @@ import (
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
utilconversion "sigs.k8s.io/cluster-api/util/conversion"
"sigs.k8s.io/cluster-api/util/labels"
Expand Down Expand Up @@ -279,15 +283,18 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
// Get the nodeRefsMap from the cluster.
s.nodeRefMap, getNodeRefsErr = r.getNodeRefMap(ctx, clusterClient)

err = r.reconcileMachines(ctx, s, infraConfig)
res := ctrl.Result{}

reconcileMachinesRes, err := r.reconcileMachines(ctx, s, infraConfig)
res = util.LowestNonZeroResult(res, reconcileMachinesRes)

if err != nil || getNodeRefsErr != nil {
return ctrl.Result{}, kerrors.NewAggregate([]error{errors.Wrapf(err, "failed to reconcile Machines for MachinePool %s", klog.KObj(mp)), errors.Wrapf(getNodeRefsErr, "failed to get nodeRefs for MachinePool %s", klog.KObj(mp))})
}

if !mp.Status.InfrastructureReady {
log.Info("Infrastructure provider is not yet ready", infraConfig.GetKind(), klog.KObj(infraConfig))
return ctrl.Result{}, nil
return res, nil
}

var providerIDList []string
Expand All @@ -306,7 +313,7 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *

if len(providerIDList) == 0 && mp.Status.Replicas != 0 {
log.Info("Retrieved empty spec.providerIDList from infrastructure provider but status.replicas is not zero.", "replicas", mp.Status.Replicas)
return ctrl.Result{}, nil
return res, nil
}

if !reflect.DeepEqual(mp.Spec.ProviderIDList, providerIDList) {
Expand All @@ -316,7 +323,7 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
mp.Status.UnavailableReplicas = mp.Status.Replicas
}

return ctrl.Result{}, nil
return res, nil
}

// reconcileMachines reconciles Machines associated with a MachinePool.
Expand All @@ -326,18 +333,18 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
// infrastructure is created accordingly.
// Note: When supported by the cloud provider implementation of the MachinePool, machines will provide a means to interact
// with the corresponding infrastructure (e.g. delete a specific machine in case MachineHealthCheck detects it is unhealthy).
func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope, infraMachinePool *unstructured.Unstructured) error {
func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope, infraMachinePool *unstructured.Unstructured) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
mp := s.machinePool

var infraMachineKind string
if err := util.UnstructuredUnmarshalField(infraMachinePool, &infraMachineKind, "status", "infrastructureMachineKind"); err != nil {
if errors.Is(err, util.ErrUnstructuredFieldNotFound) {
log.V(4).Info("MachinePool Machines not supported, no infraMachineKind found")
return nil
return ctrl.Result{}, nil
}

return errors.Wrapf(err, "failed to retrieve infraMachineKind from infrastructure provider for MachinePool %s", klog.KObj(mp))
return ctrl.Result{}, errors.Wrapf(err, "failed to retrieve infraMachineKind from infrastructure provider for MachinePool %s", klog.KObj(mp))
}

infraMachineSelector := metav1.LabelSelector{
Expand All @@ -354,7 +361,7 @@ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope,
infraMachineList.SetAPIVersion(infraMachinePool.GetAPIVersion())
infraMachineList.SetKind(infraMachineKind + "List")
if err := r.Client.List(ctx, &infraMachineList, client.InNamespace(mp.Namespace), client.MatchingLabels(infraMachineSelector.MatchLabels)); err != nil {
return errors.Wrapf(err, "failed to list infra machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
return ctrl.Result{}, errors.Wrapf(err, "failed to list infra machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
}

// Add watcher for infraMachine, if there isn't one already; this will allow this controller to reconcile
Expand All @@ -365,21 +372,26 @@ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope,

// Add watcher for infraMachine, if there isn't one already.
if err := r.externalTracker.Watch(log, sampleInfraMachine, handler.EnqueueRequestsFromMapFunc(r.infraMachineToMachinePoolMapper)); err != nil {
return err
return ctrl.Result{}, err
}

// Get the list of machines managed by this controller, and align it with the infra machines managed by
// the InfraMachinePool controller.
machineList := &clusterv1.MachineList{}
if err := r.Client.List(ctx, machineList, client.InNamespace(mp.Namespace), client.MatchingLabels(infraMachineSelector.MatchLabels)); err != nil {
return err
return ctrl.Result{}, err
}

if err := r.createOrUpdateMachines(ctx, s, machineList.Items, infraMachineList.Items); err != nil {
return errors.Wrapf(err, "failed to create machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
return ctrl.Result{}, errors.Wrapf(err, "failed to create machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
}

return nil
res, err := r.reconcileUnhealthyMachines(ctx, s, machineList.Items)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to reconcile unhealthy machines for MachinePool %s", klog.KObj(mp))
}

return res, nil
}

// createOrUpdateMachines creates a MachinePool Machine for each infraMachine if it doesn't already exist and sets the owner reference and infraRef.
Expand Down Expand Up @@ -579,3 +591,121 @@ func (r *MachinePoolReconciler) getNodeRefMap(ctx context.Context, c client.Clie

return nodeRefsMap, nil
}

func (r *MachinePoolReconciler) reconcileUnhealthyMachines(ctx context.Context, s *scope, machines []clusterv1.Machine) (ctrl.Result, error) {
if len(machines) == 0 {
return ctrl.Result{}, nil
}

log := ctrl.LoggerFrom(ctx)
mp := s.machinePool

machinesWithHealthCheck := slices.DeleteFunc(slices.Clone(machines), func(machine clusterv1.Machine) bool {
return !conditions.Has(&machine, clusterv1.MachineHealthCheckSucceededCondition)
})
if len(machinesWithHealthCheck) == 0 {
// This means there is no MachineHealthCheck selecting any machines
// of this machine pool. In this case, do not requeue so often,
// but still check regularly in case a MachineHealthCheck became
// deployed or activated. This long interval shouldn't be a problem
// at cluster creation, since newly-created nodes should anyway
// trigger MachinePool reconciliation as the infrastructure provider
// creates the InfraMachines.
log.V(4).Info("Skipping reconciliation of unhealthy MachinePool machines because there are no health-checked machines")
return ctrl.Result{RequeueAfter: 10 * time.Minute}, nil
}

unhealthyMachines := slices.DeleteFunc(slices.Clone(machines), func(machine clusterv1.Machine) bool {
return !collections.IsUnhealthyAndOwnerRemediated(&machine)
})
log.V(4).Info("Reconciling unhealthy MachinePool machines", "unhealthyMachines", len(unhealthyMachines))

// Calculate how many in flight machines we should remediate.
// By default, we allow all machines to be remediated at the same time.
maxInFlight := len(unhealthyMachines)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll find this mostly copied from the MachineSet remediation code, except that I don't like changing the meaning of a variable throughout the function, so I split up inFlight/maxInFlight variables.

if mp.Spec.Strategy != nil && mp.Spec.Strategy.Remediation != nil {
if mp.Spec.Strategy.Remediation.MaxInFlight != nil {
var err error
replicas := int(ptr.Deref(mp.Spec.Replicas, 1))
maxInFlight, err = intstr.GetScaledValueFromIntOrPercent(mp.Spec.Strategy.Remediation.MaxInFlight, replicas, true)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to calculate maxInFlight to remediate machines: %v", err)
}
log = log.WithValues("maxInFlight", maxInFlight, "replicas", replicas)
}
}

machinesToRemediate := make([]*clusterv1.Machine, 0, len(unhealthyMachines))
inFlight := 0
for _, m := range unhealthyMachines {
if !m.DeletionTimestamp.IsZero() {
if conditions.IsTrue(&m, clusterv1.MachineOwnerRemediatedCondition) {
// Machine has been remediated by this controller and still in flight.
inFlight++
}
continue
}
if conditions.IsFalse(&m, clusterv1.MachineOwnerRemediatedCondition) {
machinesToRemediate = append(machinesToRemediate, &m)
}
}
log = log.WithValues("inFlight", inFlight)

if len(machinesToRemediate) == 0 {
// There's a MachineHealthCheck monitoring the machines, but currently
// no action to be taken. A machine could require remediation at any
// time, so use a short interval until next reconciliation.
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

if inFlight >= maxInFlight {
log.V(3).Info("Remediation strategy is set, and maximum in flight has been reached", "machinesToBeRemediated", len(machinesToRemediate))

// Check soon again whether the already-remediating (= deleting) machines are gone
// so that more machines can be remediated
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}

// Sort the machines from newest to oldest.
// We are trying to remediate machines failing to come up first because
// there is a chance that they are not hosting any workloads (minimize disruption).
sort.SliceStable(machinesToRemediate, func(i, j int) bool {
return machinesToRemediate[i].CreationTimestamp.After(machinesToRemediate[j].CreationTimestamp.Time)
})

haveMoreMachinesToRemediate := false
if len(machinesToRemediate) > (maxInFlight - inFlight) {
haveMoreMachinesToRemediate = true
log.V(5).Info("Remediation strategy is set, limiting in flight operations", "machinesToBeRemediated", len(machinesToRemediate))
machinesToRemediate = machinesToRemediate[:(maxInFlight - inFlight)]
}

// Remediate unhealthy machines by deleting them
var errs []error
for _, m := range machinesToRemediate {
log.Info("Deleting unhealthy Machine", "Machine", klog.KObj(m))
patch := client.MergeFrom(m.DeepCopy())
if err := r.Client.Delete(ctx, m); err != nil {
if apierrors.IsNotFound(err) {
continue
}
errs = append(errs, errors.Wrapf(err, "failed to delete Machine %s", klog.KObj(m)))
continue
}
conditions.MarkTrue(m, clusterv1.MachineOwnerRemediatedCondition)
if err := r.Client.Status().Patch(ctx, m, patch); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, errors.Wrapf(err, "failed to update status of Machine %s", klog.KObj(m)))
}
}

if len(errs) > 0 {
return ctrl.Result{}, errors.Wrapf(kerrors.NewAggregate(errs), "failed to delete unhealthy Machines")
}

if haveMoreMachinesToRemediate {
// More machines need remediation, so reconcile again sooner
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}

return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
Loading
Loading