storage capacity: initial implementation
This is the producer side of KEP
https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking.

Only deployment together with a central controller is currently
implemented.

Syncing directly whenever there is a change can emit a large number of
updates. When changes arrive rapidly (for example, while a driver is
being deployed), it may be better to delay processing and thus combine
multiple changes into a single sync.
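As an illustration of that idea (not part of this commit), here is a minimal sketch using the client-go workqueue package, which the diff below already uses for its rate-limited queues. A workqueue de-duplicates items that are still waiting, so enqueuing each change with a short AddAfter delay lets a burst of changes collapse into a single sync; the key name and delay are made up.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

// onChange records that the object identified by key needs to be synced.
// The short delay gives further changes a chance to arrive; the workqueue
// de-duplicates the key, so a burst of changes results in a single sync.
func onChange(queue workqueue.RateLimitingInterface, key string) {
    queue.AddAfter(key, 100*time.Millisecond)
}

func main() {
    queue := workqueue.NewNamedRateLimitingQueue(
        workqueue.DefaultControllerRateLimiter(), "example")
    defer queue.ShutDown()

    // Three rapid changes to the same object...
    for i := 0; i < 3; i++ {
        onChange(queue, "storageclass/fast")
    }

    // ...are handled by a single sync. A real controller would run this
    // loop in one or more worker goroutines.
    key, _ := queue.Get()
    fmt.Println("syncing", key)
    queue.Done(key)
}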
pohly committed Aug 6, 2020
1 parent 38d1bf1 commit ee5efa9
Showing 14 changed files with 3,113 additions and 1 deletion.
62 changes: 61 additions & 1 deletion cmd/csi-provisioner/csi-provisioner.go
@@ -26,7 +26,9 @@ import (
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
flag "github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -43,6 +45,8 @@ import (
"github.com/kubernetes-csi/csi-lib-utils/deprecatedflags"
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
"github.com/kubernetes-csi/external-provisioner/pkg/capacity"
"github.com/kubernetes-csi/external-provisioner/pkg/capacity/topology"
ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
snapclientset "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned"
)
@@ -58,7 +62,8 @@ var (
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaniously running threads, handling cloning finalizer removal")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
capacityThreads = flag.Uint("storage-capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for creation or deletion of a volume")
_ = deprecatedflags.Add("provisioner")

@@ -76,6 +81,13 @@ var (
kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")

capacityFeatures = func() *capacity.Features {
capacity := &capacity.Features{}
flag.Var(capacity, "enable-capacity", "Enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call. Currently supported: -enable-capacity=central.")
return capacity
}()
capacityPollPeriod = flag.Duration("capacity-poll-period", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes. Defaults to one minute.")

featureGates map[string]bool
provisionController *controller.ProvisionController
version = "unknown"
@@ -266,6 +278,51 @@ func main() {
        controllerCapabilities,
    )

    var capacityController *capacity.Controller
    if (*capacityFeatures)[capacity.FeatureCentral] {
        podName := os.Getenv("POD_NAME")
        namespace := os.Getenv("POD_NAMESPACE")
        if podName == "" || namespace == "" {
            klog.Fatalf("need POD_NAMESPACE/POD_NAME env variables, have only POD_NAMESPACE=%q and POD_NAME=%q", namespace, podName)
        }
        pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
        if err != nil {
            klog.Fatalf("error getting our own pod: %v", err)
        }
        var controller *metav1.OwnerReference
        for _, owner := range pod.OwnerReferences {
            if owner.Controller != nil && *owner.Controller {
                controller = &owner
                break
            }
        }
        if controller == nil {
            klog.Fatal("pod does not have a controller which owns it")
        }

        topologyInformer := topology.NewNodeTopology(
            provisionerName,
            clientset,
            factory.Core().V1().Nodes(),
            factory.Storage().V1().CSINodes(),
            workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"),
        )

        capacityController = capacity.NewCentralCapacityController(
            csi.NewControllerClient(grpcClient),
            provisionerName,
            clientset,
            // TODO: metrics for the queue?!
            workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
            *controller,
            namespace,
            topologyInformer,
            factory.Storage().V1().StorageClasses(),
            factory.Storage().V1alpha1().CSIStorageCapacities(),
            *capacityPollPeriod,
        )
    }

    run := func(ctx context.Context) {
        factory.Start(ctx.Done())
        cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
@@ -275,6 +332,9 @@ func main() {
            }
        }

        if capacityController != nil {
            go capacityController.Run(ctx, int(*capacityThreads))
        }
        if csiClaimController != nil {
            go csiClaimController.Run(ctx, int(*finalizerThreads))
        }
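For context on the block above: the controlling owner reference of the provisioner pod (typically the ReplicaSet behind the Deployment) is handed to NewCentralCapacityController, presumably so that the CSIStorageCapacity objects it creates are owned by that workload and garbage-collected together with it. Below is a rough sketch of the kind of object involved, with made-up names and values; it is not code from this commit.

package main

import (
    "fmt"

    storagev1alpha1 "k8s.io/api/storage/v1alpha1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Hypothetical controlling owner, normally discovered via the pod's
    // OwnerReferences as in the diff above.
    isController := true
    owner := metav1.OwnerReference{
        APIVersion: "apps/v1",
        Kind:       "ReplicaSet",
        Name:       "csi-provisioner-6f5d8b4c9d",
        UID:        "11111111-2222-3333-4444-555555555555",
        Controller: &isController,
    }

    capacity := resource.MustParse("100Gi")
    object := storagev1alpha1.CSIStorageCapacity{
        ObjectMeta: metav1.ObjectMeta{
            GenerateName:    "csisc-",
            Namespace:       "kube-system",
            OwnerReferences: []metav1.OwnerReference{owner},
        },
        StorageClassName: "fast",
        NodeTopology: &metav1.LabelSelector{
            MatchLabels: map[string]string{"topology.example.com/zone": "zone-a"},
        },
        Capacity: &capacity,
    }
    fmt.Printf("%s/%s*: %s available for storage class %s\n",
        object.Namespace, object.GenerateName,
        object.Capacity.String(), object.StorageClassName)
}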
1 change: 1 addition & 0 deletions go.mod
@@ -17,6 +17,7 @@ require (
k8s.io/component-base v0.19.0-rc.2
k8s.io/csi-translation-lib v0.19.0-rc.2
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.2.0
k8s.io/kubernetes v1.19.0-rc.2
sigs.k8s.io/sig-storage-lib-external-provisioner/v6 v6.1.0-rc1
)
1 change: 1 addition & 0 deletions go.sum
@@ -555,6 +555,7 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200716221620-18dfb9cca345 h1:2gOG36vt1BhUqpzxwZLZJxUim2dHB05vw+RAn4Q6YOU=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200716221620-18dfb9cca345/go.mod h1:skWido08r9w6Lq/w70DO5XYIKMu4QFu1+4VsqLQuJy8=
go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=