Skip to content

Commit

Permalink
feat: support price provider
Browse files Browse the repository at this point in the history
Signed-off-by: jwcesign <[email protected]>
  • Loading branch information
jwcesign committed Oct 10, 2024
1 parent de5846d commit 1f7d61e
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 29 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ require (
github.com/alibabacloud-go/vpc-20160428/v6 v6.10.4
github.com/aliyun/aliyun-cli v0.0.0-20240925084117-158a70e275f0
github.com/awslabs/operatorpkg v0.0.0-20240805231134-67d0acfb6306
github.com/cloudpilot-ai/priceserver v0.0.0-20241009154424-d3cb0d822cab
github.com/cloudpilot-ai/priceserver v0.0.0-20241010082453-c5519264f64c
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/samber/lo v1.47.0
github.com/stretchr/testify v1.9.0
go.uber.org/multierr v1.11.0
k8s.io/api v0.30.3
k8s.io/apimachinery v0.30.3
Expand Down Expand Up @@ -76,6 +77,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/clbanning/mxj/v2 v2.5.5 h1:oT81vUeEiQQ/DcHbzSytRngP6Ky9O+L+0Bw0zSJag9E=
github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudpilot-ai/priceserver v0.0.0-20241009125028-12be3a49e1fd h1:baHLpb7l82irpEwq08ThLZiBHNn3jVp5fWDW122mVU4=
github.com/cloudpilot-ai/priceserver v0.0.0-20241009125028-12be3a49e1fd/go.mod h1:NORch8twW9a+z2hSHYz/iKKGSQmts913GsveSHQbnRM=
github.com/cloudpilot-ai/priceserver v0.0.0-20241009154424-d3cb0d822cab h1:VA7R4Z5wS/Kz4Cu1kMflsrJmS7SzskeQGPW8tv82Ckg=
github.com/cloudpilot-ai/priceserver v0.0.0-20241009154424-d3cb0d822cab/go.mod h1:NORch8twW9a+z2hSHYz/iKKGSQmts913GsveSHQbnRM=
github.com/cloudpilot-ai/priceserver v0.0.0-20241010082453-c5519264f64c h1:XrN2e9OpnHVWhdcJo6pehOrjU0qgg3I8tGd0H6f8FXU=
github.com/cloudpilot-ai/priceserver v0.0.0-20241010082453-c5519264f64c/go.mod h1:NORch8twW9a+z2hSHYz/iKKGSQmts913GsveSHQbnRM=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
10 changes: 5 additions & 5 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
os.Exit(1)
}

pricingProvider := pricing.NewDefaultProvider(
ctx,
ecsClient,
*ecsClient.RegionId,
)
pricingProvider, err := pricing.NewDefaultProvider(ctx, *ecsClient.RegionId)
if err != nil {
log.FromContext(ctx).Error(err, "Failed to create pricing provider")
os.Exit(1)
}

instanceProvider := instance.NewDefaultProvider(
ctx,
Expand Down
118 changes: 99 additions & 19 deletions pkg/providers/pricing/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ package pricing
import (
"context"
_ "embed"
"fmt"
"net/http"
"sync"
"time"

ecsclient "github.com/alibabacloud-go/ecs-20140526/v4/client"
"github.com/cloudpilot-ai/priceserver/pkg/apis"
"github.com/cloudpilot-ai/priceserver/pkg/tools"
"github.com/samber/lo"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

utilsobject "github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/utils/object"
)

var (
//go:embed initial-on-demand-prices.json
initialOnDemandPricesData []byte

initialOnDemandPrices = *utilsobject.JSONUnmarshal[map[string]map[string]float64](initialOnDemandPricesData)
)
//go:embed initial-on-demand-prices.json
var initialOnDemandPricesData []byte

const defaultRegion = "cn-qingdao"

Expand All @@ -47,15 +47,17 @@ type Provider interface {
UpdateSpotPricing(context.Context) error
}

// DefaultProvider provides actual pricing data to the Ali cloud provider to allow it to make more informed decisions
// regarding which instances to launch. This is initialized at startup with a periodically updated static price list to
// DefaultProvider provides actual pricing data to the AlibabaCloud provider to allow it to make more informed decisions
// regarding which instances to launch. This is initialized at startup with a periodically updated static price list to
// support running in locations where pricing data is unavailable. In those cases the static pricing data provides a
// relative ordering that is still more accurate than our previous pricing model. In the event that a pricing update
// fails, the previous pricing information is retained and used which may be the static initial pricing data if pricing
// updates never succeed.
type DefaultProvider struct {
ecsClient *ecsclient.Client
// pricing pricingiface.PricingAPI
muPriceLastUpdatedTimestamp sync.RWMutex
priceLastUpdatedTimestamp time.Time
alibabaCloudPriceClient tools.QueryClientInterface

region string
cm *pretty.ChangeMonitor

Expand Down Expand Up @@ -84,17 +86,30 @@ func newZonalPricing(defaultPrice float64) zonal {
return z
}

func NewDefaultProvider(_ context.Context, ecsClient *ecsclient.Client, region string) *DefaultProvider {
const (
// defaultPriceQueryClient defines the default query endpoint
// we do not use the alibabacloud sdk because it has a rate limit which not satisfied with our request frequency
// you can build your own query server with repo: https://github.com/cloudpilot-ai/priceserver
// TODO: update to production endpoint latter
defaultPriceQueryEndpoint = "https://pre-price.cloudpilot.ai"
)

func NewDefaultProvider(ctx context.Context, region string) (*DefaultProvider, error) {
queryClient, err := tools.NewQueryClient(defaultPriceQueryEndpoint, tools.AlibabaCloudProvider, region)
if err != nil {
log.FromContext(ctx).Error(err, "unable to create query client")
return nil, err
}
p := &DefaultProvider{
region: region,
ecsClient: ecsClient,
region: region,
alibabaCloudPriceClient: queryClient,

cm: pretty.NewChangeMonitor(),
}
// sets the pricing data from the static default state for the provider
p.Reset()

return p
return p, nil
}

// InstanceTypes returns the list of all instance types for which either a spot or on-demand price is known.
Expand Down Expand Up @@ -147,13 +162,16 @@ func (p *DefaultProvider) LivenessProbe(_ *http.Request) error {
// ensure we don't deadlock and nolint for the empty critical section
p.muOnDemand.Lock()
p.muSpot.Lock()
p.muPriceLastUpdatedTimestamp.Lock()
//nolint: staticcheck
p.muOnDemand.Unlock()
p.muSpot.Unlock()
p.muPriceLastUpdatedTimestamp.Unlock()
return nil
}

func (p *DefaultProvider) Reset() {
initialOnDemandPrices := *utilsobject.JSONUnmarshal[map[string]map[string]float64](initialOnDemandPricesData)
// see if we've got region specific pricing data
staticPricing, ok := initialOnDemandPrices[p.region]
if !ok {
Expand All @@ -167,13 +185,75 @@ func (p *DefaultProvider) Reset() {
p.spotPricingUpdated = false
}

func (p *DefaultProvider) UpdateOnDemandPricing(context.Context) error {
func (p *DefaultProvider) UpdateOnDemandPricing(ctx context.Context) error {
if err := p.syncPricingData(ctx); err != nil {
return err
}

prices := p.alibabaCloudPriceClient.ListInstancesDetails(p.region)
if prices == nil || len(prices.InstanceTypePrices) == 0 {
err := fmt.Errorf("no price info available for region %s", p.region)
log.FromContext(ctx).Error(err, "failed to get on-demand pricing data from alibaba cloud")
return err
}

p.muOnDemand.Lock()
defer p.muOnDemand.Unlock()
p.onDemandPrices = lo.MapEntries(prices.InstanceTypePrices, func(key string, value *apis.InstanceTypePrice) (string, float64) {
return key, value.OnDemandPricePerHour
})

return nil
}
func (p *DefaultProvider) UpdateSpotPricing(ctx context.Context) error {
if err := p.syncPricingData(ctx); err != nil {
return err
}

prices := p.alibabaCloudPriceClient.ListInstancesDetails(p.region)
if prices == nil || len(prices.InstanceTypePrices) == 0 {
err := fmt.Errorf("no price info available for region %s", p.region)
log.FromContext(ctx).Error(err, "failed to get spot pricing data from alibaba cloud")
return err
}

totalOfferings := 0
p.muSpot.Lock()
defer p.muSpot.Unlock()
for instanceType, priceInfo := range prices.InstanceTypePrices {
if _, ok := p.spotPrices[instanceType]; !ok {
p.spotPrices[instanceType] = newZonalPricing(0)
}
for zone, price := range priceInfo.SpotPricePerHour {
p.spotPrices[instanceType].prices[zone] = price
}
totalOfferings += len(priceInfo.SpotPricePerHour)
}

p.spotPricingUpdated = true
if p.cm.HasChanged("spot-prices", p.spotPrices) {
log.FromContext(ctx).WithValues(
"instance-type-count", len(p.onDemandPrices),
"offering-count", totalOfferings).V(1).Info("updated spot pricing with instance types and offerings")
}

// TODO: implement me
return nil
}
func (p *DefaultProvider) UpdateSpotPricing(context.Context) error {

// TODO: implement me
func (p *DefaultProvider) syncPricingData(ctx context.Context) error {
p.muPriceLastUpdatedTimestamp.Lock()
lastUpdatedTime := p.priceLastUpdatedTimestamp
p.muPriceLastUpdatedTimestamp.Unlock()

if lastUpdatedTime.Add(time.Minute * 5).Before(time.Now()) {
if err := p.alibabaCloudPriceClient.Sync(); err != nil {
log.FromContext(ctx).Error(err, "failed to sync pricing data from alibaba cloud")
return err
}
p.muPriceLastUpdatedTimestamp.Lock()
p.priceLastUpdatedTimestamp = time.Now()
p.muPriceLastUpdatedTimestamp.Unlock()
}

return nil
}
68 changes: 68 additions & 0 deletions pkg/providers/pricing/pricing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2024 The CloudPilot AI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pricing

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewDefaultProvider(t *testing.T) {
type args struct {
region string
instanceType string
}

tests := []struct {
name string
args args
}{
{
name: "test cn-beijing region with od and spot price",
args: args{
region: "cn-beijing",
instanceType: "ecs.i2.xlarge",
},
},
{
name: "test us-east-1 region with od and spot price",
args: args{
region: "us-east-1",
instanceType: "ecs.i2.xlarge",
},
},
}

ctx := context.Background()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider, err := NewDefaultProvider(ctx, tt.args.region)
assert.NoError(t, err)
assert.NoError(t, provider.UpdateSpotPricing(ctx))
assert.NoError(t, provider.UpdateOnDemandPricing(ctx))

assert.NotZero(t, provider.onDemandPrices[tt.args.instanceType])
for _, info := range provider.spotPrices {
for _, v := range info.prices {
assert.NotZero(t, v)
}
}
})
}
}

0 comments on commit 1f7d61e

Please sign in to comment.