Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

setup watches and indexes to PlugableReconciler #10

Merged
merged 7 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions cmd/controller/antplugins/localstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
Expand Down Expand Up @@ -119,32 +118,35 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin

// After remote volume is scheduled, report the local storage size to Node
if isVolume && volume != nil && volume.Spec.TargetNodeId != "" && !volume.IsLocal() {
var localStorePct int
var volInState *v1.AntstorVolume
node, err := stateObj.GetNodeByNodeID(volume.Spec.TargetNodeId)
var node *state.Node
node, err = stateObj.GetNodeByNodeID(volume.Spec.TargetNodeId)
if err != nil {
log.Error(err, "find node failed")
return plugin.Result{Error: err}
}
sp := node.Pool

for _, item := range r.ReportLocalConfigs {
selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector)
if err != nil {
log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector)
continue
}
if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault {
localStorePct = item.DefaultLocalStoragePct
log.Info("matched local-storage percentage", "pct", localStorePct)
var sp = node.Pool

/*
var localStorePct int
var volInState *v1.AntstorVolume
for _, item := range r.ReportLocalConfigs {
selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector)
if err != nil {
log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector)
continue
}
if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault {
localStorePct = item.DefaultLocalStoragePct
log.Info("matched local-storage percentage", "pct", localStorePct)
}
}
}

volInState, err = node.GetVolumeByID(volume.Spec.Uuid)
if err == nil {
log.Info("copy volume into state")
*volInState = *volume
}
volInState, err = node.GetVolumeByID(volume.Spec.Uuid)
if err == nil {
log.Info("copy volume into state")
*volInState = *volume
}
*/

var expectLocalSize = CalculateLocalStorageCapacity(node)
var localSizeStr = strconv.Itoa(int(expectLocalSize))
Expand Down
15 changes: 9 additions & 6 deletions cmd/controller/antplugins/patchpv.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func (p *PatchPVPlugin) Reconcile(ctx *plugin.Context) (result plugin.Result) {
return plugin.Result{}
}

log.Info("running PatchPVPlugin")
// get pv name from label
var pvName string
if val, has := volume.Labels[v1.VolumePVNameLabelKey]; has {
Expand All @@ -53,11 +52,15 @@ func (p *PatchPVPlugin) Reconcile(ctx *plugin.Context) (result plugin.Result) {
pvName = volume.Name
}

err = p.PvUtil.SetTargetNodeName(pvName, volume.Spec.TargetNodeId)
if err != nil {
log.Error(err, "updating PV label failed")
return plugin.Result{
Error: err,
if volume.Spec.TargetNodeId != "" {
log.Info("patching PV", "nodeId", volume.Spec.TargetNodeId, "pvName", pvName)

err = p.PvUtil.SetTargetNodeName(pvName, volume.Spec.TargetNodeId)
if err != nil {
log.Error(err, "updating PV label failed")
return plugin.Result{
Error: err,
}
}
}

Expand Down
30 changes: 15 additions & 15 deletions pkg/controller/manager/controllers/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,16 @@ import (
"code.alipay.com/dbplatform/node-disk-controller/pkg/agent"
v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
hostnvme "code.alipay.com/dbplatform/node-disk-controller/pkg/host-nvme"
"code.alipay.com/dbplatform/node-disk-controller/pkg/util"
"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
"code.alipay.com/dbplatform/node-disk-controller/pkg/version"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
cligoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
rt "sigs.k8s.io/controller-runtime"
)
Expand Down Expand Up @@ -173,18 +170,21 @@ func (o *OperatorOption) Run() {

ctx := rt.SetupSignalHandler()

// create NodeInformer to sync nodes to cache
nodeInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Node{})
if err != nil {
klog.Fatal(err)
}
nodeHandler := &handler.NodeEventHandler{
Cfg: cfg,
}
nodeInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: nodeHandler.FilterObject,
Handler: nodeHandler,
})
/*
// create NodeInformer to sync nodes to cache
// moved to pool reconciler
nodeInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Node{})
if err != nil {
klog.Fatal(err)
}
nodeHandler := &handler.NodeEventHandler{
Cfg: cfg,
}
nodeInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: nodeHandler.FilterObject,
Handler: nodeHandler,
})
*/

go func() {
klog.Info("manager start working")
Expand Down
69 changes: 56 additions & 13 deletions pkg/controller/manager/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,29 @@ import (

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
rt "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
sched "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
"code.alipay.com/dbplatform/node-disk-controller/pkg/generated/clientset/versioned"
"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
rt "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type B64EncodedMysqlDSN string
Expand Down Expand Up @@ -100,7 +104,15 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
PoolUtil: poolUtil,
KubeCli: kubeClient,
},
WatchType: &v1.StoragePool{},
ForType: &v1.StoragePool{},
Watches: []reconciler.WatchObject{
{
Source: &source.Kind{Type: &corev1.Node{}},
EventHandler: &handler.NodeEventHandler{
Cfg: req.ControllerConfig,
},
},
},
}
if err = poolReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller StoragePoolReconciler")
Expand All @@ -123,7 +135,38 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
AntstoreCli: antstorCli,
Scheduler: scheduler,
},
WatchType: &v1.AntstorVolume{},
ForType: &v1.AntstorVolume{},
Watches: []reconciler.WatchObject{
{
Source: &source.Kind{Type: &v1.AntstorVolume{}},
EventHandler: &handler.VolumeEventHandler{
State: stateObj,
}},
},
Indexes: []reconciler.IndexObject{
{
Obj: &v1.AntstorVolume{},
Field: v1.IndexKeyUUID,
ExtractValue: func(rawObj client.Object) []string {
// grab the volume, extract the uuid
if vol, ok := rawObj.(*v1.AntstorVolume); ok {
return []string{vol.Spec.Uuid}
}
return nil
},
},
{
Obj: &v1.AntstorVolume{},
Field: v1.IndexKeyTargetNodeID,
ExtractValue: func(rawObj client.Object) []string {
// grab the volume, extract the targetNodeId
if vol, ok := rawObj.(*v1.AntstorVolume); ok {
return []string{vol.Spec.TargetNodeId}
}
return nil
},
},
},
}
if err = volReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller VolumeReconciler")
Expand All @@ -145,7 +188,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
Scheduler: scheduler,
State: stateObj,
},
WatchType: &v1.AntstorVolumeGroup{},
ForType: &v1.AntstorVolumeGroup{},
}
if err = volGroupReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller VolumeGroupReconciler")
Expand All @@ -165,7 +208,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
MainHandler: &reconciler.AntstorDataControlReconcileHandler{
Client: mgr.GetClient(),
},
WatchType: &v1.AntstorDataControl{},
ForType: &v1.AntstorDataControl{},
}
if err = dataControlReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller AntstorDataControlReconciler")
Expand Down
106 changes: 83 additions & 23 deletions pkg/controller/manager/reconciler/handler/node_handler.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,84 @@
package handler

import (
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
)

var (
_ cache.ResourceEventHandler = &NodeEventHandler{}
_ handler.EventHandler = &NodeEventHandler{}
)

type NodeEventHandler struct {
Cfg config.Config
}

func (e *NodeEventHandler) FilterObject(obj interface{}) bool {
/*
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return false
}

_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false
}
*/

if len(e.Cfg.Scheduler.NodeCacheSelector) == 0 {
return true
// Create implements EventHandler. When a Node passes the configured
// selector filter, it enqueues a reconcile request for the StoragePool
// of the same name (pool name matches node name by convention).
func (e *NodeEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	if validateNode(&e.Cfg, evt.Object) {
		// Node name is same with StoragePool name
		req := reconcile.Request{NamespacedName: types.NamespacedName{
			Namespace: v1.DefaultNamespace,
			Name:      evt.Object.GetName(),
		}}
		q.Add(req)
	}
}

// Update implements EventHandler.
func (e *NodeEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if !validateNode(&e.Cfg, evt.ObjectNew) {
return
}

if node, ok := obj.(*corev1.Node); ok {
selector := labels.SelectorFromSet(labels.Set(e.Cfg.Scheduler.NodeCacheSelector))
if selector.Matches(labels.Set(node.Labels)) {
klog.Info("matched node to cache: ", node.Name)
return true
}
// Node name is same with StoragePool name
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: v1.DefaultNamespace,
Name: evt.ObjectNew.GetName(),
}})
}

// Delete implements EventHandler. When a deleted Node passes the configured
// selector filter, it enqueues a reconcile request for the StoragePool of
// the same name (pool name matches node name by convention).
func (e *NodeEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
	if validateNode(&e.Cfg, evt.Object) {
		// Node name is same with StoragePool name
		req := reconcile.Request{NamespacedName: types.NamespacedName{
			Namespace: v1.DefaultNamespace,
			Name:      evt.Object.GetName(),
		}}
		q.Add(req)
	}
}

// Generic implements EventHandler. When a Node from a generic event passes
// the configured selector filter, it enqueues a reconcile request for the
// StoragePool of the same name (pool name matches node name by convention).
func (e *NodeEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
	if validateNode(&e.Cfg, evt.Object) {
		// Node name is same with StoragePool name
		req := reconcile.Request{NamespacedName: types.NamespacedName{
			Namespace: v1.DefaultNamespace,
			Name:      evt.Object.GetName(),
		}}
		q.Add(req)
	}
}

func (e *NodeEventHandler) FilterObject(obj interface{}) bool {
if metaObj, ok := obj.(client.Object); ok {
return validateNode(&e.Cfg, metaObj)
}

return false
Expand Down Expand Up @@ -70,3 +110,23 @@ func (e *NodeEventHandler) OnDelete(obj interface{}) {
}
klog.Infof("delete Node %s", key)
}

// validateNode reports whether a Node event should produce a reconcile
// request. A nil object is rejected; when no NodeCacheSelector is
// configured every node is accepted; otherwise the node's labels must
// match the configured selector.
func validateNode(cfg *config.Config, obj client.Object) bool {
	if obj == nil {
		// klog.Error takes printf-style variadic args, not a logr-style
		// error first argument — passing nil here would log a stray "<nil>".
		klog.Error("NodeEvent received with nil object")
		return false
	}

	// no config of NodeCacheSelector means accept all nodes events
	if len(cfg.Scheduler.NodeCacheSelector) == 0 {
		return true
	}

	selector := labels.SelectorFromSet(labels.Set(cfg.Scheduler.NodeCacheSelector))
	if selector.Matches(labels.Set(obj.GetLabels())) {
		klog.Info("matched node to cache: ", obj.GetName())
		return true
	}

	return false
}
Loading