From ee5efa908e1343c5674012986c06d81644fb7e79 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 26 Jun 2020 17:03:57 +0200 Subject: [PATCH] storage capacity: initial implementation This is the producer side of KEP https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking. Only deployment together with a central controller is currently implemented. When syncing directly whenever there is a change, there's potentially a larger number of changes emitted. When there are rapid changes (for example, while a driver gets deployed), it may be better to delay processing and thus combine multiple changes in a single sync. --- cmd/csi-provisioner/csi-provisioner.go | 62 +- go.mod | 1 + go.sum | 1 + pkg/capacity/controller.go | 562 +++++++++ pkg/capacity/controller_test.go | 1123 +++++++++++++++++ pkg/capacity/doc.go | 19 + pkg/capacity/features.go | 76 ++ pkg/capacity/features_test.go | 81 ++ pkg/capacity/topology/doc.go | 21 + pkg/capacity/topology/nodes.go | 278 ++++ pkg/capacity/topology/nodes_test.go | 633 ++++++++++ pkg/capacity/topology/topology.go | 129 ++ .../k8s.io/apimachinery/pkg/util/rand/rand.go | 127 ++ vendor/modules.txt | 1 + 14 files changed, 3113 insertions(+), 1 deletion(-) create mode 100644 pkg/capacity/controller.go create mode 100644 pkg/capacity/controller_test.go create mode 100644 pkg/capacity/doc.go create mode 100644 pkg/capacity/features.go create mode 100644 pkg/capacity/features_test.go create mode 100644 pkg/capacity/topology/doc.go create mode 100644 pkg/capacity/topology/nodes.go create mode 100644 pkg/capacity/topology/nodes_test.go create mode 100644 pkg/capacity/topology/topology.go create mode 100644 vendor/k8s.io/apimachinery/pkg/util/rand/rand.go diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index fa86788f12..1d0757a1f5 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -26,7 +26,9 @@ import ( "strings" "time" + "github.com/container-storage-interface/spec/lib/go/csi" flag "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -43,6 +45,8 @@ import ( "github.com/kubernetes-csi/csi-lib-utils/deprecatedflags" "github.com/kubernetes-csi/csi-lib-utils/leaderelection" "github.com/kubernetes-csi/csi-lib-utils/metrics" + "github.com/kubernetes-csi/external-provisioner/pkg/capacity" + "github.com/kubernetes-csi/external-provisioner/pkg/capacity/topology" ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller" snapclientset "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned" ) @@ -58,7 +62,8 @@ var ( retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.") retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.") workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. 
of simultaneous CSI calls.") - finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaniously running threads, handling cloning finalizer removal") + finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal") + capacityThreads = flag.Uint("storage-capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects") operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for creation or deletion of a volume") _ = deprecatedflags.Add("provisioner") @@ -76,6 +81,13 @@ var ( kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") + capacityFeatures = func() *capacity.Features { + capacity := &capacity.Features{} + flag.Var(capacity, "enable-capacity", "Enables producing CSIStorageCapacity objects with capacity information the driver's GetCapacity call. Currently supported: -enable-capacity=central.") + return capacity + }() + capacityPollPeriod = flag.Duration("capacity-poll-period", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes. Defaults to one minute.") + featureGates map[string]bool provisionController *controller.ProvisionController version = "unknown" @@ -266,6 +278,51 @@ func main() { controllerCapabilities, ) + var capacityController *capacity.Controller + if (*capacityFeatures)[capacity.FeatureCentral] { + podName := os.Getenv("POD_NAME") + namespace := os.Getenv("POD_NAMESPACE") + if podName == "" || namespace == "" { + klog.Fatalf("need POD_NAMESPACE/POD_NAME env variables, have only POD_NAMESPACE=%q and POD_NAME=%q", namespace, podName) + } + pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) + if err != nil { + klog.Fatalf("error getting our own pod: %v", err) + } + var controller *metav1.OwnerReference + for _, owner := range pod.OwnerReferences { + if owner.Controller != nil && *owner.Controller { + controller = &owner + break + } + } + if controller == nil { + klog.Fatal("pod does not have a controller which owns it") + } + + topologyInformer := topology.NewNodeTopology( + provisionerName, + clientset, + factory.Core().V1().Nodes(), + factory.Storage().V1().CSINodes(), + workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"), + ) + + capacityController = capacity.NewCentralCapacityController( + csi.NewControllerClient(grpcClient), + provisionerName, + clientset, + // TODO: metrics for the queue?! 
+ workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"), + *controller, + namespace, + topologyInformer, + factory.Storage().V1().StorageClasses(), + factory.Storage().V1alpha1().CSIStorageCapacities(), + *capacityPollPeriod, + ) + } + run := func(ctx context.Context) { factory.Start(ctx.Done()) cacheSyncResult := factory.WaitForCacheSync(ctx.Done()) @@ -275,6 +332,9 @@ func main() { } } + if capacityController != nil { + go capacityController.Run(ctx, int(*capacityThreads)) + } if csiClaimController != nil { go csiClaimController.Run(ctx, int(*finalizerThreads)) } diff --git a/go.mod b/go.mod index fb7a96a119..7f542d23d1 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( k8s.io/component-base v0.19.0-rc.2 k8s.io/csi-translation-lib v0.19.0-rc.2 k8s.io/klog v1.0.0 + k8s.io/klog/v2 v2.2.0 k8s.io/kubernetes v1.19.0-rc.2 sigs.k8s.io/sig-storage-lib-external-provisioner/v6 v6.1.0-rc1 ) diff --git a/go.sum b/go.sum index 981f2d8076..2f1fd5f7f9 100644 --- a/go.sum +++ b/go.sum @@ -555,6 +555,7 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= +go.etcd.io/etcd v0.5.0-alpha.5.0.20200716221620-18dfb9cca345 h1:2gOG36vt1BhUqpzxwZLZJxUim2dHB05vw+RAn4Q6YOU= go.etcd.io/etcd v0.5.0-alpha.5.0.20200716221620-18dfb9cca345/go.mod h1:skWido08r9w6Lq/w70DO5XYIKMu4QFu1+4VsqLQuJy8= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/pkg/capacity/controller.go b/pkg/capacity/controller.go new file mode 100644 index 0000000000..b6f9649864 --- /dev/null +++ b/pkg/capacity/controller.go @@ -0,0 +1,562 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package capacity contains the code which controls the CSIStorageCapacity +// objects owned by the external-provisioner. 
+package capacity + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + // "github.com/kubernetes-csi/csi-lib-utils/rpc" + // v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + storagev1alpha1 "k8s.io/api/storage/v1alpha1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + storageinformersv1 "k8s.io/client-go/informers/storage/v1" + storageinformersv1alpha1 "k8s.io/client-go/informers/storage/v1alpha1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + // corelisters "k8s.io/client-go/listers/core/v1" + // "k8s.io/client-go/tools/cache" + "github.com/kubernetes-csi/external-provisioner/pkg/capacity/topology" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +// Controller creates and updates CSIStorageCapacity objects. It +// deletes those which are no longer needed because their storage +// class or topology segment are gone. The controller only manages +// those CSIStorageCapacity objects that are owned by a certain +// entity. +// +// The controller maintains a set of topology segments (= NodeSelector +// pointers). Work items are a combination of such a pointer and a +// pointer to a storage class. These keys are mapped to the +// corresponding CSIStorageCapacity object, if one exists. +// +// When processing a work item, the controller first checks whether +// the topology segment and storage class still exist. If not, +// the CSIStorageCapacity object gets deleted. Otherwise, it gets updated +// or created. +// +// New work items are queued for processing when the reconiliation loop +// finds differences, periodically (to refresh existing items) and when +// capacity is expected to have changed. +// +// The work queue is also used to delete duplicate CSIStorageCapacity objects, +// i.e. those that for some reason have the same topology segment +// and storage class name as some other object. That should never happen, +// but the controller is prepared to clean that up, just in case. +type Controller struct { + csiController CSICapacityClient + driverName string + client kubernetes.Interface + queue workqueue.RateLimitingInterface + owner metav1.OwnerReference + ownerNamespace string + topologyInformer topology.Informer + scInformer storageinformersv1.StorageClassInformer + cInformer storageinformersv1alpha1.CSIStorageCapacityInformer + pollPeriod time.Duration + + // capacities contains one entry for each object that is supposed + // to exist. + capacities map[workItem]*storagev1alpha1.CSIStorageCapacity + capacitiesLock sync.Mutex +} + +type workItem struct { + segment *topology.Segment + storageClassName string +} + +var ( + // Defines parameters for ExponentialBackoff used while starting up + // and listing CSIStorageCapacity objects. + listCSIStorageCapacityBackoff = wait.Backoff{ + Duration: time.Second * 5, + Factor: 1.1, + Steps: 10, + } +) + +// CSICapacityClient is the relevant subset of csi.ControllerClient. +type CSICapacityClient interface { + GetCapacity(ctx context.Context, in *csi.GetCapacityRequest, opts ...grpc.CallOption) (*csi.GetCapacityResponse, error) +} + +// NewController creates a new controller for CSIStorageCapacity objects. 
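+//
+// All CSIStorageCapacity objects are created in ownerNamespace with the given
+// owner reference; only objects controlled by that owner are treated as
+// managed by this controller. pollPeriod defines how often all known items
+// are re-enqueued so that their capacity gets refreshed via CSI GetCapacity.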
+func NewCentralCapacityController( + csiController CSICapacityClient, + driverName string, + client kubernetes.Interface, + queue workqueue.RateLimitingInterface, + owner metav1.OwnerReference, + ownerNamespace string, + topologyInformer topology.Informer, + scInformer storageinformersv1.StorageClassInformer, + cInformer storageinformersv1alpha1.CSIStorageCapacityInformer, + pollPeriod time.Duration, +) *Controller { + c := &Controller{ + csiController: csiController, + driverName: driverName, + client: client, + queue: queue, + owner: owner, + ownerNamespace: ownerNamespace, + topologyInformer: topologyInformer, + scInformer: scInformer, + cInformer: cInformer, + pollPeriod: pollPeriod, + capacities: map[workItem]*storagev1alpha1.CSIStorageCapacity{}, + } + + // Now register for changes. Depending on the implementation of the informers, + // this may already invoke callbacks. + handler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { c.onSCAddOrUpdate(obj.(*storagev1.StorageClass)) }, + UpdateFunc: func(_ interface{}, newObj interface{}) { c.onSCAddOrUpdate(newObj.(*storagev1.StorageClass)) }, + DeleteFunc: func(obj interface{}) { c.onSCDelete(obj.(*storagev1.StorageClass)) }, + } + c.scInformer.Informer().AddEventHandler(handler) + c.topologyInformer.AddCallback(c.onTopologyChanges) + + // We don't want the callbacks yet, but need to ensure that + // the informer controller is instantiated before the caller + // starts the factory. + cInformer.Informer() + + return c +} + +// Run is a main Controller handler +func (c *Controller) Run(ctx context.Context, threadiness int) { + klog.Info("Starting Capacity Controller") + defer c.queue.ShutDown() + go c.scInformer.Informer().Run(ctx.Done()) + go c.topologyInformer.Run(ctx) + + c.prepare(ctx) + for i := 0; i < threadiness; i++ { + go wait.UntilWithContext(ctx, func(ctx context.Context) { + c.runWorker(ctx) + }, time.Second) + } + + go wait.UntilWithContext(ctx, func(ctx context.Context) { c.pollCapacities() }, c.pollPeriod) + + klog.Info("Started Capacity Controller") + <-ctx.Done() + klog.Info("Shutting down Capacity Controller") +} + +func (c *Controller) prepare(ctx context.Context) { + // Wait for topology and storage class informer sync. Once we have that, + // we know which CSIStorageCapacity objects we need. + if !cache.WaitForCacheSync(ctx.Done(), c.topologyInformer.HasSynced, c.scInformer.Informer().HasSynced, c.cInformer.Informer().HasSynced) { + return + } + klog.V(3).Infof("Initial number of topology segments %d, storage classes %d, potential CSIStorageCapacity objects %d", + len(c.topologyInformer.List()), + func() int { + scs, _ := c.scInformer.Lister().List(labels.Everything()) + return len(scs) + }(), + len(c.capacities)) + + // Now that we know what we need, we can check what we have. + // We do that both via callbacks *and* by iterating over all + // objects: callbacks handle future updates and iterating + // avoids the assumumption that the callback will be invoked + // for all objects immediately when adding it. 
+ klog.V(3).Info("Checking for existing CSIStorageCapacity objects") + handler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { c.onCAddOrUpdate(ctx, obj.(*storagev1alpha1.CSIStorageCapacity)) }, + UpdateFunc: func(_ interface{}, newObj interface{}) { + c.onCAddOrUpdate(ctx, newObj.(*storagev1alpha1.CSIStorageCapacity)) + }, + DeleteFunc: func(obj interface{}) { c.onCDelete(ctx, obj.(*storagev1alpha1.CSIStorageCapacity)) }, + } + c.cInformer.Informer().AddEventHandler(handler) + capacities, err := c.cInformer.Lister().List(labels.Everything()) + if err != nil { + // This shouldn't happen. + utilruntime.HandleError(err) + return + } + for _, capacity := range capacities { + c.onCAddOrUpdate(ctx, capacity) + } + + // Now that we have seen all existing objects, we are done + // with the preparation and can let our caller start + // processing work items. +} + +// onTopologyChanges is called by the topology informer. +func (c *Controller) onTopologyChanges(added []*topology.Segment, removed []*topology.Segment) { + klog.V(3).Infof("Capacity Controller: topology changed: added %v, removed %v", added, removed) + + storageclasses, err := c.scInformer.Lister().List(labels.Everything()) + if err != nil { + utilruntime.HandleError(err) + return + } + + c.capacitiesLock.Lock() + defer c.capacitiesLock.Unlock() + + for _, sc := range storageclasses { + if sc.Provisioner != c.driverName { + continue + } + for _, segment := range added { + c.addWorkItem(segment, sc) + } + for _, segment := range removed { + c.removeWorkItem(segment, sc) + } + } +} + +// onSCAddOrUpdate is called for add or update events by the storage +// class listener. +func (c *Controller) onSCAddOrUpdate(sc *storagev1.StorageClass) { + if sc.Provisioner != c.driverName { + return + } + + klog.V(3).Infof("Capacity Controller: storage class %s was updated or added", sc.Name) + segments := c.topologyInformer.List() + + c.capacitiesLock.Lock() + defer c.capacitiesLock.Unlock() + for _, segment := range segments { + c.addWorkItem(segment, sc) + } +} + +// onSCDelete is called for delete events by the storage class listener. +func (c *Controller) onSCDelete(sc *storagev1.StorageClass) { + if sc.Provisioner != c.driverName { + return + } + + klog.V(3).Infof("Capacity Controller: storage class %s was removed", sc.Name) + segments := c.topologyInformer.List() + + c.capacitiesLock.Lock() + defer c.capacitiesLock.Unlock() + for _, segment := range segments { + c.removeWorkItem(segment, sc) + } +} + +// addWorkItem ensures that there is an item in c.capacities. It +// must be called while holding c.capacitiesLock! +func (c *Controller) addWorkItem(segment *topology.Segment, sc *storagev1.StorageClass) { + item := workItem{ + segment: segment, + storageClassName: sc.Name, + } + // Ensure that we have an entry for it... + capacity := c.capacities[item] + c.capacities[item] = capacity + // ... and then tell our workers to update + // or create that capacity object. + klog.V(5).Infof("Capacity Controller: enqueuing %+v", item) + c.queue.Add(item) +} + +// addWorkItem ensures that the item gets removed from c.capacities. It +// must be called while holding c.capacitiesLock! +func (c *Controller) removeWorkItem(segment *topology.Segment, sc *storagev1.StorageClass) { + item := workItem{ + segment: segment, + storageClassName: sc.Name, + } + capacity, found := c.capacities[item] + if !found { + // Already gone or in the queue to be removed. 
+ klog.V(5).Infof("Capacity Controller: %+v already removed", item) + return + } + // Deleting the item will prevent further updates to + // it, in case that it is already in the queue. + delete(c.capacities, item) + + if capacity == nil { + // No object to remove. + klog.V(5).Infof("Capacity Controller: %+v removed, no object", item) + return + } + + // Any capacity object in the queue will be deleted. + klog.V(5).Infof("Capacity Controller: enqueuing CSIStorageCapacity %s for removal", capacity.Name) + c.queue.Add(capacity) +} + +// pollCapacities must be called periodically to detect when the underlying storage capacity has changed. +func (c *Controller) pollCapacities() { + c.capacitiesLock.Lock() + defer c.capacitiesLock.Unlock() + + for item := range c.capacities { + klog.V(5).Infof("Capacity Controller: enqueuing %+v for periodic update", item) + c.queue.Add(item) + } +} + +func (c *Controller) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +// processNextWorkItem processes items from queue. +func (c *Controller) processNextWorkItem(ctx context.Context) bool { + obj, shutdown := c.queue.Get() + if shutdown { + return false + } + + err := func() error { + defer c.queue.Done(obj) + + switch obj := obj.(type) { + case workItem: + return c.syncCapacity(ctx, obj) + case *storagev1alpha1.CSIStorageCapacity: + return c.deleteCapacity(ctx, obj) + default: + klog.Warningf("unexpected work item %#v", obj) + } + + return nil + }() + + if err != nil { + utilruntime.HandleError(err) + klog.Warningf("Retrying %#v after %d failures", obj, c.queue.NumRequeues(obj)) + c.queue.AddRateLimited(obj) + } else { + c.queue.Forget(obj) + } + + return true +} + +// syncCapacity gets the capacity and then updates or creates the object. +func (c *Controller) syncCapacity(ctx context.Context, item workItem) error { + // We lock only while accessing c.capacities, but not during + // the potentially long-running operations. That is okay + // because there is only a single worker per item. In the + // unlikely case that the desired state of the item changes + // while we work on it, we will add or update an obsolete + // object which we then don't store and instead queue for + // removal. + c.capacitiesLock.Lock() + capacity, found := c.capacities[item] + c.capacitiesLock.Unlock() + + klog.V(5).Infof("Capacity Controller: refreshing %+v", item) + if !found { + // The item was removed in the meantime. This can happen when the storage class + // or the topology segment are gone. + klog.V(5).Infof("Capacity Controller: %v became obsolete", item) + return nil + } + + sc, err := c.scInformer.Lister().Get(item.storageClassName) + if err != nil { + if apierrs.IsNotFound(err) { + // Another indication that the value is no + // longer needed. + return nil + } + return fmt.Errorf("retrieve storage class for %+v: %v", item, err) + } + + req := &csi.GetCapacityRequest{ + Parameters: sc.Parameters, + // The assumption is that the capacity is independent of the + // capabilities. The standard makes it mandatory to pass something, + // therefore we pick something rather arbitrarily. 
+ VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{}, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_UNKNOWN, + }, + }, + }, + } + if item.segment != nil { + req.AccessibleTopology = &csi.Topology{ + Segments: item.segment.GetLabelMap(), + } + } + resp, err := c.csiController.GetCapacity(ctx, req) + if err != nil { + return fmt.Errorf("CSI GetCapacity for %+v: %v", item, err) + } + + quantity := resource.NewQuantity(resp.AvailableCapacity, resource.BinarySI) + if capacity == nil { + // Create new object. + capacity = &storagev1alpha1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "csisc-", + OwnerReferences: []metav1.OwnerReference{c.owner}, + }, + StorageClassName: item.storageClassName, + NodeTopology: item.segment.GetLabelSelector(), + Capacity: quantity, + } + var err error + klog.V(5).Infof("Capacity Controller: creating new object for %+v, new capacity %v", item, quantity) + capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities(c.ownerNamespace).Create(ctx, capacity, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("create CSIStorageCapacity for %+v: %v", item, err) + } + klog.V(5).Infof("Capacity Controller: created %s for %+v with capacity %v", capacity.Name, item, quantity) + } else if capacity.Capacity.Value() == quantity.Value() { + klog.V(5).Infof("Capacity Controller: no need to update %s for %+v, same capacity %v", capacity.Name, item, quantity) + return nil + } else { + // Update existing object. + capacity.Capacity = quantity + var err error + klog.V(5).Infof("Capacity Controller: updating %s for %+v, new capacity %v", capacity.Name, item, quantity) + capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities(capacity.Namespace).Update(ctx, capacity, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("update CSIStorageCapacity for %+v: %v", item, err) + } + } + + c.capacitiesLock.Lock() + _, found = c.capacities[item] + if found { + // Remember the new or updated object for future updates. + c.capacities[item] = capacity + } else { + klog.V(5).Infof("Capacity Controller: %+v became obsolete during refresh, enqueue %s for deletion", item, capacity.Name) + c.queue.Add(capacity) + } + c.capacitiesLock.Unlock() + + return nil +} + +// deleteCapacity ensures that the object is gone when done. +func (c *Controller) deleteCapacity(ctx context.Context, capacity *storagev1alpha1.CSIStorageCapacity) error { + klog.V(5).Infof("Capacity Controller: removing CSIStorageCapacity %s", capacity.Name) + err := c.client.StorageV1alpha1().CSIStorageCapacities(capacity.Namespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{}) + if err != nil && apierrs.IsNotFound(err) { + return nil + } + return err +} + +// syncCSIStorageObject takes a read-only CSIStorageCapacity object +// and either remembers the pointer to it for future updates or +// ensures that it gets deleted if no longer needed. Foreign objects +// are ignored. +func (c *Controller) onCAddOrUpdate(ctx context.Context, capacity *storagev1alpha1.CSIStorageCapacity) { + if !c.isControlledByUs(capacity.OwnerReferences) { + // Not ours (anymore?). For the unlikely case that someone removed our owner reference, + // we also must remove our reference to the object. 
+ c.capacitiesLock.Lock() + defer c.capacitiesLock.Unlock() + for item, capacity2 := range c.capacities { + if capacity2 != nil && capacity2.UID == capacity.UID { + c.capacities[item] = nil + klog.V(5).Infof("Capacity Controller: CSIStorageCapacity %s owner was modified by someone, enqueueing %v for re-creation", capacity.Name, item) + c.queue.Add(item) + } + } + return + } + + c.capacitiesLock.Lock() + defer c.capacitiesLock.Unlock() + for item, capacity2 := range c.capacities { + if capacity2 != nil && capacity2.UID == capacity.UID { + // We already have matched the object. + klog.V(5).Infof("Capacity Controller: CSIStorageCapacity %s is already known to match %+v", capacity.Name, item) + // If it has a different capacity than our old copy, then someone else must have + // modified the capacity and we need to check the capacity anew. + if capacity2.Capacity.Value() != capacity.Capacity.Value() { + klog.V(5).Infof("Capacity Controller: CSIStorageCapacity %s was modified by someone, enqueueing %v for fixing", capacity.Name, item) + c.queue.Add(item) + } + // Either way, remember the new object revision to avoid the "conflict" error + // when we try to update the old object. + c.capacities[item] = capacity + return + } + if capacity2 == nil && + item.storageClassName == capacity.StorageClassName && + reflect.DeepEqual(item.segment.GetLabelSelector(), capacity.NodeTopology) { + // This is the capacity object for this particular combination + // of parameters. Reuse it. + klog.V(5).Infof("Capacity Controller: CSIStorageCapacity %s matches %+v", capacity.Name, item) + c.capacities[item] = capacity + return + } + } + // The CSIStorageCapacity object is obsolete, delete it. + klog.V(5).Infof("Capacity Controller: CSIStorageCapacity %s is obsolete, enqueue for removal", capacity.Name) + c.queue.Add(capacity) +} + +func (c *Controller) onCDelete(ctx context.Context, capacity *storagev1alpha1.CSIStorageCapacity) { + c.capacitiesLock.Lock() + defer c.capacitiesLock.Unlock() + for item, capacity2 := range c.capacities { + if capacity2 != nil && capacity2.UID == capacity.UID { + // The object is still needed. Someone else must have removed it. + // Re-create it... + klog.V(5).Infof("Capacity Controller: CSIStorageCapacity %s was removed by someone, enqueue %v for re-creation", capacity.Name, item) + c.capacities[item] = nil + c.queue.Add(item) + return + } + } +} + +// isControlledByUs implements the same logic as https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1?tab=doc#IsControlledBy, +// just with the expected owner identified directly with the UID. +func (c *Controller) isControlledByUs(owners []metav1.OwnerReference) bool { + for _, owner := range owners { + if owner.Controller != nil && *owner.Controller && owner.UID == c.owner.UID { + return true + } + } + return false +} diff --git a/pkg/capacity/controller_test.go b/pkg/capacity/controller_test.go new file mode 100644 index 0000000000..13f9f1282f --- /dev/null +++ b/pkg/capacity/controller_test.go @@ -0,0 +1,1123 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacity + +import ( + "context" + "errors" + "fmt" + "reflect" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/external-provisioner/pkg/capacity/topology" + "google.golang.org/grpc" + storagev1 "k8s.io/api/storage/v1" + storagev1alpha1 "k8s.io/api/storage/v1alpha1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + krand "k8s.io/apimachinery/pkg/util/rand" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/informers" + fakeclientset "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +func init() { + klog.InitFlags(nil) +} + +const ( + driverName = "test-driver" + ownerNamespace = "testns" + csiscRev = "CSISC-REV-" +) + +var ( + yes = true + defaultOwner = metav1.OwnerReference{ + APIVersion: "apps/v1", + Kind: "statefulset", + Name: "test-driver", + UID: "309cd460-2d62-4f40-bbcf-b7765aac5a6d", + Controller: &yes, + } + noOwner = metav1.OwnerReference{} + otherOwner = metav1.OwnerReference{ + APIVersion: "apps/v1", + Kind: "statefulset", + Name: "other-test-driver", + UID: "11111111-2d62-4f40-bbcf-b7765aac5a6d", + Controller: &yes, + } + + layer0 = topology.Segment{ + {Key: "layer0", Value: "foo"}, + } + layer0other = topology.Segment{ + {Key: "layer0", Value: "bar"}, + } + mb = resource.MustParse("1Mi") +) + +// TestCapacityController checks that the controller handles the initial state and +// several different changes at runtime correctly. +func TestController(t *testing.T) { + testcases := map[string]struct { + topology mockTopology + storage mockCapacity + initialSCs []testSC + initialCapacities []testCapacity + expectedCapacities []testCapacity + modify func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) (modifiedExpected []testCapacity, err error) + capacityChange func(ctx context.Context, storage *mockCapacity, expected []testCapacity) (modifiedExpected []testCapacity) + }{ + "empty": {}, + "one segment": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + }, + "one class": { + initialSCs: []testSC{ + { + name: "fast-sc", + driverName: driverName, + }, + }, + }, + "one capacity object": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. + "foo": "1Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + expectedCapacities: []testCapacity{ + { + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + }, + "reuse one capacity object, no changes": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. 
+ "foo": "1Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + initialCapacities: []testCapacity{ + { + uid: "test-capacity-1", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + expectedCapacities: []testCapacity{ + { + uid: "test-capacity-1", + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + }, + "reuse one capacity object, update capacity": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. + "foo": "2Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + initialCapacities: []testCapacity{ + { + uid: "test-capacity-1", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + expectedCapacities: []testCapacity{ + { + uid: "test-capacity-1", + resourceVersion: csiscRev + "1", + segment: layer0, + storageClassName: "other-sc", + quantity: "2Gi", + }, + }, + }, + "obsolete object, missing SC": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. + "foo": "1Gi", + }, + }, + initialCapacities: []testCapacity{ + { + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + }, + "obsolete object, missing segment": { + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. + "foo": "1Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + initialCapacities: []testCapacity{ + { + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + }, + "ignore capacity with other owner": { + initialCapacities: []testCapacity{ + { + owner: &otherOwner, + uid: "test-capacity-1", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + expectedCapacities: []testCapacity{ + { + owner: &otherOwner, + uid: "test-capacity-1", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + }, + "ignore capacity with no owner": { + initialCapacities: []testCapacity{ + { + owner: &noOwner, + uid: "test-capacity-1", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + expectedCapacities: []testCapacity{ + { + owner: &noOwner, + uid: "test-capacity-1", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + }, + "two segments, two classes, four objects missing": { + topology: mockTopology{ + segments: []*topology.Segment{ + &layer0, + &layer0other, + }, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. 
+ "foo": "1Gi", + "bar": "2Gi", + }, + }, + initialSCs: []testSC{ + { + name: "direct-sc", + driverName: driverName, + }, + { + name: "triple-sc", + driverName: driverName, + parameters: map[string]string{ + mockMultiplier: "3", + }, + }, + }, + expectedCapacities: []testCapacity{ + { + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "direct-sc", + quantity: "1Gi", + }, + { + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "triple-sc", + quantity: "3Gi", + }, + { + resourceVersion: csiscRev + "0", + segment: layer0other, + storageClassName: "direct-sc", + quantity: "2Gi", + }, + { + resourceVersion: csiscRev + "0", + segment: layer0other, + storageClassName: "triple-sc", + quantity: "6Gi", + }, + }, + }, + "two segments, two classes, four objects updated": { + topology: mockTopology{ + segments: []*topology.Segment{ + &layer0, + &layer0other, + }, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. + "foo": "1Gi", + "bar": "2Gi", + }, + }, + initialSCs: []testSC{ + { + name: "direct-sc", + driverName: driverName, + }, + { + name: "triple-sc", + driverName: driverName, + parameters: map[string]string{ + mockMultiplier: "3", + }, + }, + }, + initialCapacities: []testCapacity{ + { + uid: "test-capacity-1", + segment: layer0, + storageClassName: "direct-sc", + quantity: "1Mi", + }, + { + uid: "test-capacity-2", + segment: layer0, + storageClassName: "triple-sc", + quantity: "3Mi", + }, + { + uid: "test-capacity-3", + segment: layer0other, + storageClassName: "direct-sc", + quantity: "2Mi", + }, + { + uid: "test-capacity-4", + segment: layer0other, + storageClassName: "triple-sc", + quantity: "6Mi", + }, + }, + expectedCapacities: []testCapacity{ + { + uid: "test-capacity-1", + resourceVersion: csiscRev + "1", + segment: layer0, + storageClassName: "direct-sc", + quantity: "1Gi", + }, + { + uid: "test-capacity-2", + resourceVersion: csiscRev + "1", + segment: layer0, + storageClassName: "triple-sc", + quantity: "3Gi", + }, + { + uid: "test-capacity-3", + resourceVersion: csiscRev + "1", + segment: layer0other, + storageClassName: "direct-sc", + quantity: "2Gi", + }, + { + uid: "test-capacity-4", + resourceVersion: csiscRev + "1", + segment: layer0other, + storageClassName: "triple-sc", + quantity: "6Gi", + }, + }, + }, + "two segments, two classes, two added, two removed": { + topology: mockTopology{ + segments: []*topology.Segment{ + &layer0, + &layer0other, + }, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. 
+ "foo": "1Gi", + "bar": "2Gi", + }, + }, + initialSCs: []testSC{ + { + name: "direct-sc", + driverName: driverName, + }, + { + name: "triple-sc", + driverName: driverName, + parameters: map[string]string{ + mockMultiplier: "3", + }, + }, + }, + initialCapacities: []testCapacity{ + { + uid: "test-capacity-1", + segment: layer0, + storageClassName: "old-direct-sc", + quantity: "1Mi", + }, + { + uid: "test-capacity-2", + segment: layer0, + storageClassName: "old-triple-sc", + quantity: "3Mi", + }, + { + uid: "test-capacity-3", + segment: layer0other, + storageClassName: "direct-sc", + quantity: "2Mi", + }, + { + uid: "test-capacity-4", + segment: layer0other, + storageClassName: "triple-sc", + quantity: "6Mi", + }, + }, + expectedCapacities: []testCapacity{ + { + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "direct-sc", + quantity: "1Gi", + }, + { + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "triple-sc", + quantity: "3Gi", + }, + { + uid: "test-capacity-3", + resourceVersion: csiscRev + "1", + segment: layer0other, + storageClassName: "direct-sc", + quantity: "2Gi", + }, + { + uid: "test-capacity-4", + resourceVersion: csiscRev + "1", + segment: layer0other, + storageClassName: "triple-sc", + quantity: "6Gi", + }, + }, + }, + "fix modified capacity": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. + "foo": "1Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + expectedCapacities: []testCapacity{ + { + uid: "CSISC-UID-1", + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) { + capacities, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + capacity := capacities.Items[0] + capacity.Capacity = &mb + if _, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).Update(ctx, &capacity, metav1.UpdateOptions{}); err != nil { + return nil, err + } + expected[0].resourceVersion = csiscRev + "2" + return expected, nil + }, + }, + "re-create capacity": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. 
+ "foo": "1Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + expectedCapacities: []testCapacity{ + { + uid: "CSISC-UID-1", + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) { + capacities, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + capacity := capacities.Items[0] + if err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{}); err != nil { + return nil, err + } + expected[0].uid = "CSISC-UID-2" + return expected, nil + }, + }, + "delete redundant capacity": { + modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) { + capacity := makeCapacity(testCapacity{quantity: "1Gi"}) + if _, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).Create(ctx, capacity, metav1.CreateOptions{}); err != nil { + return nil, err + } + return expected, nil + }, + }, + "ignore capacity after owner change": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. + "foo": "1Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + expectedCapacities: []testCapacity{ + { + uid: "CSISC-UID-1", + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) { + capacities, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + capacity := capacities.Items[0] + // Unset owner. It's not clear why anyone would want to do that, but lets deal with it anyway: + // - the now "foreign" object must be left alone + // - an entry must be created anew + capacity.OwnerReferences = []metav1.OwnerReference{} + if _, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).Update(ctx, &capacity, metav1.UpdateOptions{}); err != nil { + return nil, err + } + expected[0].owner = &noOwner + expected[0].resourceVersion = csiscRev + "1" + expected = append(expected, testCapacity{ + uid: "CSISC-UID-2", + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }) + return expected, nil + }, + }, + "delete and recreate by someone": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. 
+ "foo": "1Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + expectedCapacities: []testCapacity{ + { + uid: "CSISC-UID-1", + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) { + capacities, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + capacity := capacities.Items[0] + // Delete and recreate with wrong capacity. This changes the UID while keeping the name + // the same. The capacity then must get corrected by the controller. + if err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{}); err != nil { + return nil, err + } + capacity.UID = "CSISC-UID-2" + capacity.Capacity = &mb + if _, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).Create(ctx, &capacity, metav1.CreateOptions{}); err != nil { + return nil, err + } + expected[0].uid = capacity.UID + expected[0].resourceVersion = csiscRev + "1" + return expected, nil + }, + }, + "storage capacity change": { + topology: mockTopology{ + segments: []*topology.Segment{&layer0}, + }, + storage: mockCapacity{ + capacity: map[string]interface{}{ + // This matches layer0. + "foo": "1Gi", + }, + }, + initialSCs: []testSC{ + { + name: "other-sc", + driverName: driverName, + }, + }, + expectedCapacities: []testCapacity{ + { + uid: "CSISC-UID-1", + resourceVersion: csiscRev + "0", + segment: layer0, + storageClassName: "other-sc", + quantity: "1Gi", + }, + }, + capacityChange: func(ctx context.Context, storage *mockCapacity, expected []testCapacity) []testCapacity { + storage.capacity["foo"] = "2Gi" + expected[0].quantity = "2Gi" + expected[0].resourceVersion = csiscRev + "1" + return expected + }, + }, + } + + for name, tc := range testcases { + // Not run in parallel. That doesn't work well in combination with global logging. + t.Run(name, func(t *testing.T) { + // There is no good way to shut down the controller. It spawns + // various goroutines and some of them (in particular shared informer) + // become very unhappy ("close on closed channel") when using a context + // that gets cancelled. Therefore we just keep everything running. + ctx := context.Background() + + var objects []runtime.Object + objects = append(objects, makeSCs(tc.initialSCs)...) + clientSet := fakeclientset.NewSimpleClientset(objects...) + clientSet.PrependReactor("create", "csistoragecapacities", createCSIStorageCapacityReactor()) + clientSet.PrependReactor("update", "csistoragecapacities", updateCSIStorageCapacityReactor()) + c := fakeController(ctx, clientSet, &tc.storage, &tc.topology) + for _, testCapacity := range tc.initialCapacities { + capacity := makeCapacity(testCapacity) + _, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).Create(ctx, capacity, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + c.prepare(ctx) + if err := process(ctx, c); err != nil { + t.Fatalf("unexpected processing error: %v", err) + } + err := validateCapacities(ctx, clientSet, tc.expectedCapacities) + if err != nil { + t.Fatalf("%v", err) + } + + // Now (optionally) modify the state and + // ensure that eventually the controller + // catches up. 
+ expectedCapacities := tc.expectedCapacities + if tc.modify != nil { + klog.Info("modifying objects") + ec, err := tc.modify(ctx, clientSet, expectedCapacities) + if err != nil { + t.Fatalf("modify objects: %v", err) + } + expectedCapacities = ec + if err := validateCapacitiesEventually(ctx, c, clientSet, expectedCapacities); err != nil { + t.Fatalf("modified objects: %v", err) + } + } + if tc.capacityChange != nil { + klog.Info("modifying capacity") + expectedCapacities = tc.capacityChange(ctx, &tc.storage, expectedCapacities) + c.pollCapacities() + if err := validateCapacitiesEventually(ctx, c, clientSet, expectedCapacities); err != nil { + t.Fatalf("modified capacity: %v", err) + } + } + }) + } +} + +func validateCapacities(ctx context.Context, clientSet *fakeclientset.Clientset, expectedCapacities []testCapacity) error { + actualCapacities, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("unexpected error: %v", err) + } + var messages []string + if len(actualCapacities.Items) != len(expectedCapacities) { + messages = append(messages, fmt.Sprintf("expected %d CSIStorageCapacity objects, got %d", len(expectedCapacities), len(actualCapacities.Items))) + } +nextActual: + for _, actual := range actualCapacities.Items { + for i, expected := range expectedCapacities { + expectedOwnerReferences := makeCapacity(expected).OwnerReferences + if reflect.DeepEqual(actual.NodeTopology, expected.segment.GetLabelSelector()) && + actual.StorageClassName == expected.storageClassName && + (len(actual.OwnerReferences) == 0 && len(expectedOwnerReferences) == 0 || + reflect.DeepEqual(actual.OwnerReferences, expectedOwnerReferences)) { + var mismatches []string + if expected.quantity != "" && actual.Capacity == nil { + mismatches = append(mismatches, "unexpected nil quantity") + } + if expected.quantity == "" && actual.Capacity != nil { + mismatches = append(mismatches, "unexpected quantity") + } + if expected.quantity != "" && actual.Capacity.Cmp(*expected.getCapacity()) != 0 { + mismatches = append(mismatches, fmt.Sprintf("expected quantity %v, got %v", expected.quantity, *actual.Capacity)) + } + if expected.uid != "" && actual.UID != expected.uid { + mismatches = append(mismatches, fmt.Sprintf("expected UID %s, got %s", expected.uid, actual.UID)) + } + if expected.resourceVersion != "" && actual.ResourceVersion != expected.resourceVersion { + mismatches = append(mismatches, fmt.Sprintf("expected ResourceVersion %s, got %s", expected.resourceVersion, actual.ResourceVersion)) + } + if len(mismatches) > 0 { + messages = append(messages, fmt.Sprintf("CSIStorageCapacity %+v:\n %s", actual, strings.Join(mismatches, "\n "))) + } + // Never match against the same expected capacity twice. Also, the ones that remain are dumped below. + expectedCapacities = append(expectedCapacities[:i], expectedCapacities[i+1:]...) 
+ continue nextActual + } + } + messages = append(messages, fmt.Sprintf("unexpected CSIStorageCapacity %#v", actual)) + } + for _, expected := range expectedCapacities { + messages = append(messages, fmt.Sprintf("expected CSIStorageCapacity %+v not found", expected)) + } + if len(messages) > 0 { + return errors.New(strings.Join(messages, "\n")) + } + return nil +} + +func validateCapacitiesEventually(ctx context.Context, c *Controller, clientSet *fakeclientset.Clientset, expectedCapacities []testCapacity) error { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + deadline, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + var lastValidationError error + klog.Info("waiting for controller to catch up") + for { + select { + case <-ticker.C: + if err := process(ctx, c); err != nil { + return fmt.Errorf("unexpected processing error: %v", err) + } + lastValidationError = validateCapacities(ctx, clientSet, expectedCapacities) + if lastValidationError == nil { + return nil + } + case <-deadline.Done(): + return fmt.Errorf("timed out waiting for controller, last unexpected state:\n%v", lastValidationError) + } + } +} + +// createCSIStorageCapacityReactor implements the logic required for the GenerateName and UID fields to work when using +// the fake client. Add it with client.PrependReactor to your fake client. +func createCSIStorageCapacityReactor() func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + var uidCounter int + var mutex sync.Mutex + return func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + s := action.(ktesting.CreateAction).GetObject().(*storagev1alpha1.CSIStorageCapacity) + if s.Name == "" && s.GenerateName != "" { + s.Name = fmt.Sprintf("%s-%s", s.GenerateName, krand.String(16)) + } + if s.UID == "" { + mutex.Lock() + defer mutex.Unlock() + uidCounter++ + s.UID = types.UID(fmt.Sprintf("CSISC-UID-%d", uidCounter)) + } + s.ResourceVersion = csiscRev + "0" + return false, nil, nil + } +} + +// updateCSIStorageCapacityReactor implements the logic required for the GenerateName and UID fields to work when using +// the fake client. Add it with client.PrependReactor to your fake client. +func updateCSIStorageCapacityReactor() func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + return func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + s := action.(ktesting.UpdateAction).GetObject().(*storagev1alpha1.CSIStorageCapacity) + if !strings.HasPrefix(s.ResourceVersion, csiscRev) { + return false, nil, fmt.Errorf("resource version %q should have prefix %s", s.ResourceVersion, csiscRev) + } + revCounter, err := strconv.Atoi(s.ResourceVersion[len(csiscRev):]) + if err != nil { + return false, nil, fmt.Errorf("resource version %q should have formar %s: %v", s.ResourceVersion, csiscRev, err) + } + s.ResourceVersion = csiscRev + fmt.Sprintf("%d", revCounter+1) + return false, nil, nil + } +} + +func fakeController(ctx context.Context, client *fakeclientset.Clientset, storage CSICapacityClient, topologyInformer topology.Informer) *Controller { + utilruntime.ReallyCrash = false // avoids os.Exit after "close of closed channel" in shared informer code + + // We don't need resyncs, they just lead to confusing log output if they get triggered while already some + // new test is running. 
+ resyncPeriod := time.Hour + informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod) + scInformer := informerFactory.Storage().V1().StorageClasses() + cInformer := informerFactory.Storage().V1alpha1().CSIStorageCapacities() + rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 2*time.Second) + queue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "items") + + c := NewCentralCapacityController( + storage, + driverName, + client, + queue, + defaultOwner, + ownerNamespace, + topologyInformer, + scInformer, + cInformer, + 1000*time.Hour, // Not used, but even if it was, we wouldn't want automatic capacity polling while the test runs... + ) + + // This ensures that the informers are running and up-to-date. + go informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + return c +} + +// process handles work items until the queue is empty and the informers are synced. +func process(ctx context.Context, c *Controller) error { + for { + if c.queue.Len() == 0 { + done, err := storageClassesSynced(ctx, c) + if err != nil { + return fmt.Errorf("check storage classes: %v", err) + } + if done { + return nil + } + } + // There's no atomic "try to get a work item". Let's + // check one more time before potentially blocking + // in c.queue.Get(). + len := c.queue.Len() + if len > 0 { + klog.V(1).Infof("testing next work item, queue length %d", len) + c.processNextWorkItem(ctx) + klog.V(5).Infof("done testing next work item") + } + } +} + +func storageClassesSynced(ctx context.Context, c *Controller) (bool, error) { + actualStorageClasses, err := c.client.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{}) + if err != nil { + return false, err + } + informerStorageClasses, err := c.scInformer.Lister().List(labels.Everything()) + if len(informerStorageClasses) != len(actualStorageClasses.Items) { + return false, nil + } + if len(informerStorageClasses) > 0 && !func() bool { + for _, actualStorageClass := range actualStorageClasses.Items { + for _, informerStorageClass := range informerStorageClasses { + if reflect.DeepEqual(actualStorageClass, *informerStorageClass) { + return true + } + } + } + return false + }() { + return false, nil + } + + return true, nil +} + +const ( + mockMultiplier = "multiplier" +) + +// mockGetCapacity simulates a driver with a layered storage system: +// storage exists at each level with different quantities (one pool for all nodes, +// one pool for each data center, one pool for reach region). +// +// It uses "layer1", "layer2", ... etc. as topology keys to dive into +// the map, which then either has a string or another map. +// A fake "multiplier" parameter is applied to the resulting capacity. 
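+//
+// For example, the flat map {"foo": "1Gi", "bar": "2Gi"} used in the tests
+// yields 1Gi for a segment with "layer0: foo" and 2Gi for "layer0: bar"; a
+// nested map would be descended into with "layer1", "layer2", and so on.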
+type mockCapacity struct { + capacity map[string]interface{} +} + +func (mc *mockCapacity) GetCapacity(ctx context.Context, in *csi.GetCapacityRequest, opts ...grpc.CallOption) (*csi.GetCapacityResponse, error) { + available := "" + if in.AccessibleTopology != nil { + var err error + available, err = getCapacity(mc.capacity, in.AccessibleTopology.Segments, 0) + if err != nil { + return nil, err + } + } + resp := &csi.GetCapacityResponse{} + if available != "" { + quantity := resource.MustParse(available) + resp.AvailableCapacity = quantity.Value() + } + multiplierStr, ok := in.Parameters[mockMultiplier] + if ok { + multiplier, err := strconv.Atoi(multiplierStr) + if err != nil { + return nil, fmt.Errorf("invalid parameter %s -> %s: %v", mockMultiplier, multiplierStr, err) + } + resp.AvailableCapacity *= int64(multiplier) + } + return resp, nil +} + +func getCapacity(capacity map[string]interface{}, segments map[string]string, layer int) (string, error) { + if capacity == nil { + return "", fmt.Errorf("no information found at layer %d", layer) + } + key := fmt.Sprintf("layer%d", layer) + value := capacity[segments[key]] + switch value := value.(type) { + case string: + return value, nil + case map[string]interface{}: + result, err := getCapacity(value, segments, layer+1) + if err != nil { + return "", fmt.Errorf("%s -> %s: %v", key, segments[key], err) + } + return result, nil + } + return "", nil +} + +// mockTopology simulates a driver installation on different nodes. +type mockTopology struct { + segments []*topology.Segment + callbacks []topology.Callback +} + +func (mt *mockTopology) AddCallback(cb topology.Callback) { + mt.callbacks = append(mt.callbacks, cb) + cb(mt.segments, nil) +} + +func (mt *mockTopology) List() []*topology.Segment { + return mt.segments +} + +func (mt *mockTopology) Run(ctx context.Context) { +} + +func (mt *mockTopology) HasSynced() bool { + return true +} + +type testCapacity struct { + uid types.UID + resourceVersion string + segment topology.Segment + storageClassName string + quantity string + owner *metav1.OwnerReference +} + +func (tc testCapacity) getCapacity() *resource.Quantity { + if tc.quantity == "" { + return nil + } + quantity := resource.MustParse(tc.quantity) + return &quantity +} + +var capacityCounter int + +func makeCapacity(in testCapacity) *storagev1alpha1.CSIStorageCapacity { + capacityCounter++ + var owners []metav1.OwnerReference + switch in.owner { + case nil: + owners = append(owners, defaultOwner) + case &noOwner: + // Don't add anything. 
+ default: + owners = append(owners, *in.owner) + } + return &storagev1alpha1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + UID: in.uid, + ResourceVersion: in.resourceVersion, + Name: fmt.Sprintf("csisc-%d", capacityCounter), + OwnerReferences: owners, + }, + NodeTopology: in.segment.GetLabelSelector(), + StorageClassName: in.storageClassName, + Capacity: in.getCapacity(), + } +} + +func makeCapacities(in []testCapacity) (items []runtime.Object) { + for _, item := range in { + items = append(items, makeCapacity(item)) + } + return +} + +type testSC struct { + name string + driverName string + parameters map[string]string +} + +func makeSC(in testSC) *storagev1.StorageClass { + return &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: in.name, + }, + Provisioner: in.driverName, + Parameters: in.parameters, + } +} + +func makeSCs(in []testSC) (items []runtime.Object) { + for _, item := range in { + items = append(items, makeSC(item)) + } + return +} diff --git a/pkg/capacity/doc.go b/pkg/capacity/doc.go new file mode 100644 index 0000000000..a0f064c2ba --- /dev/null +++ b/pkg/capacity/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package capacity contains the code which controls the CSIStorageCapacity +// objects owned by the external-provisioner. +package capacity diff --git a/pkg/capacity/features.go b/pkg/capacity/features.go new file mode 100644 index 0000000000..a41b5699d8 --- /dev/null +++ b/pkg/capacity/features.go @@ -0,0 +1,76 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacity + +import ( + "fmt" + "strings" + + flag "github.com/spf13/pflag" +) + +// Feature is the type for named features supported by the capacity +// controller. +type Feature string + +// Features are disabled by default. +type Features map[Feature]bool + +const ( + // FeatureCentral enables the mode where external-provisioner + // is deployed together with the CSI driver's controller. + FeatureCentral = Feature("central") + + // FeatureLocal enables the mode where external-provisioner + // is deployed on each node. Not implemented yet. + FeatureLocal = Feature("local") +) + +// Set enables the named features. Multiple features can be listed, separated by commas, +// with optional whitespace. 
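+//
+// For example, `--enable-capacity=central` enables FeatureCentral; unknown
+// feature names and the not yet implemented "local" mode are rejected with an
+// error, and empty entries are ignored.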
+func (features *Features) Set(value string) error { + for _, part := range strings.Split(value, ",") { + part := Feature(strings.TrimSpace(part)) + switch part { + case FeatureCentral: + if *features == nil { + *features = Features{} + } + (*features)[part] = true + case FeatureLocal: + return fmt.Errorf("%s: not implemented yet", part) + case "": + default: + return fmt.Errorf("%s: unknown feature", part) + } + } + return nil +} + +func (features *Features) String() string { + var parts []string + for feature := range *features { + parts = append(parts, string(feature)) + } + return strings.Join(parts, ",") +} + +func (features *Features) Type() string { + return "enumeration" +} + +var _ flag.Value = &Features{} diff --git a/pkg/capacity/features_test.go b/pkg/capacity/features_test.go new file mode 100644 index 0000000000..fcb6e2af1c --- /dev/null +++ b/pkg/capacity/features_test.go @@ -0,0 +1,81 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacity + +import ( + "reflect" + "testing" +) + +func TestFeatures(t *testing.T) { + tests := []struct { + name string + input []string + expectedOutput Features + expectedError string + }{ + { + name: "empty", + }, + { + name: "central", + input: []string{string(FeatureCentral)}, + expectedOutput: Features{FeatureCentral: true}, + }, + { + name: "local", + input: []string{string(FeatureLocal)}, + expectedError: string(FeatureLocal) + ": not implemented yet", + }, + { + name: "invalid", + input: []string{"no-such-feature"}, + expectedError: "no-such-feature: unknown feature", + }, + { + name: "multi", + input: []string{string(FeatureCentral), string(FeatureCentral)}, + expectedOutput: Features{FeatureCentral: true}, + }, + { + name: "comma", + input: []string{string(FeatureCentral) + " ," + string(FeatureCentral) + " "}, + expectedOutput: Features{FeatureCentral: true}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var actual Features + for _, value := range test.input { + err := actual.Set(value) + if err != nil && test.expectedError != "" { + if err.Error() == test.expectedError { + return + } + t.Fatalf("expected error %q, got %v", test.expectedError, err) + } + if err == nil && test.expectedError != "" { + t.Fatalf("expected error %q, got no error", test.expectedError) + } + } + if !reflect.DeepEqual(actual, test.expectedOutput) { + t.Fatalf("expected %v, got %v", test.expectedOutput, actual) + } + }) + } +} diff --git a/pkg/capacity/topology/doc.go b/pkg/capacity/topology/doc.go new file mode 100644 index 0000000000..6adfde6a43 --- /dev/null +++ b/pkg/capacity/topology/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package topology contains an abstract interface for discovering +// topology segments for a storage backend and a specific implementation +// which does that based on the CSINodeDriver.TopologyKeys and the +// corresponding labels for the nodes. +package topology diff --git a/pkg/capacity/topology/nodes.go b/pkg/capacity/topology/nodes.go new file mode 100644 index 0000000000..7a24ea73e3 --- /dev/null +++ b/pkg/capacity/topology/nodes.go @@ -0,0 +1,278 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package topology contains an abstract interface for discovering +// topology segments for a storage backend and a specific implementation +// which does that based on the CSINodeDriver.TopologyKeys and the +// corresponding labels for the nodes. +package topology + +import ( + "context" + "reflect" + "sort" + "sync" + + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + coreinformersv1 "k8s.io/client-go/informers/core/v1" + storageinformersv1 "k8s.io/client-go/informers/storage/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +func NewNodeTopology( + driverName string, + client kubernetes.Interface, + nodeInformer coreinformersv1.NodeInformer, + csiNodeInformer storageinformersv1.CSINodeInformer, + queue workqueue.RateLimitingInterface, +) Informer { + nt := &nodeTopology{ + driverName: driverName, + client: client, + nodeInformer: nodeInformer, + csiNodeInformer: csiNodeInformer, + queue: queue, + } + + // Whenever Node or CSINode objects change, we need to + // recalculate the new topology segments. We could do that + // immediately, but it is better to let the input data settle + // a bit and just remember that there is work to be done. + nodeHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + klog.V(5).Infof("capacity topology: new node: %s", obj.(*v1.Node).Name) + queue.Add("") + }, + UpdateFunc: func(oldObj interface{}, newObj interface{}) { + if reflect.DeepEqual(oldObj.(*v1.Node).Labels, newObj.(*v1.Node).Labels) { + // Shortcut: labels haven't changed, no need to sync. 
+ return + } + klog.V(5).Infof("capacity topology: updated node: %s", newObj.(*v1.Node).Name) + queue.Add("") + }, + DeleteFunc: func(obj interface{}) { + klog.V(5).Infof("capacity topology: removed node: %s", obj.(*v1.Node).Name) + queue.Add("") + }, + } + nodeInformer.Informer().AddEventHandler(nodeHandler) + csiNodeHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + klog.V(5).Infof("capacity topology: new CSINode: %s", obj.(*storagev1.CSINode).Name) + queue.Add("") + }, + UpdateFunc: func(oldObj interface{}, newObj interface{}) { + oldKeys := nt.driverTopologyKeys(oldObj.(*storagev1.CSINode)) + newKeys := nt.driverTopologyKeys(newObj.(*storagev1.CSINode)) + if reflect.DeepEqual(oldKeys, newKeys) { + // Shortcut: keys haven't changed, no need to sync. + return + } + klog.V(5).Infof("capacity topology: updated CSINode: %s", newObj.(*storagev1.CSINode).Name) + queue.Add("") + }, + DeleteFunc: func(obj interface{}) { + klog.V(5).Infof("capacity topology: removed CSINode: %s", obj.(*storagev1.CSINode).Name) + queue.Add("") + }, + } + csiNodeInformer.Informer().AddEventHandler(csiNodeHandler) + + return nt +} + +var _ Informer = &nodeTopology{} + +type nodeTopology struct { + driverName string + client kubernetes.Interface + nodeInformer coreinformersv1.NodeInformer + csiNodeInformer storageinformersv1.CSINodeInformer + queue workqueue.RateLimitingInterface + + mutex sync.Mutex + segments []*Segment + callbacks []Callback +} + +// driverTopologyKeys returns nil if the driver is not running +// on the node, otherwise at least an empty slice of topology keys. +func (nt *nodeTopology) driverTopologyKeys(csiNode *storagev1.CSINode) []string { + for _, csiNodeDriver := range csiNode.Spec.Drivers { + if csiNodeDriver.Name == nt.driverName { + if csiNodeDriver.TopologyKeys == nil { + return []string{} + } + return csiNodeDriver.TopologyKeys + } + } + return nil +} + +func (nt *nodeTopology) AddCallback(cb Callback) { + nt.mutex.Lock() + defer nt.mutex.Unlock() + + nt.callbacks = append(nt.callbacks, cb) +} + +func (nt *nodeTopology) List() []*Segment { + nt.mutex.Lock() + defer nt.mutex.Unlock() + + // We need to return a new slice to protect against future + // changes in nt.segments. The segments themselves are + // immutable and shared. + segments := make([]*Segment, len(nt.segments)) + copy(segments, nt.segments) + return segments +} + +func (nt *nodeTopology) Run(ctx context.Context) { + go nt.nodeInformer.Informer().Run(ctx.Done()) + go nt.csiNodeInformer.Informer().Run(ctx.Done()) + go nt.runWorker(ctx) + + klog.Info("Started node topology informer") + <-ctx.Done() + klog.Info("Shutting node topology informer") +} + +func (nt *nodeTopology) HasSynced() bool { + return nt.nodeInformer.Informer().HasSynced() && + nt.csiNodeInformer.Informer().HasSynced() +} + +func (nt *nodeTopology) runWorker(ctx context.Context) { + for nt.processNextWorkItem(ctx) { + } +} + +func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool { + obj, shutdown := nt.queue.Get() + if shutdown { + return false + } + defer nt.queue.Done(obj) + nt.sync(ctx) + return true +} + +func (nt *nodeTopology) sync(ctx context.Context) { + // For all nodes on which the driver is registered, collect the topology key/value pairs + // and sort them by key name to make the result deterministic. Skip all segments that have + // been seen before. 
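+	//
+	// The steps below: remember the currently known segments and assume that
+	// all of them are going to be removed, rebuild the list from the CSINode
+	// and Node informers while reusing existing segment pointers for unchanged
+	// content (so that segment identity stays stable), and finally invoke the
+	// registered callbacks once with the added/removed delta, outside of the
+	// lock.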
+ segments := nt.List() + removed := map[*Segment]bool{} + var addedSegments, removedSegments []*Segment + for _, segment := range segments { + // Assume that the segment is removed. Will be set to + // false if we find out otherwise. + removed[segment] = true + } + + csiNodes, err := nt.csiNodeInformer.Lister().List(labels.Everything()) + if err != nil { + utilruntime.HandleError(err) + return + } + existingSegments := make([]*Segment, 0, len(segments)) +node: + for _, csiNode := range csiNodes { + topologyKeys := nt.driverTopologyKeys(csiNode) + if topologyKeys == nil { + // Driver not running on node, ignore it. + continue + } + node, err := nt.nodeInformer.Lister().Get(csiNode.Name) + if err != nil { + if apierrs.IsNotFound(err) { + // Obsolete CSINode object? Ignore it. + continue + } + // This shouldn't happen. If it does, + // something is very wrong and we give up. + utilruntime.HandleError(err) + return + } + + newSegment := Segment{} + sort.Strings(topologyKeys) + for _, key := range topologyKeys { + value, ok := node.Labels[key] + if !ok { + // The driver announced some topology key and kubelet recorded + // it in CSINode, but we haven't seen the corresponding + // node update yet as the label is not set. Ignore the node + // for now, we'll sync up when we get the node update. + continue node + } + newSegment = append(newSegment, SegmentEntry{key, value}) + } + + // Add it only if new, otherwise look at the next node. + for _, segment := range segments { + if newSegment.Compare(*segment) == 0 { + // Reuse a segment instead of using the new one. This keeps pointers stable. + removed[segment] = false + existingSegments = append(existingSegments, segment) + continue node + } + } + for _, segment := range addedSegments { + if newSegment.Compare(*segment) == 0 { + // We already discovered this new segment. + continue node + } + } + + // A completely new segment. + addedSegments = append(addedSegments, &newSegment) + existingSegments = append(existingSegments, &newSegment) + } + + // Lock while making changes, but unlock before actually invoking callbacks. + nt.mutex.Lock() + nt.segments = existingSegments + + // Theoretically callbacks could change while we don't have + // the lock, so make a copy. + callbacks := make([]Callback, len(nt.callbacks)) + copy(callbacks, nt.callbacks) + nt.mutex.Unlock() + + for segment, wasRemoved := range removed { + if wasRemoved { + removedSegments = append(removedSegments, segment) + } + } + if len(addedSegments) > 0 || len(removedSegments) > 0 { + klog.V(5).Infof("topology changed: added %v, removed %v", addedSegments, removedSegments) + for _, cb := range callbacks { + cb(addedSegments, removedSegments) + } + } else { + klog.V(5).Infof("topology unchanged") + } +} diff --git a/pkg/capacity/topology/nodes_test.go b/pkg/capacity/topology/nodes_test.go new file mode 100644 index 0000000000..32aeb06881 --- /dev/null +++ b/pkg/capacity/topology/nodes_test.go @@ -0,0 +1,633 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package topology + +import ( + "context" + "fmt" + "reflect" + "sort" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + fakeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +func init() { + klog.InitFlags(nil) +} + +const ( + driverName = "my-csi-driver" + node1 = "node1" + node2 = "node2" +) + +var ( + localStorageKey = "nodename" + localStorageKeys = []string{localStorageKey} + localStorageLabelsNode1 = map[string]string{localStorageKey: node1} + localStorageNode1 = &Segment{ + {localStorageKey, node1}, + } + localStorageLabelsNode2 = map[string]string{localStorageKey: node2} + localStorageNode2 = &Segment{ + {localStorageKey, node2}, + } + networkStorageKeys = []string{"A", "B", "C"} + networkStorageLabels = map[string]string{ + networkStorageKeys[0]: "US", + networkStorageKeys[1]: "NY", + networkStorageKeys[2]: "1", + } + networkStorage = &Segment{ + {networkStorageKeys[0], "US"}, + {networkStorageKeys[1], "NY"}, + {networkStorageKeys[2], "1"}, + } + networkStorageLabels2 = map[string]string{ + networkStorageKeys[0]: "US", + networkStorageKeys[1]: "NY", + networkStorageKeys[2]: "2", + } + networkStorage2 = &Segment{ + {networkStorageKeys[0], "US"}, + {networkStorageKeys[1], "NY"}, + {networkStorageKeys[2], "2"}, + } +) + +func removeNode(t *testing.T, client *fakeclientset.Clientset, nodeName string) { + err := client.CoreV1().Nodes().Delete(context.Background(), nodeName, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func removeCSINode(t *testing.T, client *fakeclientset.Clientset, nodeName string) { + err := client.StorageV1().CSINodes().Delete(context.Background(), nodeName, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +// TestNodeTopology checks that node labels are correctly transformed +// into topology segments. 
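+// Each test case seeds a fake clientset with the given Node and CSINode
+// objects, waits until the informers have caught up, runs sync() and compares
+// the segments reported via the callback; the optional update step then
+// mutates the cluster and the resulting delta is validated the same way.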
+func TestNodeTopology(t *testing.T) { + testcases := map[string]struct { + driverName string + initialNodes []testNode + expectedSegments []*Segment + update func(t *testing.T, client *fakeclientset.Clientset) + expectedUpdatedSegments []*Segment + }{ + "empty": {}, + "one-node": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + }, + expectedSegments: []*Segment{localStorageNode1}, + }, + "missing-csi-node": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + skipCSINodeCreation: true, + }, + }, + }, + "missing-node": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + skipNodeCreation: true, + }, + }, + }, + "missing-node-labels": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + }, + }, + }, + "two-nodes": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + { + name: node2, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode2, + }, + }, + expectedSegments: []*Segment{localStorageNode1, localStorageNode2}, + }, + "shared-storage": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + { + name: node2, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + }, + expectedSegments: []*Segment{localStorageNode1}, + }, + "other-shared-storage": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode2, + }, + { + name: node2, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode2, + }, + }, + expectedSegments: []*Segment{localStorageNode2}, + }, + "deep-topology": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: networkStorageKeys, + }, + labels: networkStorageLabels, + }, + { + name: node2, + driverKeys: map[string][]string{ + driverName: networkStorageKeys, + }, + labels: networkStorageLabels, + }, + }, + expectedSegments: []*Segment{networkStorage}, + }, + "mixed-topology": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + { + name: node2, + driverKeys: map[string][]string{ + driverName: networkStorageKeys, + }, + labels: networkStorageLabels, + }, + }, + expectedSegments: []*Segment{localStorageNode1, networkStorage}, + }, + "partial-match": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: networkStorageKeys, + }, + labels: networkStorageLabels, + }, + { + name: node2, + driverKeys: map[string][]string{ + driverName: networkStorageKeys, + }, + labels: networkStorageLabels2, + }, + }, + expectedSegments: []*Segment{networkStorage, networkStorage2}, + }, + "unsorted-keys": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + // This node reports keys in reverse order, which must not make a 
difference. + driverName: []string{networkStorageKeys[2], networkStorageKeys[1], networkStorageKeys[0]}, + }, + labels: networkStorageLabels, + }, + { + name: node2, + driverKeys: map[string][]string{ + driverName: networkStorageKeys, + }, + labels: networkStorageLabels, + }, + }, + expectedSegments: []*Segment{networkStorage}, + }, + "wrong-driver": { + driverName: "other-driver", + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + }, + }, + "remove-csi-node": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + }, + expectedSegments: []*Segment{localStorageNode1}, + update: func(t *testing.T, client *fakeclientset.Clientset) { + removeCSINode(t, client, node1) + }, + }, + "remove-node": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + }, + expectedSegments: []*Segment{localStorageNode1}, + update: func(t *testing.T, client *fakeclientset.Clientset) { + removeNode(t, client, node1) + }, + }, + "remove-driver": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + }, + expectedSegments: []*Segment{localStorageNode1}, + update: func(t *testing.T, client *fakeclientset.Clientset) { + csiNode, err := client.StorageV1().CSINodes().Get(context.Background(), node1, metav1.GetOptions{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + csiNode.Spec.Drivers = nil + if _, err := client.StorageV1().CSINodes().Update(context.Background(), csiNode, metav1.UpdateOptions{}); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }, + }, + "change-labels": { + initialNodes: []testNode{ + { + name: node1, + driverKeys: map[string][]string{ + driverName: localStorageKeys, + }, + labels: localStorageLabelsNode1, + }, + }, + expectedSegments: []*Segment{localStorageNode1}, + update: func(t *testing.T, client *fakeclientset.Clientset) { + node, err := client.CoreV1().Nodes().Get(context.Background(), node1, metav1.GetOptions{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // This isn't a realistic test case because CSI drivers cannot change their topology? + // We support it anyway. + node.Labels[localStorageKey] = node2 + if _, err := client.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{}); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }, + expectedUpdatedSegments: []*Segment{localStorageNode2}, + }, + } + + for name, tc := range testcases { + // Not run in parallel. That doesn't work well in combination with global logging. + t.Run(name, func(t *testing.T) { + // There is no good way to shut down the informers. They spawn + // various goroutines and some of them (in particular shared informer) + // become very unhappy ("close on closed channel") when using a context + // that gets cancelled. Therefore we just keep everything running. + // + // The informers also catch up with changes made via the client API + // asynchronously. To ensure expected input for sync(), we wait until + // the content of the informers is identical to what is currently stored. 
+ ctx := context.Background() + + testDriverName := tc.driverName + if testDriverName == "" { + testDriverName = driverName + } + + var objects []runtime.Object + objects = append(objects, makeNodes(tc.initialNodes)...) + clientSet := fakeclientset.NewSimpleClientset(objects...) + nt := fakeNodeTopology(ctx, testDriverName, clientSet) + if err := waitForInformers(ctx, nt); err != nil { + t.Fatalf("unexpected error: %v", err) + } + validate(t, nt, tc.expectedSegments, nil, tc.expectedSegments) + + if tc.update != nil { + tc.update(t, clientSet) + if err := waitForInformers(ctx, nt); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Determine the expected changes based on the delta. + var expectedAdded, expectedRemoved []*Segment + for _, segment := range tc.expectedUpdatedSegments { + if !containsSegment(tc.expectedSegments, segment) { + expectedAdded = append(expectedAdded, segment) + } + } + for _, segment := range tc.expectedSegments { + if !containsSegment(tc.expectedUpdatedSegments, segment) { + expectedRemoved = append(expectedRemoved, segment) + } + } + validate(t, nt, expectedAdded, expectedRemoved, tc.expectedUpdatedSegments) + } + }) + } +} + +type segmentsFound map[*Segment]bool + +func (sf segmentsFound) Found() []*Segment { + var found []*Segment + for key, value := range sf { + if value { + found = append(found, key) + } + } + return found +} + +func addTestCallback(nt *nodeTopology) (added, removed segmentsFound, called *bool) { + added = segmentsFound{} + removed = segmentsFound{} + called = new(bool) + nt.AddCallback(func(a, r []*Segment) { + *called = true + for _, segment := range a { + added[segment] = true + } + for _, segment := range r { + removed[segment] = true + } + }) + return +} + +func containsSegment(segments []*Segment, segment *Segment) bool { + for _, s := range segments { + if s.Compare(*segment) == 0 { + return true + } + } + return false +} + +func fakeNodeTopology(ctx context.Context, testDriverName string, client *fakeclientset.Clientset) *nodeTopology { + // We don't need resyncs, they just lead to confusing log output if they get triggered while already some + // new test is running. 
+ informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second /* no resync */) + nodeInformer := informerFactory.Core().V1().Nodes() + csiNodeInformer := informerFactory.Storage().V1().CSINodes() + rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 2*time.Second) + queue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "items") + + nt := NewNodeTopology( + testDriverName, + client, + nodeInformer, + csiNodeInformer, + queue, + ).(*nodeTopology) + + go informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + return nt +} + +func waitForInformers(ctx context.Context, nt *nodeTopology) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + err := wait.PollImmediateUntil(time.Millisecond, func() (bool, error) { + actualNodes, err := nt.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return false, err + } + informerNodes, err := nt.nodeInformer.Lister().List(labels.Everything()) + if len(informerNodes) != len(actualNodes.Items) { + return false, nil + } + if len(informerNodes) > 0 && !func() bool { + for _, actualNode := range actualNodes.Items { + for _, informerNode := range informerNodes { + if reflect.DeepEqual(actualNode, *informerNode) { + return true + } + } + } + return false + }() { + return false, nil + } + + actualCSINodes, err := nt.client.StorageV1().CSINodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return false, err + } + informerCSINodes, err := nt.csiNodeInformer.Lister().List(labels.Everything()) + if len(informerCSINodes) != len(actualCSINodes.Items) { + return false, nil + } + if len(informerCSINodes) > 0 && !func() bool { + for _, actualCSINode := range actualCSINodes.Items { + for _, informerCSINode := range informerCSINodes { + if reflect.DeepEqual(actualCSINode, *informerCSINode) { + return true + } + } + } + return false + }() { + return false, nil + } + + return true, nil + }, ctx.Done()) + if err != nil { + return fmt.Errorf("get informers in sync: %v", err) + } + return nil +} + +func validate(t *testing.T, nt *nodeTopology, expectedAdded, expectedRemoved, expectedAll []*Segment) { + added, removed, called := addTestCallback(nt) + nt.sync(context.Background()) + expectedChanges := len(expectedAdded) > 0 || len(expectedRemoved) > 0 + if expectedChanges && !*called { + t.Error("change callback not invoked") + } + if !expectedChanges && *called { + t.Error("change callback invoked unexpectedly") + } + validateSegments(t, "added", added.Found(), expectedAdded) + validateSegments(t, "removed", removed.Found(), expectedRemoved) + validateSegments(t, "final", nt.List(), expectedAll) + + if t.Failed() { + t.FailNow() + } +} + +func validateSegments(t *testing.T, what string, actual, expected []*Segment) { + // We can just compare the string representation because that covers all + // relevant content of the segments and is readable. 
+ found := map[string]bool{} + for _, str := range segmentsToStrings(expected) { + found[str] = false + } + for _, str := range segmentsToStrings(actual) { + _, exists := found[str] + if !exists { + t.Errorf("unexpected %s segment: %s", what, str) + t.Fail() + continue + } + found[str] = true + } + for str, matched := range found { + if !matched { + t.Errorf("expected %s segment not found: %s", what, str) + t.Fail() + } + } +} + +func segmentsToStrings(segments []*Segment) []string { + str := []string{} + for _, segment := range segments { + str = append(str, segment.String()) + } + sort.Strings(str) + return str +} + +type testNode struct { + name string + driverKeys map[string][]string + labels map[string]string + skipNodeCreation, skipCSINodeCreation bool +} + +func makeNodes(nodes []testNode) []runtime.Object { + var objects []runtime.Object + + for _, node := range nodes { + if !node.skipNodeCreation { + objects = append(objects, &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.name, + Labels: node.labels, + }, + }) + } + if !node.skipCSINodeCreation { + csiNode := &storagev1.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.name, + }, + } + for driver, keys := range node.driverKeys { + csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, + storagev1.CSINodeDriver{ + Name: driver, + TopologyKeys: keys, + }) + } + objects = append(objects, csiNode) + } + } + return objects +} diff --git a/pkg/capacity/topology/topology.go b/pkg/capacity/topology/topology.go new file mode 100644 index 0000000000..0a2d1db118 --- /dev/null +++ b/pkg/capacity/topology/topology.go @@ -0,0 +1,129 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topology + +import ( + "context" + "sort" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Segment represents a topology segment. Entries are always sorted by +// key and keys are unique. In contrast to a map, segments therefore +// can be compared efficiently. A nil segment matches no nodes +// in a cluster, an empty segment all of them. +type Segment []SegmentEntry + +var _ sort.Interface = Segment{} + +func (s Segment) String() string { + var parts []string + for _, entry := range s { + parts = append(parts, entry.String()) + } + return "{" + strings.Join(parts, ", ") + "}" +} + +// Compare returns -1 if s is considered smaller than the other segment (less keys, +// keys and/or values smaller), 0 if equal and 1 otherwise. +func (s Segment) Compare(other Segment) int { + if len(s) < len(other) { + return -1 + } + if len(s) > len(other) { + return 1 + } + for i := 0; i < len(s); i++ { + cmp := s[i].Compare(other[i]) + if cmp != 0 { + return cmp + } + } + return 0 +} + +func (s Segment) Len() int { return len(s) } +func (s Segment) Less(i, j int) bool { return s[i].Compare(s[j]) < 0 } +func (s Segment) Swap(i, j int) { + entry := s[i] + s[i] = s[j] + s[j] = entry +} + +// SegmentEntry represents one topology key/value pair. 
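+// For example (illustrative values only), the entry
+// {Key: "topology.kubernetes.io/zone", Value: "us-east-1a"} selects all nodes
+// labeled with that zone when used in the label selector below.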
+type SegmentEntry struct { + Key, Value string +} + +func (se SegmentEntry) String() string { + return se.Key + ": " + se.Value +} + +// Compare returns -1 if se is considered smaller than the other segment entry (key or value smaller), +// 0 if equal and 1 otherwise. +func (se SegmentEntry) Compare(other SegmentEntry) int { + cmp := strings.Compare(se.Key, other.Key) + if cmp != 0 { + return cmp + } + return strings.Compare(se.Value, other.Value) +} + +// GetLabelSelector returns a LabelSelector with the key/value entries +// as label match criteria. +func (s Segment) GetLabelSelector() *metav1.LabelSelector { + return &metav1.LabelSelector{ + MatchLabels: s.GetLabelMap(), + } +} + +// GetLabelMap returns nil if the Segment itself is nil, +// otherwise a map with all key/value pairs. +func (s Segment) GetLabelMap() map[string]string { + if s == nil { + return nil + } + labels := map[string]string{} + for _, entry := range s { + labels[entry.Key] = entry.Value + } + return labels +} + +// Informer keeps a list of discovered topology segments and can +// notify one or more clients when it discovers changes. Segments +// are identified by their address and guaranteed to be unique. +type Informer interface { + // AddCallback ensures that the function is called each time + // changes to the list of segments are detected. It also gets + // called immediately when adding the callback and there are + // already some known segments. + AddCallback(cb Callback) + + // List returns all known segments, in no particular order. + List() []*Segment + + // Run starts watching for changes. + Run(ctx context.Context) + + // HasSynced returns true once all segments have been found. + HasSynced() bool +} + +type Callback func(added []*Segment, removed []*Segment) diff --git a/vendor/k8s.io/apimachinery/pkg/util/rand/rand.go b/vendor/k8s.io/apimachinery/pkg/util/rand/rand.go new file mode 100644 index 0000000000..82a473bb14 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/rand/rand.go @@ -0,0 +1,127 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package rand provides utilities related to randomization. +package rand + +import ( + "math/rand" + "sync" + "time" +) + +var rng = struct { + sync.Mutex + rand *rand.Rand +}{ + rand: rand.New(rand.NewSource(time.Now().UnixNano())), +} + +// Int returns a non-negative pseudo-random int. +func Int() int { + rng.Lock() + defer rng.Unlock() + return rng.rand.Int() +} + +// Intn generates an integer in range [0,max). +// By design this should panic if input is invalid, <= 0. +func Intn(max int) int { + rng.Lock() + defer rng.Unlock() + return rng.rand.Intn(max) +} + +// IntnRange generates an integer in range [min,max). +// By design this should panic if input is invalid, <= 0. +func IntnRange(min, max int) int { + rng.Lock() + defer rng.Unlock() + return rng.rand.Intn(max-min) + min +} + +// IntnRange generates an int64 integer in range [min,max). +// By design this should panic if input is invalid, <= 0. 
+func Int63nRange(min, max int64) int64 {
+	rng.Lock()
+	defer rng.Unlock()
+	return rng.rand.Int63n(max-min) + min
+}
+
+// Seed seeds the rng with the provided seed.
+func Seed(seed int64) {
+	rng.Lock()
+	defer rng.Unlock()
+
+	rng.rand = rand.New(rand.NewSource(seed))
+}
+
+// Perm returns, as a slice of n ints, a pseudo-random permutation of the integers [0,n)
+// from the default Source.
+func Perm(n int) []int {
+	rng.Lock()
+	defer rng.Unlock()
+	return rng.rand.Perm(n)
+}
+
+const (
+	// We omit vowels from the set of available characters to reduce the chances
+	// of "bad words" being formed.
+	alphanums = "bcdfghjklmnpqrstvwxz2456789"
+	// No. of bits required to index into alphanums string.
+	alphanumsIdxBits = 5
+	// Mask used to extract last alphanumsIdxBits of an int.
+	alphanumsIdxMask = 1<<alphanumsIdxBits - 1
+	// No. of random letters we can extract from a single int63.
+	maxAlphanumsPerInt = 63 / alphanumsIdxBits
+)
+
+// String generates a random alphanumeric string, without vowels, which is n
+// characters long.  This will panic if n is less than zero.
+// How the random string is created:
+// - we generate random int63's
+// - from each int63, we are extracting multiple random letters by bit-shifting and masking
+// - if some index is out of range of alphanums we neglect it (unlikely to happen multiple times in a row)
+func String(n int) string {
+	b := make([]byte, n)
+	rng.Lock()
+	defer rng.Unlock()
+
+	randomInt63 := rng.rand.Int63()
+	remaining := maxAlphanumsPerInt
+	for i := 0; i < n; {
+		if remaining == 0 {
+			randomInt63, remaining = rng.rand.Int63(), maxAlphanumsPerInt
+		}
+		if idx := int(randomInt63 & alphanumsIdxMask); idx < len(alphanums) {
+			b[i] = alphanums[idx]
+			i++
+		}
+		randomInt63 >>= alphanumsIdxBits
+		remaining--
+	}
+	return string(b)
+}
+
+// SafeEncodeString encodes s using the same characters as rand.String. This reduces the chances of bad words and
+// ensures that strings generated from hash functions appear consistent throughout the API.
+func SafeEncodeString(s string) string {
+	r := make([]byte, len(s))
+	for i, b := range []rune(s) {
+		r[i] = alphanums[(int(b) % len(alphanums))]
+	}
+	return string(r)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 7ff9dabe01..09c0aa6cf5 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -288,6 +288,7 @@ k8s.io/apimachinery/pkg/util/json
 k8s.io/apimachinery/pkg/util/mergepatch
 k8s.io/apimachinery/pkg/util/naming
 k8s.io/apimachinery/pkg/util/net
+k8s.io/apimachinery/pkg/util/rand
 k8s.io/apimachinery/pkg/util/runtime
 k8s.io/apimachinery/pkg/util/sets
 k8s.io/apimachinery/pkg/util/strategicpatch