Skip to content

Commit

Permalink
storage capacity: initial implementation
Browse files Browse the repository at this point in the history
This is the producer side of KEP
https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking.

Only deployment together with a central controller is currently
implemented.

When syncing directly whenever there is a change, there's potentially
a larger number of changes emitted. When there are rapid changes (for
example, while a driver gets deployed), it may be better to delay
processing and thus combine multiple changes in a single sync.
  • Loading branch information
pohly committed Aug 6, 2020
1 parent 38d1bf1 commit bcef05d
Show file tree
Hide file tree
Showing 15 changed files with 3,190 additions and 2 deletions.
78 changes: 77 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,21 @@ Note that the external-provisioner does not scale with more replicas. Only one e

* `--kube-api-burst <num>`: Burst for clients that communicate with the kubernetes apiserver. Defaults to `10`.

* `--cloning-protection-threads <num>`: Number of simultaniously running threads, handling cloning finalizer removal. Defaults to `1`.
* `--cloning-protection-threads <num>`: Number of simultaneously running threads, handling cloning finalizer removal. Defaults to `1`.

* `--metrics-address`: The TCP network address where the prometheus metrics endpoint will run (example: `:8080` which corresponds to port 8080 on local host). The default is empty string, which means metrics endpoint is disabled.

* `--metrics-path`: The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.

* `--extra-create-metadata`: Enables the injection of extra PVC and PV metadata as parameters when calling `CreateVolume` on the driver (keys: "csi.storage.k8s.io/pvc/name", "csi.storage.k8s.io/pvc/namespace", "csi.storage.k8s.io/pv/name")

* `--capacity-threads <num>`: Number of simultaneously running threads, handling CSIStorageCapacity objects. Defaults to `1`.

* `--capacity-poll-period <duration>`: How long the external-provisioner waits before checking for storage capacity changes. Defaults to `1m`.

* `--enable-capacity <enumeration>`: Enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call. Currently supported: `--enable-capacity=central`.


#### Other recognized arguments
* `--feature-gates <gates>`: A set of comma separated `<feature-name>=<true|false>` pairs that describe feature gates for alpha/experimental features. See [list of features](#feature-status) or `--help` output for list of recognized features. Example: `--feature-gates Topology=true` to enable Topology feature that's disabled by default.

Expand Down Expand Up @@ -102,6 +109,75 @@ Yes | No | Yes | `Requisite` = Allowed topologies<br>`Preferred` = `Requisite` w
No | Irrelevant | No | `Requisite` = Aggregated cluster topology<br>`Preferred` = `Requisite` with randomly selected node topology as first element
No | Irrelevant | Yes | `Requisite` = Allowed topologies<br>`Preferred` = `Requisite` with randomly selected node topology as first element

### Capacity support

> :warning: *Warning:* This is an alpha feature and only supported by
> Kubernetes >= 1.19 if the `CSIStorageCapacity` feature gate is
> enabled.
The external-provisioner can be used to create CSIStorageCapacity
objects that hold information about the storage capacity available
through the driver. The Kubernetes scheduler then [uses that
information](https://kubernetes.io/docs/concepts/storage/storage-capacity]
when selecting nodes for pods with unbound volumes that wait for the
first consumer.

To enable this feature in a driver deployment:
- Set the `POD_NAME` and `POD_NAMESPACE` environment variables like this:
```yaml
env:
- name: WATCH_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
```
- Add `--enable-capacity=central` to the command line flags.
- Add `StorageCapacity: true` to the CSIDriver information object.
Without it, external-provisioner will publish information, but the
Kubernetes scheduler will ignore it. This can be used to first
deploy the driver without that flag, then when sufficient
information has been published, enabled the scheduler usage of it.
- Optional: configure how often external-provisioner polls the driver
to detect changed capacity with `--capacity-poll-period`.
- Optional: configure how many worker threads are used in parallel
with `--capacity-threads`.

In this mode, external-provisioner uses the topology keys and labels
reported by the CSI driver instance on each node to determine how many
different topology segments exist. The expectation is that the driver
reports topology at the granularity of the storage that it has access
to, i.e. a driver that has access to a single, network-attached
storage pools should report the same key/value pair on all
nodes. Drivers will node-local storage have to report different
key/value pairs for each node.

For each segment and each storage class, CSI `GetCapacity` is called
once with the topology of the segment and the parameters of the
class. If there is no error and the capacity is non-zero, a
CSIStorageCapacity object is created respectively updated (if it
already exists from a prior call) with that information. Obsolete
objects are removed.

To ensure that CSIStorageCapacity objects get removed even if the
external-provisioner dies and/or gets removed from the cluster, they
all have an owner. The owner is not the external-provisioner pod
itself but rather its parent. This way, it is possible to switch
between external-provisioner instances without loosing the already
gather information.

CSIStorageCapacity objects are namespaced and get created in the
namespace of the external-provisioner. Only CSIStorageCapacity objects
with the right owner are modified by external-provisioner and their
name is generated, so it is possible to deploy different drivers in
the same namespace. However, Kubernetes does not check who is creating
CSIStorageCapacity objects, so in theory a malfunctioning or malicious
driver deployment could also publish incorrect information about some
other driver.

### CSI error and timeout handling
The external-provisioner invokes all gRPC calls to CSI driver with timeout provided by `--timeout` command line argument (15 seconds by default).

Expand Down
62 changes: 61 additions & 1 deletion cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
flag "github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -43,6 +45,8 @@ import (
"github.com/kubernetes-csi/csi-lib-utils/deprecatedflags"
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
"github.com/kubernetes-csi/external-provisioner/pkg/capacity"
"github.com/kubernetes-csi/external-provisioner/pkg/capacity/topology"
ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
snapclientset "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned"
)
Expand All @@ -58,7 +62,8 @@ var (
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaniously running threads, handling cloning finalizer removal")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
capacityThreads = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for creation or deletion of a volume")
_ = deprecatedflags.Add("provisioner")

Expand All @@ -76,6 +81,13 @@ var (
kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")

capacityFeatures = func() *capacity.Features {
capacity := &capacity.Features{}
flag.Var(capacity, "enable-capacity", "Enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call. Currently supported: --enable-capacity=central.")
return capacity
}()
capacityPollPeriod = flag.Duration("capacity-poll-period", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes.")

featureGates map[string]bool
provisionController *controller.ProvisionController
version = "unknown"
Expand Down Expand Up @@ -266,6 +278,51 @@ func main() {
controllerCapabilities,
)

var capacityController *capacity.Controller
if (*capacityFeatures)[capacity.FeatureCentral] {
podName := os.Getenv("POD_NAME")
namespace := os.Getenv("POD_NAMESPACE")
if podName == "" || namespace == "" {
klog.Fatalf("need POD_NAMESPACE/POD_NAME env variables, have only POD_NAMESPACE=%q and POD_NAME=%q", namespace, podName)
}
pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
klog.Fatalf("error getting our own pod: %v", err)
}
var controller *metav1.OwnerReference
for _, owner := range pod.OwnerReferences {
if owner.Controller != nil && *owner.Controller {
controller = &owner
break
}
}
if controller == nil {
klog.Fatal("pod does not have a controller which owns it")
}

topologyInformer := topology.NewNodeTopology(
provisionerName,
clientset,
factory.Core().V1().Nodes(),
factory.Storage().V1().CSINodes(),
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"),
)

capacityController = capacity.NewCentralCapacityController(
csi.NewControllerClient(grpcClient),
provisionerName,
clientset,
// TODO: metrics for the queue?!
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
*controller,
namespace,
topologyInformer,
factory.Storage().V1().StorageClasses(),
factory.Storage().V1alpha1().CSIStorageCapacities(),
*capacityPollPeriod,
)
}

run := func(ctx context.Context) {
factory.Start(ctx.Done())
cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
Expand All @@ -275,6 +332,9 @@ func main() {
}
}

if capacityController != nil {
go capacityController.Run(ctx, int(*capacityThreads))
}
if csiClaimController != nil {
go csiClaimController.Run(ctx, int(*finalizerThreads))
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
k8s.io/component-base v0.19.0-rc.2
k8s.io/csi-translation-lib v0.19.0-rc.2
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.2.0
k8s.io/kubernetes v1.19.0-rc.2
sigs.k8s.io/sig-storage-lib-external-provisioner/v6 v6.1.0-rc1
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200716221620-18dfb9cca345 h1:2gOG36vt1BhUqpzxwZLZJxUim2dHB05vw+RAn4Q6YOU=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200716221620-18dfb9cca345/go.mod h1:skWido08r9w6Lq/w70DO5XYIKMu4QFu1+4VsqLQuJy8=
go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
Expand Down
Loading

0 comments on commit bcef05d

Please sign in to comment.