Skip to content

Commit

Permalink
feat: fix the exception when scale 100+ or more ecs
Browse files Browse the repository at this point in the history
  • Loading branch information
peng19940915 authored and jwcesign committed Nov 18, 2024
1 parent 1f7a016 commit 8398981
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ spec:
evictionSoft
rule: has(self.evictionSoftGracePeriod) ? self.evictionSoftGracePeriod.all(e,
(e in self.evictionSoft)):true
resourceGroupId:
description: ResourceGroupID is the resource group id in ECS
pattern: rg-[0-9a-z]+
type: string
securityGroupSelectorTerms:
description: SecurityGroupSelectorTerms is a list of or security group
selector terms. The terms are ORed.
Expand Down Expand Up @@ -266,6 +270,7 @@ spec:
type: string
type: array
performanceLevel:
default: PL0
description: |-
The performance level of the ESSD to use as the system disk. Default value: PL0.
Valid values:
Expand Down Expand Up @@ -311,6 +316,13 @@ spec:
rule: self.all(k, k !='karpenter.sh/nodeclaim')
- message: tag contains a restricted tag matching karpenter.k8s.alibabacloud/ecsnodeclass
rule: self.all(k, k !='karpenter.k8s.alibabacloud/ecsnodeclass')
vSwitchSelectionPolicy:
default: cheapest
description: VSwitchSelectionPolicy is the policy to select the vSwitch.
enum:
- balanced
- cheapest
type: string
vSwitchSelectorTerms:
description: VSwitchSelectorTerms is a list of or vSwitch selector
terms. The terms are ORed.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ require (
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/time v0.6.0
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/api v0.146.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231009173412-8bfb1ae86b6c // indirect
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/v1alpha1/ecsnodeclass.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
VSwitchSelectionPolicyBalanced = "balanced"
)

// ECSNodeClassSpec is the top level specification for the AlibabaCloud Karpenter Provider.
// This will contain the configuration necessary to launch instances in AlibabaCloud.
type ECSNodeClassSpec struct {
Expand All @@ -34,6 +38,10 @@ type ECSNodeClassSpec struct {
// +kubebuilder:validation:MaxItems:=30
// +required
VSwitchSelectorTerms []VSwitchSelectorTerm `json:"vSwitchSelectorTerms" hash:"ignore"`
// VSwitchSelectionPolicy is the policy to select the vSwitch.
// +kubebuilder:validation:Enum:=balanced;cheapest
// +kubebuilder:default:=cheapest
VSwitchSelectionPolicy string `json:"vSwitchSelectionPolicy,omitempty"`
// SecurityGroupSelectorTerms is a list of or security group selector terms. The terms are ORed.
// +kubebuilder:validation:XValidation:message="securityGroupSelectorTerms cannot be empty",rule="self.size() != 0"
// +kubebuilder:validation:XValidation:message="expected at least one, got none, ['tags', 'id', 'name']",rule="self.all(x, has(x.tags) || has(x.id) || has(x.name))"
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/interruption"
nodeclaimgarbagecollection "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclaim/garbagecollection"
nodeclaimtagging "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclaim/tagging"
nodeclaimunregisteredtaint "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclaim/unregisteredtaint"
nodeclasshash "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclass/hash"
nodeclaasstatus "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclass/status"
nodeclasstermination "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclass/termination"
Expand Down Expand Up @@ -62,6 +63,7 @@ func NewControllers(ctx context.Context, mgr manager.Manager, clk clock.Clock, r
nodeclasstermination.NewController(kubeClient, recorder),
controllerspricing.NewController(pricingProvider),
nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider),
nodeclaimunregisteredtaint.NewController(kubeClient),
nodeclaimtagging.NewController(kubeClient, instanceProvider),
providersinstancetype.NewController(instanceTypeProvider),
}
Expand Down
99 changes: 99 additions & 0 deletions pkg/controllers/nodeclaim/unregisteredtaint/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package unregisteredtaint

/*
Copyright 2024 The CloudPilot AI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"context"
"time"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// Controller is used to delete the unregistered taint when the node is ready
type Controller struct {
kubeClient client.Client
}

func NewController(kubeClient client.Client) *Controller {
return &Controller{
kubeClient: kubeClient,
}
}

func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
logger := log.FromContext(ctx)
// get all nodes
nodeList := &corev1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList, client.HasLabels{v1.NodeRegisteredLabelKey}); err != nil {
return reconcile.Result{}, err
}
for i := range nodeList.Items {
node := &nodeList.Items[i]
if !hasUnregisteredTaint(node) || !isNodeReady(node) {
continue
}

nodeCopy := node.DeepCopy()
// remove the unregistered taint
nodeCopy.Spec.Taints = lo.Reject(nodeCopy.Spec.Taints, func(item corev1.Taint, index int) bool {
return item.MatchTaint(&v1.UnregisteredNoExecuteTaint)
})

if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return c.kubeClient.Patch(ctx, nodeCopy, client.MergeFromWithOptions(node, client.MergeFromWithOptimisticLock{}))
}); err != nil {
logger.Error(err, "failed to remove unregistered taint", "node", node.Name)
continue
}
logger.Info("removed unregistered taint from node", "node", node.Name)
}

// check again every minute
return reconcile.Result{RequeueAfter: time.Minute}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("node.unregisteredtaint").
WatchesRawSource(singleton.Source()).
Complete(singleton.AsReconciler(c))
}

func hasUnregisteredTaint(node *corev1.Node) bool {
_, has := lo.Find(node.Spec.Taints, func(item corev1.Taint) bool {
return item.Key == v1.UnregisteredTaintKey
})
return has
}

func isNodeReady(node *corev1.Node) bool {
_, ready := lo.Find(node.Status.Conditions, func(item corev1.NodeCondition) bool {
return item.Type == corev1.NodeReady &&
item.Status == corev1.ConditionTrue &&
time.Since(item.LastTransitionTime.Time) > time.Second*15
})

return ready
}
1 change: 1 addition & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
ackProvider := ack.NewDefaultProvider(clusterID, ackClient, cache.New(alicache.ClusterAttachScriptTTL, alicache.DefaultCleanupInterval))

instanceProvider := instance.NewDefaultProvider(
ctx,
region,
ecsClient,
imageResolver,
Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Options struct {
VMMemoryOverheadPercent float64
Interruption bool
TelemetryShare bool
APGCreationQPS int
}

func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
Expand All @@ -48,6 +49,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.065), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
fs.BoolVar(&o.Interruption, "interruption", env.WithDefaultBool("INTERRUPTION", true), "Enable interruption handling.")
fs.BoolVar(&o.TelemetryShare, "telemetry-share", env.WithDefaultBool("TELEMETRY_SHARE", true), "Enable telemetry sharing.")
fs.IntVar(&o.APGCreationQPS, "apg-qps", int(env.WithDefaultInt64("APG_CREATION_QPS", 100)), "The QPS limit for creating AutoProvisionGroup.")
}

func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {
Expand Down
39 changes: 27 additions & 12 deletions pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package instance

import (
"context"
"crypto/rand"
"errors"
"fmt"
"math"
"math/big"
"net/http"
"strings"
"time"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/patrickmn/go-cache"
"github.com/samber/lo"
"go.uber.org/multierr"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
Expand Down Expand Up @@ -69,16 +72,17 @@ type DefaultProvider struct {
imageFamilyResolver imagefamily.Resolver
vSwitchProvider vswitch.Provider
ackProvider ack.Provider
createLimiter *rate.Limiter
}

func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
func NewDefaultProvider(ctx context.Context, region string, ecsClient *ecsclient.Client,
imageFamilyResolver imagefamily.Resolver, vSwitchProvider vswitch.Provider,
ackProvider ack.Provider) *DefaultProvider {
p := &DefaultProvider{
ecsClient: ecsClient,
region: region,
instanceCache: cache.New(instanceCacheExpiration, instanceCacheExpiration),

ecsClient: ecsClient,
region: region,
instanceCache: cache.New(instanceCacheExpiration, instanceCacheExpiration),
createLimiter: rate.NewLimiter(rate.Limit(1), options.FromContext(ctx).APGCreationQPS),
imageFamilyResolver: imageFamilyResolver,
vSwitchProvider: vSwitchProvider,
ackProvider: ackProvider,
Expand All @@ -90,6 +94,11 @@ func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNodeClass, nodeClaim *karpv1.NodeClaim,
instanceTypes []*cloudprovider.InstanceType,
) (*Instance, error) {
// Wait for rate limiter
if err := p.createLimiter.Wait(ctx); err != nil {
log.FromContext(ctx).Error(err, "rate limit exceeded")
return nil, fmt.Errorf("rate limit exceeded: %w", err)
}
schedulingRequirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...)
// Only filter the instances if there are no minValues in the requirement.
if !schedulingRequirements.HasMinValues() {
Expand All @@ -107,7 +116,6 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNod

return NewInstanceFromProvisioningGroup(launchInstance, createAutoProvisioningGroupRequest, p.region), nil
}

func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) {
if instance, ok := p.instanceCache.Get(id); ok {
return instance.(*Instance), nil
Expand Down Expand Up @@ -370,6 +378,7 @@ func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1alpha
}

runtime := &util.RuntimeOptions{}

resp, err := p.ecsClient.CreateAutoProvisioningGroupWithOptions(createAutoProvisioningGroupRequest, runtime)
if err != nil {
return nil, nil, fmt.Errorf("creating auto provisioning group, %w", err)
Expand Down Expand Up @@ -470,7 +479,7 @@ func (p *DefaultProvider) getProvisioningGroup(ctx context.Context, nodeClass *v
break
}

vSwitchID := p.getVSwitchID(instanceType, zonalVSwitchs, requirements, capacityType)
vSwitchID := p.getVSwitchID(instanceType, zonalVSwitchs, requirements, capacityType, nodeClass.Spec.VSwitchSelectionPolicy)
if vSwitchID == "" {
return nil, errors.New("vSwitchID not found")
}
Expand Down Expand Up @@ -570,23 +579,29 @@ func (p *DefaultProvider) checkODFallback(nodeClaim *karpv1.NodeClaim, instanceT
}

func (p *DefaultProvider) getVSwitchID(instanceType *cloudprovider.InstanceType,
zonalVSwitchs map[string]*vswitch.VSwitch, reqs scheduling.Requirements, capacityType string) string {
zonalVSwitchs map[string]*vswitch.VSwitch, reqs scheduling.Requirements, capacityType string, vSwitchSelectionPolicy string) string {
cheapestVSwitchID := ""
cheapestPrice := math.MaxFloat64

if capacityType == karpv1.CapacityTypeOnDemand || vSwitchSelectionPolicy == v1alpha1.VSwitchSelectionPolicyBalanced {
// For on-demand, randomly select a zone's vswitch
zoneIDs := lo.Keys(zonalVSwitchs)
if len(zoneIDs) > 0 {
randomIndex, _ := rand.Int(rand.Reader, big.NewInt(int64(len(zoneIDs))))
return zonalVSwitchs[zoneIDs[randomIndex.Int64()]].ID
}
}

// For different AZ, the spot price may differ. So we need to get the cheapest vSwitch in the zone
for i := range instanceType.Offerings {
if reqs.Compatible(instanceType.Offerings[i].Requirements, scheduling.AllowUndefinedWellKnownLabels) != nil {
continue
}

vswitch, ok := zonalVSwitchs[instanceType.Offerings[i].Requirements.Get(corev1.LabelTopologyZone).Any()]
if !ok {
continue
}
if capacityType == karpv1.CapacityTypeOnDemand {
return vswitch.ID
}

if instanceType.Offerings[i].Price < cheapestPrice {
cheapestVSwitchID = vswitch.ID
cheapestPrice = instanceType.Offerings[i].Price
Expand Down

0 comments on commit 8398981

Please sign in to comment.