Commit 9a44e87

Merge pull request #155 from jwcesign/main
fix: calculate the overhead with the maximum value
jwcesign authored Dec 2, 2024
2 parents 4eea3de + 9050754 commit 9a44e87
Showing 5 changed files with 50 additions and 68 deletions.
2 changes: 1 addition & 1 deletion charts/karpenter/values.yaml
@@ -64,7 +64,7 @@ controller:
# -- The external kubernetes cluster id for new nodes to connect with.
clusterID: ""
# -- The VM memory overhead as a percent that will be subtracted from the total memory for all instance types. The value of `0.075` equals 7.5%.
- vmMemoryOverheadPercent: 0.075
+ vmMemoryOverheadPercent: 0.065
# -- The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one
# time, which usually results in fewer but larger nodes.
batchMaxDuration: 10s
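For intuition, the overhead percent is applied by shaving that fraction off an instance type's total memory before it is advertised to the scheduler, so lowering the default from 0.075 to 0.065 advertises slightly more usable memory per node. Below is a minimal sketch of that arithmetic, assuming the simple total * (1 - percent) form; the helper is illustrative, not the provider's actual code.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// usableMemory sketches the assumed effect of vmMemoryOverheadPercent:
// subtract the configured fraction from the instance's total memory.
func usableMemory(total resource.Quantity, overheadPercent float64) *resource.Quantity {
	overhead := int64(float64(total.Value()) * overheadPercent)
	return resource.NewQuantity(total.Value()-overhead, resource.BinarySI)
}

func main() {
	total := resource.MustParse("32Gi")
	// With the new 0.065 default roughly 2.1Gi is held back instead of 2.4Gi.
	fmt.Println(usableMemory(total, 0.065))
}
```
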
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
@@ -108,7 +108,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont

unavailableOfferingsCache := alicache.NewUnavailableOfferings()
instanceTypeProvider := instancetype.NewDefaultProvider(
- *ecsClient.RegionId, ecsClient,
+ *ecsClient.RegionId, operator.GetClient(), ecsClient,
cache.New(alicache.InstanceTypesAndZonesTTL, alicache.DefaultCleanupInterval),
unavailableOfferingsCache,
pricingProvider, ackProvider)
2 changes: 1 addition & 1 deletion pkg/operator/options/options.go
@@ -44,7 +44,7 @@ type Options struct {
func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.StringVar(&o.ClusterID, "cluster-id", env.WithDefaultString("CLUSTER_ID", ""), "The external kubernetes cluster id for new nodes to connect with.")
// TODO: the overhead differs between operating systems; find a way to account for that.
- fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
+ fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.065), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
fs.BoolVar(&o.Interruption, "interruption", env.WithDefaultBool("INTERRUPTION", true), "Enable interruption handling.")
fs.BoolVar(&o.TelemetryShare, "telemetry-share", env.WithDefaultBool("TELEMETRY_SHARE", true), "Enable telemetry sharing.")
fs.IntVar(&o.APGCreationQPS, "apg-qps", int(env.WithDefaultInt64("APG_CREATION_QPS", 100)), "The QPS limit for creating AutoProvisionGroup.")
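Operators can still override the new default per deployment through the VM_MEMORY_OVERHEAD_PERCENT environment variable or the --vm-memory-overhead-percent flag. A plausible sketch of what a WithDefaultFloat64-style helper does follows; the repository's actual utils implementation may differ.

```go
package utils

import (
	"os"
	"strconv"
)

// WithDefaultFloat64 returns the parsed value of the environment variable
// when it is set and valid, and def otherwise. Sketch under assumptions;
// the real helper may handle parse errors differently.
func WithDefaultFloat64(key string, def float64) float64 {
	raw, ok := os.LookupEnv(key)
	if !ok {
		return def
	}
	v, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return def
	}
	return v
}
```
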
44 changes: 42 additions & 2 deletions pkg/providers/instancetype/instancetype.go
@@ -29,7 +29,9 @@ import (
"github.com/patrickmn/go-cache"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
@@ -52,6 +54,7 @@ type Provider interface {

type DefaultProvider struct {
region string
+ kubeClient client.Client
ecsClient *ecsclient.Client
pricingProvider pricing.Provider
ackProvider ack.Provider
@@ -77,10 +80,11 @@ type DefaultProvider struct {
instanceTypesOfferingsSeqNum uint64
}

- func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
+ func NewDefaultProvider(region string, kubeClient client.Client, ecsClient *ecsclient.Client,
instanceTypesCache *cache.Cache, unavailableOfferingsCache *kcache.UnavailableOfferings,
pricingProvider pricing.Provider, ackProvider ack.Provider) *DefaultProvider {
return &DefaultProvider{
+ kubeClient: kubeClient,
ecsClient: ecsClient,
region: region,
pricingProvider: pricingProvider,
@@ -175,6 +179,11 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur
return nil, fmt.Errorf("failed to get cluster CNI: %w", err)
}

+ nodeResourceOverhead, err := p.nodeOverhead(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get node resource overhead: %w", err)
+ }

result := lo.Map(p.instanceTypesInfo, func(i *ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType, _ int) *cloudprovider.InstanceType {
zoneData := lo.Map(allZones.UnsortedList(), func(zoneID string, _ int) ZoneData {
if !p.instanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID) || !vSwitchsZones.Has(zoneID) {
@@ -194,7 +203,7 @@
// so that Karpenter is able to cache the set of InstanceTypes based on values that alter the set of instance types
// !!! Important !!!
offers := p.createOfferings(ctx, *i.InstanceTypeId, zoneData)
- return NewInstanceType(ctx, i, kc, p.region, nodeClass.Spec.SystemDisk, offers, clusterCNI)
+ return NewInstanceType(ctx, nodeResourceOverhead, i, kc, p.region, nodeClass.Spec.SystemDisk, offers, clusterCNI)
})

// Filter out nil values
@@ -204,6 +213,37 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur
return result, nil
}

+ func (p *DefaultProvider) nodeOverhead(ctx context.Context) (corev1.ResourceList, error) {
+ var nodes corev1.NodeList
+ if err := p.kubeClient.List(ctx, &nodes); err != nil {
+ return corev1.ResourceList{}, err
+ }
+
+ // We are not sure how to calculate a node's exact overhead, so use the maximum
+ // observed value to avoid node-creation loops.
+ maxCPUOverHead := int64(0)
+ maxMemoryOverHead := int64(0)
+ for _, node := range nodes.Items {
+ capacity := node.Status.Capacity
+ allocatable := node.Status.Allocatable
+
+ cpuOverHead := capacity.Cpu().MilliValue() - allocatable.Cpu().MilliValue()
+ memoryOverHead := capacity.Memory().Value() - allocatable.Memory().Value()
+
+ if cpuOverHead > maxCPUOverHead {
+ maxCPUOverHead = cpuOverHead
+ }
+ if memoryOverHead > maxMemoryOverHead {
+ maxMemoryOverHead = memoryOverHead
+ }
+ }
+
+ return corev1.ResourceList{
+ corev1.ResourceCPU: *resource.NewMilliQuantity(maxCPUOverHead, resource.DecimalSI),
+ corev1.ResourceMemory: *resource.NewQuantity(maxMemoryOverHead, resource.DecimalSI),
+ }, nil
+ }
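The max-based fix is easiest to see with concrete numbers. The self-contained sketch below runs the same loop over two invented nodes (all values hypothetical) to show that the largest observed overhead wins.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Two invented nodes: observed overhead is capacity minus allocatable.
	nodes := []corev1.Node{
		{Status: corev1.NodeStatus{
			Capacity: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("4"),
				corev1.ResourceMemory: resource.MustParse("8Gi"),
			},
			Allocatable: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("3920m"), // 80m overhead
				corev1.ResourceMemory: resource.MustParse("7Gi"),   // 1Gi overhead
			},
		}},
		{Status: corev1.NodeStatus{
			Capacity: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("8"),
				corev1.ResourceMemory: resource.MustParse("16Gi"),
			},
			Allocatable: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("7910m"),  // 90m overhead
				corev1.ResourceMemory: resource.MustParse("14.5Gi"), // 1.5Gi overhead
			},
		}},
	}

	// Same max logic as nodeOverhead above: the largest observed overhead wins.
	maxCPU, maxMem := int64(0), int64(0)
	for _, n := range nodes {
		cpu := n.Status.Capacity.Cpu().MilliValue() - n.Status.Allocatable.Cpu().MilliValue()
		mem := n.Status.Capacity.Memory().Value() - n.Status.Allocatable.Memory().Value()
		if cpu > maxCPU {
			maxCPU = cpu
		}
		if mem > maxMem {
			maxMem = mem
		}
	}
	fmt.Println(maxCPU, maxMem) // 90 1610612736 (90m CPU, 1.5Gi memory)
}
```
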

func (p *DefaultProvider) UpdateInstanceTypes(ctx context.Context) error {
// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
// We lock here so that multiple callers to getInstanceTypesOfferings do not result in cache misses and multiple
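The warning above is truncated by the collapsed context, but it describes the classic cache-stampede concern: without the lock, several concurrent callers could each see a cache miss and refetch the instance-type offerings in parallel. Below is a generic sketch of that guarded-refresh pattern, with invented names rather than the provider's actual fields.

```go
package main

import (
	"fmt"
	"sync"
)

// offeringsCache sketches the guarded-refresh pattern the (truncated) comment
// describes; the names are illustrative, not the provider's actual fields.
type offeringsCache struct {
	mu   sync.Mutex
	data map[string][]string // instance type -> zone IDs
}

// get lets only one caller at a time run the expensive refresh, so concurrent
// callers cannot all miss the cache and refetch in parallel.
func (c *offeringsCache) get(fetch func() map[string][]string) map[string][]string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.data == nil {
		c.data = fetch()
	}
	return c.data
}

func main() {
	c := &offeringsCache{}
	offerings := c.get(func() map[string][]string {
		fmt.Println("refreshing once, under the lock")
		return map[string][]string{"ecs.g7.large": {"cn-hangzhou-h"}}
	})
	fmt.Println(offerings)
}
```
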
68 changes: 5 additions & 63 deletions pkg/providers/instancetype/types.go
@@ -42,9 +42,6 @@ var (
)

const (
- MemoryAvailable = "memory.available"
- NodeFSAvailable = "nodefs.available"

GiBMiBRatio = 1024
MiBByteRatio = 1024 * 1024
TerwayMinENIRequirements = 11
@@ -60,7 +57,7 @@ type ZoneData struct {
Available bool
}

- func NewInstanceType(ctx context.Context,
+ func NewInstanceType(ctx context.Context, overhead corev1.ResourceList,
info *ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType,
kc *v1alpha1.KubeletConfiguration, region string, systemDisk *v1alpha1.SystemDisk,
offerings cloudprovider.Offerings, clusterCNI string) *cloudprovider.InstanceType {
@@ -74,9 +71,10 @@ func NewInstanceType(ctx context.Context,
Offerings: offerings,
Capacity: computeCapacity(ctx, info, kc.MaxPods, kc.PodsPerCore, systemDisk, clusterCNI),
Overhead: &cloudprovider.InstanceTypeOverhead{
- KubeReserved: kubeReservedResources(kc.KubeReserved),
- SystemReserved: systemReservedResources(kc.SystemReserved),
- EvictionThreshold: evictionThreshold(memory(ctx, info), ephemeralStorage(systemDisk), kc.EvictionHard, kc.EvictionSoft),
+ // These overhead fields are merged into a single total downstream, so one combined overhead value is enough here.
+ KubeReserved: overhead,
+ SystemReserved: corev1.ResourceList{},
+ EvictionThreshold: corev1.ResourceList{},
},
}
if it.Requirements.Compatible(scheduling.NewRequirements(scheduling.NewRequirement(corev1.LabelOSStable, corev1.NodeSelectorOpIn, string(corev1.Windows)))) == nil {
@@ -178,62 +176,6 @@ func computeCapacity(ctx context.Context,
return resourceList
}

- func kubeReservedResources(kubeReserved map[string]string) corev1.ResourceList {
- resources := corev1.ResourceList{
- // TODO: Following data is extract from real env
- // Please check it more
- corev1.ResourceMemory: resource.MustParse("447Mi"),
- corev1.ResourceCPU: resource.MustParse("35m"),
- }
-
- return lo.Assign(resources, lo.MapEntries(kubeReserved, func(k string, v string) (corev1.ResourceName, resource.Quantity) {
- return corev1.ResourceName(k), resource.MustParse(v)
- }))
- }
-
- func systemReservedResources(systemReserved map[string]string) corev1.ResourceList {
- resources := corev1.ResourceList{
- // TODO: Following data is extract from real env
- // Please check it more
- corev1.ResourceMemory: resource.MustParse("447Mi"),
- corev1.ResourceCPU: resource.MustParse("35m"),
- }
-
- return lo.Assign(resources, lo.MapEntries(systemReserved, func(k string, v string) (corev1.ResourceName, resource.Quantity) {
- return corev1.ResourceName(k), resource.MustParse(v)
- }))
- }
-
- func evictionThreshold(memory *resource.Quantity, storage *resource.Quantity, evictionHard map[string]string, evictionSoft map[string]string) corev1.ResourceList {
- overhead := corev1.ResourceList{
- // TODO: Following data is extract from real env
- // Please check it more
- corev1.ResourceMemory: resource.MustParse("300Mi"),
- }
-
- override := corev1.ResourceList{}
- var evictionSignals []map[string]string
- if evictionHard != nil {
- evictionSignals = append(evictionSignals, evictionHard)
- }
- if evictionSoft != nil {
- evictionSignals = append(evictionSignals, evictionSoft)
- }
-
- for _, m := range evictionSignals {
- temp := corev1.ResourceList{}
- if v, ok := m[MemoryAvailable]; ok {
- temp[corev1.ResourceMemory] = computeEvictionSignal(*memory, v)
- }
- if v, ok := m[NodeFSAvailable]; ok {
- temp[corev1.ResourceEphemeralStorage] = computeEvictionSignal(*storage, v)
- }
- override = resources.MaxResources(override, temp)
- }
- // Assign merges maps from left to right so overrides will always be taken last
- return lo.Assign(overhead, override)
- }
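These helpers could be dropped because the three overhead buckets are ultimately summed into one combined reservation anyway, so a single measured overhead value loses nothing. The sketch below shows that merge semantics; it is illustrative, not karpenter's exact code.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// totalOverhead mimics how the separate overhead buckets collapse into one
// figure: quantities for the same resource are added together.
func totalOverhead(lists ...corev1.ResourceList) corev1.ResourceList {
	out := corev1.ResourceList{}
	for _, l := range lists {
		for name, q := range l {
			if existing, ok := out[name]; ok {
				existing.Add(q)
				out[name] = existing
			} else {
				out[name] = q.DeepCopy()
			}
		}
	}
	return out
}

func main() {
	// The removed hardcoded values, summed the way the kubelet would see them.
	kubeReserved := corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("447Mi")}
	systemReserved := corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("447Mi")}
	eviction := corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("300Mi")}
	total := totalOverhead(kubeReserved, systemReserved, eviction)
	fmt.Println(total.Memory()) // 1194Mi
}
```
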

// computeEvictionSignal computes the resource quantity value for an eviction signal value, computed off the
// base capacity value if the signal value is a percentage or as a resource quantity if the signal value isn't a percentage
func computeEvictionSignal(capacity resource.Quantity, signalValue string) resource.Quantity {
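computeEvictionSignal itself survives the change (its body is collapsed above). Per its doc comment, a trailing '%' is read as a fraction of the base capacity and anything else as an absolute quantity; the sketch below follows that contract and is an assumption, not the repository's exact implementation.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"

	"k8s.io/apimachinery/pkg/api/resource"
)

// evictionSignal sketches the documented behaviour: percentage signals are
// computed off the base capacity, absolute signals are parsed directly.
func evictionSignal(capacity resource.Quantity, signalValue string) resource.Quantity {
	if strings.HasSuffix(signalValue, "%") {
		p, err := strconv.ParseFloat(strings.TrimSuffix(signalValue, "%"), 64)
		if err != nil {
			panic(err) // illustrative only; real code should surface the error
		}
		return *resource.NewQuantity(int64(float64(capacity.Value())*p/100), resource.BinarySI)
	}
	return resource.MustParse(signalValue)
}

func main() {
	mem := resource.MustParse("8Gi")
	fmt.Println(evictionSignal(mem, "5%"))    // 5% of 8Gi, in bytes
	fmt.Println(evictionSignal(mem, "100Mi")) // parsed as-is: 100Mi
}
```
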
