From f3fb862a4738fe4a60b7472da84c6d5457be9d90 Mon Sep 17 00:00:00 2001 From: "shenmu.wy" Date: Mon, 11 Dec 2023 19:56:20 +0800 Subject: [PATCH 1/5] refactor patchpv and localstorage plugins. --- cmd/controller/antplugins/localstorage.go | 44 ++++++++++++----------- cmd/controller/antplugins/patchpv.go | 15 ++++---- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/cmd/controller/antplugins/localstorage.go b/cmd/controller/antplugins/localstorage.go index 1f7efd3..1aef29f 100644 --- a/cmd/controller/antplugins/localstorage.go +++ b/cmd/controller/antplugins/localstorage.go @@ -6,7 +6,6 @@ import ( "strconv" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil" @@ -119,32 +118,35 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin // After remote volume is scheduled, report the local storage size to Node if isVolume && volume != nil && volume.Spec.TargetNodeId != "" && !volume.IsLocal() { - var localStorePct int - var volInState *v1.AntstorVolume - node, err := stateObj.GetNodeByNodeID(volume.Spec.TargetNodeId) + var node *state.Node + node, err = stateObj.GetNodeByNodeID(volume.Spec.TargetNodeId) if err != nil { log.Error(err, "find node failed") return plugin.Result{Error: err} } - sp := node.Pool - - for _, item := range r.ReportLocalConfigs { - selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector) - if err != nil { - log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector) - continue - } - if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault { - localStorePct = item.DefaultLocalStoragePct - log.Info("matched local-storage percentage", "pct", localStorePct) + var sp = node.Pool + + /* + var localStorePct int + var volInState *v1.AntstorVolume + for _, item := range r.ReportLocalConfigs { + selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector) + if err != nil { + log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector) + continue + } + if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault { + localStorePct = item.DefaultLocalStoragePct + log.Info("matched local-storage percentage", "pct", localStorePct) + } } - } - volInState, err = node.GetVolumeByID(volume.Spec.Uuid) - if err == nil { - log.Info("copy volume into state") - *volInState = *volume - } + volInState, err = node.GetVolumeByID(volume.Spec.Uuid) + if err == nil { + log.Info("copy volume into state") + *volInState = *volume + } + */ var expectLocalSize = CalculateLocalStorageCapacity(node) var localSizeStr = strconv.Itoa(int(expectLocalSize)) diff --git a/cmd/controller/antplugins/patchpv.go b/cmd/controller/antplugins/patchpv.go index 9d54707..e42f2f8 100644 --- a/cmd/controller/antplugins/patchpv.go +++ b/cmd/controller/antplugins/patchpv.go @@ -44,7 +44,6 @@ func (p *PatchPVPlugin) Reconcile(ctx *plugin.Context) (result plugin.Result) { return plugin.Result{} } - log.Info("running PatchPVPlugin") // get pv name from label var pvName string if val, has := volume.Labels[v1.VolumePVNameLabelKey]; has { @@ -53,11 +52,15 @@ func (p *PatchPVPlugin) Reconcile(ctx *plugin.Context) (result plugin.Result) { pvName = volume.Name } - err = p.PvUtil.SetTargetNodeName(pvName, volume.Spec.TargetNodeId) - if err != nil { - log.Error(err, "updating PV label failed") - return 
plugin.Result{ - Error: err, + if volume.Spec.TargetNodeId != "" { + log.Info("patching PV", "nodeId", volume.Spec.TargetNodeId, "pvName", pvName) + + err = p.PvUtil.SetTargetNodeName(pvName, volume.Spec.TargetNodeId) + if err != nil { + log.Error(err, "updating PV label failed") + return plugin.Result{ + Error: err, + } } } From 255400551373f015ddf33d7f024749847542dd71 Mon Sep 17 00:00:00 2001 From: "shenmu.wy" Date: Tue, 12 Dec 2023 14:05:13 +0800 Subject: [PATCH 2/5] add reasons to filters --- pkg/controller/manager/scheduler/filter/affinity.go | 4 ++++ pkg/controller/manager/scheduler/filter/basic.go | 1 + pkg/controller/manager/scheduler/filter/error.go | 3 +++ 3 files changed, 8 insertions(+) diff --git a/pkg/controller/manager/scheduler/filter/affinity.go b/pkg/controller/manager/scheduler/filter/affinity.go index e2a1386..3994632 100644 --- a/pkg/controller/manager/scheduler/filter/affinity.go +++ b/pkg/controller/manager/scheduler/filter/affinity.go @@ -18,6 +18,7 @@ func AffinityFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume match, err := schedcore.MatchNodeSelectorTerms(convertNodeInfo(n.Info), vol.Spec.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution) if !match || err != nil { klog.Infof("[SchedFail] vol=%s Pool %s NodeAffnity fail", vol.Name, n.Pool.Name) + ctx.Error.AddReason(ReasonNodeAffinity) return false } } @@ -31,6 +32,7 @@ func AffinityFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume matched := selector.Matches(labels.Set(n.Info.Labels)) if !matched { klog.Infof("[SchedFail] vol=%s Pool %s NodeLabelSelector fail", vol.Name, n.Pool.Name) + ctx.Error.AddReason(ReasonNodeAffinity) return false } } @@ -41,6 +43,7 @@ func AffinityFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume match, err := schedcore.MatchNodeSelectorTerms(convertPoolLabels(n.Pool.Labels), vol.Spec.PoolAffinity.RequiredDuringSchedulingIgnoredDuringExecution) if !match || err != nil { klog.Infof("[SchedFail] vol=%s Pool %s PoolAffinity fail", vol.Name, n.Pool.Name) + ctx.Error.AddReason(ReasonPoolAffinity) return false } } @@ -52,6 +55,7 @@ func AffinityFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume matched := selector.Matches(labels.Set(n.Pool.Labels)) if !matched { klog.Infof("[SchedFail] vol=%s Pool %s PoolLabelSelector fail", vol.Name, n.Pool.Name) + ctx.Error.AddReason(ReasonPoolAffinity) return false } } diff --git a/pkg/controller/manager/scheduler/filter/basic.go b/pkg/controller/manager/scheduler/filter/basic.go index 8132bdd..9c9956e 100644 --- a/pkg/controller/manager/scheduler/filter/basic.go +++ b/pkg/controller/manager/scheduler/filter/basic.go @@ -11,6 +11,7 @@ func BasicFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) b // consider Pool status if !n.Pool.IsSchedulable() { klog.Infof("[SchedFail] vol=%s Pool %s status is %s, or check Pool labels", vol.Name, n.Pool.Name, n.Pool.Status.Status) + ctx.Error.AddReason(ReasonPoolUnschedulable) return false } diff --git a/pkg/controller/manager/scheduler/filter/error.go b/pkg/controller/manager/scheduler/filter/error.go index 07ae84b..dc2d24a 100644 --- a/pkg/controller/manager/scheduler/filter/error.go +++ b/pkg/controller/manager/scheduler/filter/error.go @@ -13,6 +13,9 @@ const ( ReasonPositionNotMatch = "PositionNotMatch" ReasonVolTypeNotMatch = "VolTypeNotMatch" ReasonDataConflict = "DataConflict" + ReasonNodeAffinity = "NodeAffinity" + ReasonPoolAffinity = "PoolAffinity" + ReasonPoolUnschedulable = "PoolUnschedulable" 
NoStoragePoolAvailable = "NoStoragePoolAvailable" // From 5592a5b3b1e6cbd0d0777f8d8fae26034daecca1 Mon Sep 17 00:00:00 2001 From: "shenmu.wy" Date: Wed, 13 Dec 2023 10:23:31 +0800 Subject: [PATCH 3/5] watch node events and push them to pool reconciler --- pkg/controller/manager/controllers/cmd.go | 30 ++--- .../reconciler/handler/node_handler.go | 106 ++++++++++++++---- .../manager/reconciler/plugable_reconciler.go | 9 ++ 3 files changed, 107 insertions(+), 38 deletions(-) diff --git a/pkg/controller/manager/controllers/cmd.go b/pkg/controller/manager/controllers/cmd.go index ba9d9dd..b44f50e 100644 --- a/pkg/controller/manager/controllers/cmd.go +++ b/pkg/controller/manager/controllers/cmd.go @@ -9,7 +9,6 @@ import ( "code.alipay.com/dbplatform/node-disk-controller/pkg/agent" v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config" - "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state" hostnvme "code.alipay.com/dbplatform/node-disk-controller/pkg/host-nvme" "code.alipay.com/dbplatform/node-disk-controller/pkg/util" @@ -17,11 +16,9 @@ import ( "code.alipay.com/dbplatform/node-disk-controller/pkg/version" "github.com/spf13/cobra" "github.com/spf13/pflag" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" cligoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" rt "sigs.k8s.io/controller-runtime" ) @@ -173,18 +170,21 @@ func (o *OperatorOption) Run() { ctx := rt.SetupSignalHandler() - // create NodeInformer to sync nodes to cache - nodeInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Node{}) - if err != nil { - klog.Fatal(err) - } - nodeHandler := &handler.NodeEventHandler{ - Cfg: cfg, - } - nodeInformer.AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: nodeHandler.FilterObject, - Handler: nodeHandler, - }) + /* + // create NodeInformer to sync nodes to cache + // moved to pool reconciler + nodeInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Node{}) + if err != nil { + klog.Fatal(err) + } + nodeHandler := &handler.NodeEventHandler{ + Cfg: cfg, + } + nodeInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: nodeHandler.FilterObject, + Handler: nodeHandler, + }) + */ go func() { klog.Info("manager start working") diff --git a/pkg/controller/manager/reconciler/handler/node_handler.go b/pkg/controller/manager/reconciler/handler/node_handler.go index b95baac..a177806 100644 --- a/pkg/controller/manager/reconciler/handler/node_handler.go +++ b/pkg/controller/manager/reconciler/handler/node_handler.go @@ -1,44 +1,84 @@ package handler import ( - "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" + "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config" ) var ( _ cache.ResourceEventHandler = &NodeEventHandler{} + _ 
handler.EventHandler = &NodeEventHandler{}
 )
 
 type NodeEventHandler struct {
 	Cfg config.Config
 }
 
-func (e *NodeEventHandler) FilterObject(obj interface{}) bool {
-	/*
-		key, err := cache.MetaNamespaceKeyFunc(obj)
-		if err != nil {
-			return false
-		}
-
-		_, name, err := cache.SplitMetaNamespaceKey(key)
-		if err != nil {
-			return false
-		}
-	*/
-
-	if len(e.Cfg.Scheduler.NodeCacheSelector) == 0 {
-		return true
+// Create implements EventHandler.
+func (e *NodeEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
+	if !validateNode(&e.Cfg, evt.Object) {
+		return
+	}
+
+	// Node name is the same as the StoragePool name
+	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
+		Namespace: v1.DefaultNamespace,
+		Name:      evt.Object.GetName(),
+	}})
+}
+
+// Update implements EventHandler.
+func (e *NodeEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
+	if !validateNode(&e.Cfg, evt.ObjectNew) {
+		return
 	}
 
-	if node, ok := obj.(*corev1.Node); ok {
-		selector := labels.SelectorFromSet(labels.Set(e.Cfg.Scheduler.NodeCacheSelector))
-		if selector.Matches(labels.Set(node.Labels)) {
-			klog.Info("matched node to cache: ", node.Name)
-			return true
-		}
+	// Node name is the same as the StoragePool name
+	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
+		Namespace: v1.DefaultNamespace,
+		Name:      evt.ObjectNew.GetName(),
+	}})
+}
+
+// Delete implements EventHandler.
+func (e *NodeEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
+	if !validateNode(&e.Cfg, evt.Object) {
+		return
+	}
+
+	// Node name is the same as the StoragePool name
+	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
+		Namespace: v1.DefaultNamespace,
+		Name:      evt.Object.GetName(),
+	}})
+}
+
+// Generic implements EventHandler.
+func (e *NodeEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+	if !validateNode(&e.Cfg, evt.Object) {
+		return
+	}
+
+	// Node name is the same as the StoragePool name
+	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
+		Namespace: v1.DefaultNamespace,
+		Name:      evt.Object.GetName(),
+	}})
+}
+
+func (e *NodeEventHandler) FilterObject(obj interface{}) bool {
+	if metaObj, ok := obj.(client.Object); ok {
+		return validateNode(&e.Cfg, metaObj)
 	}
 
 	return false
@@ -70,3 +110,23 @@ func (e *NodeEventHandler) OnDelete(obj interface{}) {
 	}
 	klog.Infof("delete Node %s", key)
 }
+
+func validateNode(cfg *config.Config, obj client.Object) bool {
+	if obj == nil {
+		klog.Error(nil, "NodeEvent received with nil object")
+		return false
+	}
+
+	// no config of NodeCacheSelector means accept all Node events
+	if len(cfg.Scheduler.NodeCacheSelector) == 0 {
+		return true
+	}
+
+	selector := labels.SelectorFromSet(labels.Set(cfg.Scheduler.NodeCacheSelector))
+	if selector.Matches(labels.Set(obj.GetLabels())) {
+		klog.Info("matched node to cache: ", obj.GetName())
+		return true
+	}
+
+	return false
+}
diff --git a/pkg/controller/manager/reconciler/plugable_reconciler.go b/pkg/controller/manager/reconciler/plugable_reconciler.go
index dd07b6f..d8f8f9b 100644
--- a/pkg/controller/manager/reconciler/plugable_reconciler.go
+++ b/pkg/controller/manager/reconciler/plugable_reconciler.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/go-logr/logr"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -15,7 +16,10 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 
+	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
+	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
@@ -43,6 +47,7 @@ type PlugableReconciler struct {
 	client.Client
 	plugin.Plugable
 
+	Cfg     config.Config
 	KubeCli kubernetes.Interface
 	State   state.StateIface
 	Log     logr.Logger
@@ -77,6 +82,10 @@ func (r *PlugableReconciler) SetupWithManager(mgr ctrl.Manager) error {
 			MaxConcurrentReconciles: r.Concurrency,
 		}).
 		For(r.WatchType).
+		// watch node
+		Watches(&source.Kind{Type: &corev1.Node{}}, &handler.NodeEventHandler{
+			Cfg: r.Cfg,
+		}).
Complete(r) } From b4519dbd8f5bb71566296a3846289ea492f79626 Mon Sep 17 00:00:00 2001 From: "shenmu.wy" Date: Wed, 13 Dec 2023 10:23:31 +0800 Subject: [PATCH 4/5] add reasons to filters; watch node events and push them to pool reconciler --- pkg/controller/manager/controllers/cmd.go | 30 ++--- pkg/controller/manager/controllers/manager.go | 37 +++--- .../reconciler/handler/node_handler.go | 106 ++++++++++++++---- .../manager/reconciler/plugable_reconciler.go | 25 ++++- .../manager/scheduler/filter/affinity.go | 4 + .../manager/scheduler/filter/basic.go | 1 + .../manager/scheduler/filter/error.go | 3 + 7 files changed, 149 insertions(+), 57 deletions(-) diff --git a/pkg/controller/manager/controllers/cmd.go b/pkg/controller/manager/controllers/cmd.go index ba9d9dd..b44f50e 100644 --- a/pkg/controller/manager/controllers/cmd.go +++ b/pkg/controller/manager/controllers/cmd.go @@ -9,7 +9,6 @@ import ( "code.alipay.com/dbplatform/node-disk-controller/pkg/agent" v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config" - "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state" hostnvme "code.alipay.com/dbplatform/node-disk-controller/pkg/host-nvme" "code.alipay.com/dbplatform/node-disk-controller/pkg/util" @@ -17,11 +16,9 @@ import ( "code.alipay.com/dbplatform/node-disk-controller/pkg/version" "github.com/spf13/cobra" "github.com/spf13/pflag" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" cligoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" rt "sigs.k8s.io/controller-runtime" ) @@ -173,18 +170,21 @@ func (o *OperatorOption) Run() { ctx := rt.SetupSignalHandler() - // create NodeInformer to sync nodes to cache - nodeInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Node{}) - if err != nil { - klog.Fatal(err) - } - nodeHandler := &handler.NodeEventHandler{ - Cfg: cfg, - } - nodeInformer.AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: nodeHandler.FilterObject, - Handler: nodeHandler, - }) + /* + // create NodeInformer to sync nodes to cache + // moved to pool reconciler + nodeInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Node{}) + if err != nil { + klog.Fatal(err) + } + nodeHandler := &handler.NodeEventHandler{ + Cfg: cfg, + } + nodeInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: nodeHandler.FilterObject, + Handler: nodeHandler, + }) + */ go func() { klog.Info("manager start working") diff --git a/pkg/controller/manager/controllers/manager.go b/pkg/controller/manager/controllers/manager.go index ace6d24..2069a8d 100644 --- a/pkg/controller/manager/controllers/manager.go +++ b/pkg/controller/manager/controllers/manager.go @@ -5,25 +5,28 @@ import ( // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. 
+ corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + rt "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/source" v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler" + "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin" sched "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state" "code.alipay.com/dbplatform/node-disk-controller/pkg/generated/clientset/versioned" "code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes" - _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/client-go/rest" - "k8s.io/klog/v2" - rt "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/healthz" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/manager" ) type B64EncodedMysqlDSN string @@ -100,7 +103,15 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager { PoolUtil: poolUtil, KubeCli: kubeClient, }, - WatchType: &v1.StoragePool{}, + ForType: &v1.StoragePool{}, + Watches: []reconciler.WatchObject{ + { + Source: &source.Kind{Type: &corev1.Node{}}, + EventHandler: &handler.NodeEventHandler{ + Cfg: req.ControllerConfig, + }, + }, + }, } if err = poolReconciler.SetupWithManager(mgr); err != nil { klog.Error(err, "unable to create controller StoragePoolReconciler") @@ -123,7 +134,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager { AntstoreCli: antstorCli, Scheduler: scheduler, }, - WatchType: &v1.AntstorVolume{}, + ForType: &v1.AntstorVolume{}, } if err = volReconciler.SetupWithManager(mgr); err != nil { klog.Error(err, "unable to create controller VolumeReconciler") @@ -145,7 +156,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager { Scheduler: scheduler, State: stateObj, }, - WatchType: &v1.AntstorVolumeGroup{}, + ForType: &v1.AntstorVolumeGroup{}, } if err = volGroupReconciler.SetupWithManager(mgr); err != nil { klog.Error(err, "unable to create controller VolumeGroupReconciler") @@ -165,7 +176,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager { MainHandler: &reconciler.AntstorDataControlReconcileHandler{ Client: mgr.GetClient(), }, - WatchType: &v1.AntstorDataControl{}, + ForType: &v1.AntstorDataControl{}, } if err = dataControlReconciler.SetupWithManager(mgr); err != nil { klog.Error(err, "unable to create controller AntstorDataControlReconciler") diff --git a/pkg/controller/manager/reconciler/handler/node_handler.go b/pkg/controller/manager/reconciler/handler/node_handler.go index b95baac..a177806 100644 --- a/pkg/controller/manager/reconciler/handler/node_handler.go +++ b/pkg/controller/manager/reconciler/handler/node_handler.go @@ -1,44 +1,84 @@ package handler import ( - 
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" + "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config" ) var ( _ cache.ResourceEventHandler = &NodeEventHandler{} + _ handler.EventHandler = &NodeEventHandler{} ) type NodeEventHandler struct { Cfg config.Config } -func (e *NodeEventHandler) FilterObject(obj interface{}) bool { - /* - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - return false - } - - _, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - return false - } - */ - - if len(e.Cfg.Scheduler.NodeCacheSelector) == 0 { - return true +// Create implements EventHandler. +func (e *NodeEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + if !validateNode(&e.Cfg, evt.Object) { + return + } + + // Node name is same with StoragePool name + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: v1.DefaultNamespace, + Name: evt.Object.GetName(), + }}) +} + +// Update implements EventHandler. +func (e *NodeEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + if !validateNode(&e.Cfg, evt.ObjectNew) { + return } - if node, ok := obj.(*corev1.Node); ok { - selector := labels.SelectorFromSet(labels.Set(e.Cfg.Scheduler.NodeCacheSelector)) - if selector.Matches(labels.Set(node.Labels)) { - klog.Info("matched node to cache: ", node.Name) - return true - } + // Node name is same with StoragePool name + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: v1.DefaultNamespace, + Name: evt.ObjectNew.GetName(), + }}) +} + +// Delete implements EventHandler. +func (e *NodeEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + if !validateNode(&e.Cfg, evt.Object) { + return + } + + // Node name is same with StoragePool name + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: v1.DefaultNamespace, + Name: evt.Object.GetName(), + }}) +} + +// Generic implements EventHandler. 
+func (e *NodeEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+	if !validateNode(&e.Cfg, evt.Object) {
+		return
+	}
+
+	// Node name is the same as the StoragePool name
+	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
+		Namespace: v1.DefaultNamespace,
+		Name:      evt.Object.GetName(),
+	}})
+}
+
+func (e *NodeEventHandler) FilterObject(obj interface{}) bool {
+	if metaObj, ok := obj.(client.Object); ok {
+		return validateNode(&e.Cfg, metaObj)
 	}
 
 	return false
@@ -70,3 +110,23 @@ func (e *NodeEventHandler) OnDelete(obj interface{}) {
 	}
 	klog.Infof("delete Node %s", key)
 }
+
+func validateNode(cfg *config.Config, obj client.Object) bool {
+	if obj == nil {
+		klog.Error(nil, "NodeEvent received with nil object")
+		return false
+	}
+
+	// no config of NodeCacheSelector means accept all Node events
+	if len(cfg.Scheduler.NodeCacheSelector) == 0 {
+		return true
+	}
+
+	selector := labels.SelectorFromSet(labels.Set(cfg.Scheduler.NodeCacheSelector))
+	if selector.Matches(labels.Set(obj.GetLabels())) {
+		klog.Info("matched node to cache: ", obj.GetName())
+		return true
+	}
+
+	return false
+}
diff --git a/pkg/controller/manager/reconciler/plugable_reconciler.go b/pkg/controller/manager/reconciler/plugable_reconciler.go
index dd07b6f..ff55dba 100644
--- a/pkg/controller/manager/reconciler/plugable_reconciler.go
+++ b/pkg/controller/manager/reconciler/plugable_reconciler.go
@@ -14,7 +14,9 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
+	crhandler "sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
@@ -39,6 +41,11 @@ type SetupWithManagerProvider interface {
 	GetSetupWithManagerFn() SetupWithManagerFn
 }
 
 type SetupWithManagerFn func(r reconcile.Reconciler, mgr ctrl.Manager) error
 
+type WatchObject struct {
+	Source       source.Source
+	EventHandler crhandler.EventHandler
+}
+
 type PlugableReconciler struct {
 	client.Client
 	plugin.Plugable
@@ -48,7 +55,8 @@ type PlugableReconciler struct {
 	Log     logr.Logger
 
 	Concurrency int
-	WatchType   client.Object
+	ForType     client.Object
+	Watches     []WatchObject
 
 	MainHandler ReconcileHandler
 	Lock        misc.ResourceLockIface
@@ -68,16 +76,21 @@ func (r *PlugableReconciler) SetupWithManager(mgr ctrl.Manager) error {
 		r.Lock = misc.NewResourceLocks()
 	}
 
-	if r.WatchType == nil {
-		return fmt.Errorf("WatchType is nil")
+	if r.ForType == nil {
+		return fmt.Errorf("ForType is nil")
 	}
 
-	return ctrl.NewControllerManagedBy(mgr).
+	bld := ctrl.NewControllerManagedBy(mgr).
 		WithOptions(controller.Options{
 			MaxConcurrentReconciles: r.Concurrency,
 		}).
-		For(r.WatchType).
- Complete(r) + For(r.ForType) + + for _, item := range r.Watches { + bld = bld.Watches(item.Source, item.EventHandler) + } + + return bld.Complete(r) } func (r *PlugableReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { diff --git a/pkg/controller/manager/scheduler/filter/affinity.go b/pkg/controller/manager/scheduler/filter/affinity.go index e2a1386..3994632 100644 --- a/pkg/controller/manager/scheduler/filter/affinity.go +++ b/pkg/controller/manager/scheduler/filter/affinity.go @@ -18,6 +18,7 @@ func AffinityFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume match, err := schedcore.MatchNodeSelectorTerms(convertNodeInfo(n.Info), vol.Spec.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution) if !match || err != nil { klog.Infof("[SchedFail] vol=%s Pool %s NodeAffnity fail", vol.Name, n.Pool.Name) + ctx.Error.AddReason(ReasonNodeAffinity) return false } } @@ -31,6 +32,7 @@ func AffinityFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume matched := selector.Matches(labels.Set(n.Info.Labels)) if !matched { klog.Infof("[SchedFail] vol=%s Pool %s NodeLabelSelector fail", vol.Name, n.Pool.Name) + ctx.Error.AddReason(ReasonNodeAffinity) return false } } @@ -41,6 +43,7 @@ func AffinityFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume match, err := schedcore.MatchNodeSelectorTerms(convertPoolLabels(n.Pool.Labels), vol.Spec.PoolAffinity.RequiredDuringSchedulingIgnoredDuringExecution) if !match || err != nil { klog.Infof("[SchedFail] vol=%s Pool %s PoolAffinity fail", vol.Name, n.Pool.Name) + ctx.Error.AddReason(ReasonPoolAffinity) return false } } @@ -52,6 +55,7 @@ func AffinityFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume matched := selector.Matches(labels.Set(n.Pool.Labels)) if !matched { klog.Infof("[SchedFail] vol=%s Pool %s PoolLabelSelector fail", vol.Name, n.Pool.Name) + ctx.Error.AddReason(ReasonPoolAffinity) return false } } diff --git a/pkg/controller/manager/scheduler/filter/basic.go b/pkg/controller/manager/scheduler/filter/basic.go index 8132bdd..9c9956e 100644 --- a/pkg/controller/manager/scheduler/filter/basic.go +++ b/pkg/controller/manager/scheduler/filter/basic.go @@ -11,6 +11,7 @@ func BasicFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) b // consider Pool status if !n.Pool.IsSchedulable() { klog.Infof("[SchedFail] vol=%s Pool %s status is %s, or check Pool labels", vol.Name, n.Pool.Name, n.Pool.Status.Status) + ctx.Error.AddReason(ReasonPoolUnschedulable) return false } diff --git a/pkg/controller/manager/scheduler/filter/error.go b/pkg/controller/manager/scheduler/filter/error.go index 07ae84b..dc2d24a 100644 --- a/pkg/controller/manager/scheduler/filter/error.go +++ b/pkg/controller/manager/scheduler/filter/error.go @@ -13,6 +13,9 @@ const ( ReasonPositionNotMatch = "PositionNotMatch" ReasonVolTypeNotMatch = "VolTypeNotMatch" ReasonDataConflict = "DataConflict" + ReasonNodeAffinity = "NodeAffinity" + ReasonPoolAffinity = "PoolAffinity" + ReasonPoolUnschedulable = "PoolUnschedulable" NoStoragePoolAvailable = "NoStoragePoolAvailable" // From 0b765510c6f218065087654cbc0624183599bfa5 Mon Sep 17 00:00:00 2001 From: "shenmu.wy" Date: Wed, 13 Dec 2023 11:30:33 +0800 Subject: [PATCH 5/5] setup indexes in plugable reconciler --- pkg/controller/manager/controllers/manager.go | 32 +++++++++++++++++ .../manager/reconciler/plugable_reconciler.go | 24 +++++++------ .../manager/reconciler/volume_reconciler.go | 36 ------------------- 3 files changed, 45 
insertions(+), 47 deletions(-) diff --git a/pkg/controller/manager/controllers/manager.go b/pkg/controller/manager/controllers/manager.go index 2069a8d..067864b 100644 --- a/pkg/controller/manager/controllers/manager.go +++ b/pkg/controller/manager/controllers/manager.go @@ -12,6 +12,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/klog/v2" rt "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -135,6 +136,37 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager { Scheduler: scheduler, }, ForType: &v1.AntstorVolume{}, + Watches: []reconciler.WatchObject{ + { + Source: &source.Kind{Type: &v1.AntstorVolume{}}, + EventHandler: &handler.VolumeEventHandler{ + State: stateObj, + }}, + }, + Indexes: []reconciler.IndexObject{ + { + Obj: &v1.AntstorVolume{}, + Field: v1.IndexKeyUUID, + ExtractValue: func(rawObj client.Object) []string { + // grab the volume, extract the uuid + if vol, ok := rawObj.(*v1.AntstorVolume); ok { + return []string{vol.Spec.Uuid} + } + return nil + }, + }, + { + Obj: &v1.AntstorVolume{}, + Field: v1.IndexKeyTargetNodeID, + ExtractValue: func(rawObj client.Object) []string { + // grab the volume, extract the targetNodeId + if vol, ok := rawObj.(*v1.AntstorVolume); ok { + return []string{vol.Spec.TargetNodeId} + } + return nil + }, + }, + }, } if err = volReconciler.SetupWithManager(mgr); err != nil { klog.Error(err, "unable to create controller VolumeReconciler") diff --git a/pkg/controller/manager/reconciler/plugable_reconciler.go b/pkg/controller/manager/reconciler/plugable_reconciler.go index d70cd17..b0b428e 100644 --- a/pkg/controller/manager/reconciler/plugable_reconciler.go +++ b/pkg/controller/manager/reconciler/plugable_reconciler.go @@ -36,17 +36,17 @@ type ReconcileHandler interface { HandleDeletion(*plugin.Context) plugin.Result } -type SetupWithManagerProvider interface { - GetSetupWithManagerFn() SetupWithManagerFn -} - -type SetupWithManagerFn func(r reconcile.Reconciler, mgr ctrl.Manager) error - type WatchObject struct { Source source.Source EventHandler crhandler.EventHandler } +type IndexObject struct { + Obj client.Object + Field string + ExtractValue client.IndexerFunc +} + type PlugableReconciler struct { client.Client plugin.Plugable @@ -59,6 +59,7 @@ type PlugableReconciler struct { Concurrency int ForType client.Object Watches []WatchObject + Indexes []IndexObject MainHandler ReconcileHandler Lock misc.ResourceLockIface @@ -66,11 +67,6 @@ type PlugableReconciler struct { // SetupWithManager sets up the controller with the Manager. 
func (r *PlugableReconciler) SetupWithManager(mgr ctrl.Manager) error { - if setupProvider, ok := r.MainHandler.(SetupWithManagerProvider); ok { - fn := setupProvider.GetSetupWithManagerFn() - return fn(r, mgr) - } - if r.Concurrency <= 0 { r.Concurrency = 1 } @@ -92,6 +88,12 @@ func (r *PlugableReconciler) SetupWithManager(mgr ctrl.Manager) error { bld = bld.Watches(item.Source, item.EventHandler) } + // setup indexer + // example code: https://github.com/kubernetes-sigs/kubebuilder/blob/master/docs/book/src/cronjob-tutorial/testdata/project/controllers/cronjob_controller.go#L548 + for _, item := range r.Indexes { + mgr.GetFieldIndexer().IndexField(context.Background(), item.Obj, item.Field, item.ExtractValue) + } + return bld.Complete(r) } diff --git a/pkg/controller/manager/reconciler/volume_reconciler.go b/pkg/controller/manager/reconciler/volume_reconciler.go index 256ec15..7531566 100644 --- a/pkg/controller/manager/reconciler/volume_reconciler.go +++ b/pkg/controller/manager/reconciler/volume_reconciler.go @@ -7,7 +7,6 @@ import ( "time" v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" - "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin" sched "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler/filter" @@ -20,9 +19,7 @@ import ( "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" ) const ( @@ -45,39 +42,6 @@ type AntstorVolumeReconcileHandler struct { // EventRecorder record.EventRecorder } -// SetupWithManager sets up the controller with the Manager. -func (rh *AntstorVolumeReconcileHandler) GetSetupWithManagerFn() SetupWithManagerFn { - return func(r reconcile.Reconciler, mgr ctrl.Manager) error { - // setup indexer - // example code: https://github.com/kubernetes-sigs/kubebuilder/blob/master/docs/book/src/cronjob-tutorial/testdata/project/controllers/cronjob_controller.go#L548 - mgr.GetFieldIndexer().IndexField(context.Background(), &v1.AntstorVolume{}, v1.IndexKeyUUID, func(rawObj client.Object) []string { - // grab the volume, extract the uuid - if vol, ok := rawObj.(*v1.AntstorVolume); ok { - return []string{vol.Spec.Uuid} - } - return nil - }) - - mgr.GetFieldIndexer().IndexField(context.Background(), &v1.AntstorVolume{}, v1.IndexKeyTargetNodeID, func(rawObj client.Object) []string { - // grab the volume, extract the targetNodeId - if vol, ok := rawObj.(*v1.AntstorVolume); ok { - return []string{vol.Spec.TargetNodeId} - } - return nil - }) - - return ctrl.NewControllerManagedBy(mgr). - WithOptions(controller.Options{ - MaxConcurrentReconciles: 1, - }). - For(&v1.AntstorVolume{}). - Watches(&source.Kind{Type: &v1.AntstorVolume{}}, &handler.VolumeEventHandler{ - State: rh.State, - }). - Complete(r) - } -} - func (r *AntstorVolumeReconcileHandler) ResourceName() string { return "AntstorVolume" }
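
The reason constants added to pkg/controller/manager/scheduler/filter/error.go (ReasonNodeAffinity, ReasonPoolAffinity, ReasonPoolUnschedulable) are recorded through ctx.Error.AddReason(...) each time a candidate StoragePool is rejected, so a scheduling failure can report why every pool was filtered out instead of only logging [SchedFail]. The FilterContext and the type behind ctx.Error are not shown in this series; the sketch below only illustrates the reason-collecting pattern, assuming a simple slice-backed error in the same filter package.

package filter

import "strings"

// scheduleError is an illustrative stand-in for the value behind ctx.Error;
// the real type lives in error.go and is not part of these patches.
type scheduleError struct {
	reasons []string
}

// AddReason mirrors the ctx.Error.AddReason calls added in affinity.go and basic.go.
func (e *scheduleError) AddReason(reason string) {
	e.reasons = append(e.reasons, reason)
}

// Error joins the collected reasons, e.g. "NoStoragePoolAvailable: NodeAffinity, PoolUnschedulable".
func (e *scheduleError) Error() string {
	if len(e.reasons) == 0 {
		return NoStoragePoolAvailable
	}
	return NoStoragePoolAvailable + ": " + strings.Join(e.reasons, ", ")
}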
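
Taken together, patches 3 to 5 make PlugableReconciler fully declarative: ForType picks the primary resource, Watches attaches extra event sources (for example Node events routed to the StoragePool reconciler through NodeEventHandler), and Indexes registers cache field indexes, so handlers such as AntstorVolumeReconcileHandler no longer carry their own controller-runtime builder code. A sketch of how a caller could wire a reconciler after the refactor follows; it consolidates pieces from the manager.go changes, myHandler and cfg are placeholders supplied by the caller, and fields such as State, KubeCli, Log and the plugin list are omitted for brevity.

package example

import (
	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/source"

	v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler"
	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler"
)

// wireExamplePoolReconciler is a hypothetical wiring example, not code from the series.
func wireExamplePoolReconciler(mgr ctrl.Manager, cfg config.Config, myHandler reconciler.ReconcileHandler) error {
	rec := &reconciler.PlugableReconciler{
		Client:      mgr.GetClient(),
		Concurrency: 1,
		ForType:     &v1.StoragePool{},
		Watches: []reconciler.WatchObject{
			{
				// push Node events into the same work queue as StoragePool events
				Source:       &source.Kind{Type: &corev1.Node{}},
				EventHandler: &handler.NodeEventHandler{Cfg: cfg},
			},
		},
		Indexes: []reconciler.IndexObject{
			{
				Obj:   &v1.AntstorVolume{},
				Field: v1.IndexKeyTargetNodeID,
				ExtractValue: func(rawObj client.Object) []string {
					if vol, ok := rawObj.(*v1.AntstorVolume); ok {
						return []string{vol.Spec.TargetNodeId}
					}
					return nil
				},
			},
		},
		MainHandler: myHandler,
	}
	return rec.SetupWithManager(mgr)
}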
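
The field indexes registered in patch 5 (v1.IndexKeyUUID and v1.IndexKeyTargetNodeID) are what allow the cached client to answer field-selector style lookups. A short sketch of a consumer query follows; client.List with client.MatchingFields is the standard controller-runtime mechanism, while the helper name and the v1.AntstorVolumeList list type are assumptions made for the example.

package example

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
)

// listVolumesOnNode lists AntstorVolumes scheduled onto one node through the
// cached client. It only works because PlugableReconciler.SetupWithManager
// registered a field index for v1.IndexKeyTargetNodeID.
func listVolumesOnNode(ctx context.Context, c client.Client, nodeID string) ([]v1.AntstorVolume, error) {
	var list v1.AntstorVolumeList
	if err := c.List(ctx, &list, client.MatchingFields{v1.IndexKeyTargetNodeID: nodeID}); err != nil {
		return nil, err
	}
	return list.Items, nil
}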