Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: nodeclaim add controller garbagecollection and tagging #41

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func main() {
op.GetClient(),
op.EventRecorder,
cloudProvider,
op.InstanceProvider,
op.PricingProvider,
)...).
Start(ctx, cloudProvider)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ require (
k8s.io/component-base v0.30.3 // indirect
k8s.io/csi-translation-lib v0.30.3 // indirect
k8s.io/klog v1.0.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/klog/v2 v2.130.1
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
knative.dev/pkg v0.0.0-20230712131115-7051d301e7f4 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
7 changes: 6 additions & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,20 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/events"

nodeclaimgarbagecollection "github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/controllers/nodeclaim/garbagecollection"
nodeclaimtagging "github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/controllers/nodeclaim/tagging"
controllerspricing "github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/controllers/providers/pricing"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/instance"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/pricing"
)

func NewControllers(ctx context.Context, mgr manager.Manager, clk clock.Clock, kubeClient client.Client, recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider, pricingProvider pricing.Provider) []controller.Controller {
cloudProvider cloudprovider.CloudProvider, instanceProvider instance.Provider, pricingProvider pricing.Provider) []controller.Controller {

controllers := []controller.Controller{
controllerspricing.NewController(pricingProvider),
nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider),
nodeclaimtagging.NewController(kubeClient, instanceProvider),
}
return controllers
}
117 changes: 117 additions & 0 deletions pkg/controllers/nodeclaim/garbagecollection/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright 2024 The CloudPilot AI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package garbagecollection

import (
"context"
"fmt"
"time"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/operator/injection"
)

type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller
}

func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
kubeClient: kubeClient,
cloudProvider: cloudProvider,
successfulCount: 0,
}
}

func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "instance.garbagecollection")

// We LIST machines on the CloudProvider BEFORE we grab Machines/Nodes on the cluster so that we make sure that, if
// LISTing instances takes a long time, our information is more updated by the time we get to Machine and Node LIST
// This works since our CloudProvider instances are deleted based on whether the Machine exists or not, not vise-versa
retrieved, err := c.cloudProvider.List(ctx)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing cloudprovider machines, %w", err)
}
managedRetrieved := lo.Filter(retrieved, func(nc *karpv1.NodeClaim, _ int) bool {
return nc.DeletionTimestamp.IsZero()
})
nodeClaimList := &karpv1.NodeClaimList{}
if err = c.kubeClient.List(ctx, nodeClaimList); err != nil {
return reconcile.Result{}, err
}
nodeList := &corev1.NodeList{}
if err = c.kubeClient.List(ctx, nodeList); err != nil {
return reconcile.Result{}, err
}
resolvedProviderIDs := sets.New[string](lo.FilterMap(nodeClaimList.Items, func(n karpv1.NodeClaim, _ int) (string, bool) {
return n.Status.ProviderID, n.Status.ProviderID != ""
})...)
errs := make([]error, len(retrieved))
workqueue.ParallelizeUntil(ctx, 100, len(managedRetrieved), func(i int) {
if !resolvedProviderIDs.Has(managedRetrieved[i].Status.ProviderID) &&
time.Since(managedRetrieved[i].CreationTimestamp.Time) > time.Second*30 {
errs[i] = c.garbageCollect(ctx, managedRetrieved[i], nodeList)
}
})
if err = multierr.Combine(errs...); err != nil {
return reconcile.Result{}, err
}
c.successfulCount++
return reconcile.Result{RequeueAfter: lo.Ternary(c.successfulCount <= 20, time.Second*10, time.Minute*2)}, nil
}

func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeList *corev1.NodeList) error {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("provider-id", nodeClaim.Status.ProviderID))
if err := c.cloudProvider.Delete(ctx, nodeClaim); err != nil {
return cloudprovider.IgnoreNodeClaimNotFoundError(err)
}
log.FromContext(ctx).V(1).Info("garbage collected cloudprovider instance")

// Go ahead and cleanup the node if we know that it exists to make scheduling go quicker
if node, ok := lo.Find(nodeList.Items, func(n corev1.Node) bool {
return n.Spec.ProviderID == nodeClaim.Status.ProviderID
}); ok {
if err := c.kubeClient.Delete(ctx, &node); err != nil {
return client.IgnoreNotFound(err)
}
log.FromContext(ctx).WithValues("Node", klog.KRef("", node.Name)).V(1).Info("garbage collected node")
}
return nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("instance.garbagecollection").
WatchesRawSource(singleton.Source()).
Complete(singleton.AsReconciler(c))
}
141 changes: 141 additions & 0 deletions pkg/controllers/nodeclaim/tagging/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2024 The CloudPilot AI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tagging

import (
"context"
"fmt"
"time"

"github.com/awslabs/operatorpkg/reasonable"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/operator/injection"

"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/apis/v1alpha1"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/operator/options"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/instance"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/utils"
)

type Controller struct {
kubeClient client.Client
instanceProvider instance.Provider
}

func NewController(kubeClient client.Client, instanceProvider instance.Provider) *Controller {
return &Controller{
kubeClient: kubeClient,
instanceProvider: instanceProvider,
}
}

func (c *Controller) Reconcile(ctx context.Context, nodeClaim *karpv1.NodeClaim) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "nodeclaim.tagging")

stored := nodeClaim.DeepCopy()
if !isTaggable(nodeClaim) {
return reconcile.Result{}, nil
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("provider-id", nodeClaim.Status.ProviderID))
id, err := utils.ParseInstanceID(nodeClaim.Status.ProviderID)
if err != nil {
// We don't throw an error here since we don't want to retry until the ProviderID has been updated.
log.FromContext(ctx).Error(err, "failed parsing instance id")
return reconcile.Result{}, nil
}
if err = c.tagInstance(ctx, nodeClaim, id); err != nil {
return reconcile.Result{}, cloudprovider.IgnoreNodeClaimNotFoundError(err)
}
nodeClaim.Annotations = lo.Assign(nodeClaim.Annotations, map[string]string{
v1alpha1.AnnotationInstanceTagged: "true",
v1alpha1.AnnotationClusterNameTaggedCompatability: "true",
})
if !equality.Semantic.DeepEqual(nodeClaim, stored) {
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
return reconcile.Result{}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("nodeclaim.tagging").
For(&karpv1.NodeClaim{}).
WithEventFilter(predicate.NewPredicateFuncs(func(o client.Object) bool {
return isTaggable(o.(*karpv1.NodeClaim))
})).
// Ok with using the default MaxConcurrentReconciles of 1 to avoid throttling from CreateTag write API
WithOptions(controller.Options{
RateLimiter: reasonable.RateLimiter(),
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}

func (c *Controller) tagInstance(ctx context.Context, nc *karpv1.NodeClaim, id string) error {
tags := map[string]string{
v1alpha1.TagName: nc.Status.NodeName,
v1alpha1.TagNodeClaim: nc.Name,
v1alpha1.EKSClusterNameTagKey: options.FromContext(ctx).ClusterName,
}

// Remove tags which have been already populated
instance, err := c.instanceProvider.Get(ctx, id)
if err != nil {
return fmt.Errorf("tagging nodeclaim, %w", err)
}
tags = lo.OmitByKeys(tags, lo.Keys(instance.Tags))
if len(tags) == 0 {
return nil
}

// Ensures that no more than 1 CreateTags call is made per second. Rate limiting is required since CreateTags
// shares a pool with other mutating calls (e.g. CreateFleet).
defer time.Sleep(time.Second)
if err := c.instanceProvider.CreateTags(ctx, id, tags); err != nil {
return fmt.Errorf("tagging nodeclaim, %w", err)
}
return nil
}

func isTaggable(nc *karpv1.NodeClaim) bool {
// Instance has already been tagged
instanceTagged := nc.Annotations[v1alpha1.AnnotationInstanceTagged]
clusterNameTagged := nc.Annotations[v1alpha1.AnnotationClusterNameTaggedCompatability]
if instanceTagged == "true" && clusterNameTagged == "true" {
return false
}
// Node name is not yet known
if nc.Status.NodeName == "" {
return false
}
// NodeClaim is currently terminating
if !nc.DeletionTimestamp.IsZero() {
return false
}
return true
}
16 changes: 16 additions & 0 deletions vendor/github.com/awslabs/operatorpkg/reasonable/reasonable.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ github.com/awslabs/operatorpkg/controller
github.com/awslabs/operatorpkg/metrics
github.com/awslabs/operatorpkg/object
github.com/awslabs/operatorpkg/option
github.com/awslabs/operatorpkg/reasonable
github.com/awslabs/operatorpkg/singleton
github.com/awslabs/operatorpkg/status
# github.com/beorn7/perks v1.0.1
Expand Down