Skip to content

Commit

Permalink
instance use cache
Browse files Browse the repository at this point in the history
  • Loading branch information
helen-frank committed Nov 17, 2024
1 parent c19cbaf commit 8c06079
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
ackProvider := ack.NewDefaultProvider(clusterID, ackClient)

instanceProvider := instance.NewDefaultProvider(
ctx,
region,
ecsClient,
imageResolver,
Expand Down
95 changes: 71 additions & 24 deletions pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"math"
"net/http"
"strings"
"time"

ecsclient "github.com/alibabacloud-go/ecs-20140526/v4/client"
util "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
"github.com/patrickmn/go-cache"
"github.com/samber/lo"
"go.uber.org/multierr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -48,6 +50,7 @@ const (
// TODO: After that open up the configuration options
instanceTypeFlexibilityThreshold = 5 // falling back to on-demand without flexibility risks insufficient capacity errors
maxInstanceTypes = 20
instanceCacheExpiration = 15 * time.Second
)

type Provider interface {
Expand All @@ -59,25 +62,29 @@ type Provider interface {
}

type DefaultProvider struct {
ecsClient *ecsclient.Client
region string
ecsClient *ecsclient.Client
region string
instanceCache *cache.Cache

imageFamilyResolver imagefamily.Resolver
vSwitchProvider vswitch.Provider
ackProvider ack.Provider
}

func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
func NewDefaultProvider(ctx context.Context, region string, ecsClient *ecsclient.Client,
imageFamilyResolver imagefamily.Resolver, vSwitchProvider vswitch.Provider,
ackProvider ack.Provider) *DefaultProvider {
return &DefaultProvider{
ecsClient: ecsClient,
region: region,
p := &DefaultProvider{
ecsClient: ecsClient,
region: region,
instanceCache: cache.New(instanceCacheExpiration, instanceCacheExpiration),

imageFamilyResolver: imageFamilyResolver,
vSwitchProvider: vSwitchProvider,
ackProvider: ackProvider,
}

return p
}

func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNodeClass, nodeClaim *karpv1.NodeClaim,
Expand All @@ -102,31 +109,23 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNod
}

func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) {
describeInstancesRequest := &ecsclient.DescribeInstancesRequest{
RegionId: tea.String(p.region),
InstanceIds: tea.String("[\"" + id + "\"]"),
if instance, ok := p.instanceCache.Get(id); ok {
return instance.(*Instance), nil
}
runtime := &util.RuntimeOptions{}

resp, err := p.ecsClient.DescribeInstancesWithOptions(describeInstancesRequest, runtime)
// List all instances to update the cache
instances, err := p.List(ctx)
if err != nil {
return nil, err
}

if resp == nil || resp.Body == nil || resp.Body.Instances == nil {
return nil, fmt.Errorf("failed to get instance %s, %s", id, tea.Prettify(resp))
}

// If the instance size is 0, which means it's deleted, return notfound error
if len(resp.Body.Instances.Instance) == 0 {
return nil, cloudprovider.NewNodeClaimNotFoundError(alierrors.WithRequestID(tea.StringValue(resp.Body.RequestId), fmt.Errorf("expected a single instance with id %s", id)))
}

if len(resp.Body.Instances.Instance) != 1 {
return nil, alierrors.WithRequestID(tea.StringValue(resp.Body.RequestId), fmt.Errorf("expected a single instance with id %s, got %d", id, len(resp.Body.Instances.Instance)))
p.syncAllInstances(instances)
currentInstance, ok := p.instanceCache.Get(id)
if !ok || currentInstance == nil {
return nil, cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("instance not found"))
}

return NewInstance(resp.Body.Instances.Instance[0]), nil
return currentInstance.(*Instance), nil
}

func (p *DefaultProvider) List(ctx context.Context) ([]*Instance, error) {
Expand Down Expand Up @@ -171,6 +170,8 @@ func (p *DefaultProvider) List(ctx context.Context) ([]*Instance, error) {
}
}

p.syncAllInstances(instances)

return instances, nil
}

Expand Down Expand Up @@ -207,6 +208,7 @@ func (p *DefaultProvider) Delete(ctx context.Context, id string) error {
return fmt.Errorf("terminating instance id: %s, %w", id, err)
}

p.instanceCache.Delete(id)
return nil
}

Expand Down Expand Up @@ -234,7 +236,9 @@ func (p *DefaultProvider) CreateTags(ctx context.Context, id string, tags map[st
return fmt.Errorf("tagging instance, %w", err)
}

return nil
_, err := p.syncInstance(id)

return err
}

// filterInstanceTypes is used to provide filtering on the list of potential instance types to further limit it to those
Expand Down Expand Up @@ -586,3 +590,46 @@ type LaunchTemplate struct {
SecurityGroupIds []*string
SystemDisk *v1alpha1.SystemDisk
}

// getInstance looks up a single ECS instance by ID with a DescribeInstances
// call. It neither reads from nor writes to the instance cache.
//
// It returns a NodeClaimNotFoundError when the API answers with zero
// instances, and an ordinary error when the call itself fails, when the
// response lacks an instance list, or when more than one instance is returned.
func (p *DefaultProvider) getInstance(id string) (*Instance, error) {
	// InstanceIds is sent as a bracketed, quoted list holding only this ID.
	req := &ecsclient.DescribeInstancesRequest{
		RegionId:    tea.String(p.region),
		InstanceIds: tea.String("[\"" + id + "\"]"),
	}

	resp, err := p.ecsClient.DescribeInstancesWithOptions(req, &util.RuntimeOptions{})
	if err != nil {
		return nil, err
	}
	if resp == nil || resp.Body == nil || resp.Body.Instances == nil {
		return nil, fmt.Errorf("failed to get instance %s, %s", id, tea.Prettify(resp))
	}

	found := resp.Body.Instances.Instance
	switch len(found) {
	case 0:
		// An empty result means the instance has been deleted; report not-found.
		return nil, cloudprovider.NewNodeClaimNotFoundError(alierrors.WithRequestID(tea.StringValue(resp.Body.RequestId), fmt.Errorf("expected a single instance with id %s", id)))
	case 1:
		return NewInstance(found[0]), nil
	default:
		return nil, alierrors.WithRequestID(tea.StringValue(resp.Body.RequestId), fmt.Errorf("expected a single instance with id %s, got %d", id, len(found)))
	}
}

// syncInstance re-fetches the instance with the given ID via getInstance and
// stores the result in the instance cache under that ID, using the cache's
// default expiration. The freshly fetched instance is returned.
//
// If the fetch fails, the error is returned as-is and the cache is not
// modified.
func (p *DefaultProvider) syncInstance(id string) (*Instance, error) {
	fetched, fetchErr := p.getInstance(id)
	if fetchErr != nil {
		return nil, fetchErr
	}

	p.instanceCache.Set(id, fetched, cache.DefaultExpiration)
	return fetched, nil
}

// syncAllInstances writes every instance in the given slice into the instance
// cache, keyed by its ID and stored with the cache's default expiration.
// Cache entries for IDs that do not appear in the slice are left untouched.
func (p *DefaultProvider) syncAllInstances(instances []*Instance) {
	for i := range instances {
		inst := instances[i]
		p.instanceCache.Set(inst.ID, inst, cache.DefaultExpiration)
	}
}

0 comments on commit 8c06079

Please sign in to comment.