Skip to content

Commit

Permalink
capacity: check casts of watch objects, handle DeletedFinalStateUnknown
Browse files Browse the repository at this point in the history
DeletedFinalStateUnknown is something that may be handed to the delete
callback. It can and should be handled.

Because there might be other, currently unexpected objects in the
future, it's better to handle them gracefully with a checked cast and
proper logging.
  • Loading branch information
pohly committed Aug 13, 2020
1 parent 0b1d448 commit fdc3726
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 15 deletions.
60 changes: 54 additions & 6 deletions pkg/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,34 @@ func NewCentralCapacityController(
// Now register for changes. Depending on the implementation of the informers,
// this may already invoke callbacks.
handler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.onSCAddOrUpdate(obj.(*storagev1.StorageClass)) },
UpdateFunc: func(_ interface{}, newObj interface{}) { c.onSCAddOrUpdate(newObj.(*storagev1.StorageClass)) },
DeleteFunc: func(obj interface{}) { c.onSCDelete(obj.(*storagev1.StorageClass)) },
AddFunc: func(obj interface{}) {
sc, ok := obj.(*storagev1.StorageClass)
if !ok {
klog.Errorf("added object: expected StorageClass, got %T -> ignoring it", obj)
return
}
c.onSCAddOrUpdate(sc)
},
UpdateFunc: func(_ interface{}, newObj interface{}) {
sc, ok := newObj.(*storagev1.StorageClass)
if !ok {
klog.Errorf("updated object: expected StorageClass, got %T -> ignoring it", newObj)
return
}
c.onSCAddOrUpdate(sc)
},
DeleteFunc: func(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
sc, ok := obj.(*storagev1.StorageClass)
if !ok {
klog.Errorf("deleted object: expected StorageClass, got %T -> ignoring it", obj)
return
}
c.onSCDelete(sc)
},
}
c.scInformer.Informer().AddEventHandler(handler)
c.topologyInformer.AddCallback(c.onTopologyChanges)
Expand Down Expand Up @@ -207,11 +232,34 @@ func (c *Controller) prepare(ctx context.Context) {
// for all objects immediately when adding it.
klog.V(3).Info("Checking for existing CSIStorageCapacity objects")
handler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.onCAddOrUpdate(ctx, obj.(*storagev1alpha1.CSIStorageCapacity)) },
AddFunc: func(obj interface{}) {
csc, ok := obj.(*storagev1alpha1.CSIStorageCapacity)
if !ok {
klog.Errorf("added object: expected CSIStorageCapacity, got %T -> ignoring it", obj)
return
}
c.onCAddOrUpdate(ctx, csc)
},
UpdateFunc: func(_ interface{}, newObj interface{}) {
c.onCAddOrUpdate(ctx, newObj.(*storagev1alpha1.CSIStorageCapacity))
csc, ok := newObj.(*storagev1alpha1.CSIStorageCapacity)
if !ok {
klog.Errorf("updated object: expected CSIStorageCapacity, got %T -> ignoring it", newObj)
return
}
c.onCAddOrUpdate(ctx, csc)
},
DeleteFunc: func(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
csc, ok := obj.(*storagev1alpha1.CSIStorageCapacity)
if !ok {
klog.Errorf("deleted object: expected CSIStorageCapacity, got %T -> ignoring it", obj)
return
}
c.onCDelete(ctx, csc)
},
DeleteFunc: func(obj interface{}) { c.onCDelete(ctx, obj.(*storagev1alpha1.CSIStorageCapacity)) },
}
c.cInformer.Informer().AddEventHandler(handler)
capacities, err := c.cInformer.Lister().List(labels.Everything())
Expand Down
66 changes: 57 additions & 9 deletions pkg/capacity/topology/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,40 +56,88 @@ func NewNodeTopology(
// a bit and just remember that there is work to be done.
nodeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
klog.V(5).Infof("capacity topology: new node: %s", obj.(*v1.Node).Name)
node, ok := obj.(*v1.Node)
if !ok {
klog.Errorf("added object: expected Node, got %T -> ignoring it", obj)
return
}
klog.V(5).Infof("capacity topology: new node: %s", node.Name)
queue.Add("")
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
if reflect.DeepEqual(oldObj.(*v1.Node).Labels, newObj.(*v1.Node).Labels) {
oldNode, ok := oldObj.(*v1.Node)
if !ok {
klog.Errorf("original object: expected Node, got %T -> ignoring it", oldObj)
return
}
newNode, ok := newObj.(*v1.Node)
if !ok {
klog.Errorf("updated object: expected Node, got %T -> ignoring it", newObj)
return
}
if reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
// Shortcut: labels haven't changed, no need to sync.
return
}
klog.V(5).Infof("capacity topology: updated node: %s", newObj.(*v1.Node).Name)
klog.V(5).Infof("capacity topology: updated node: %s", newNode.Name)
queue.Add("")
},
DeleteFunc: func(obj interface{}) {
klog.V(5).Infof("capacity topology: removed node: %s", obj.(*v1.Node).Name)
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
node, ok := obj.(*v1.Node)
if !ok {
klog.Errorf("deleted object: expected Node, got %T -> ignoring it", obj)
return
}
klog.V(5).Infof("capacity topology: removed node: %s", node.Name)
queue.Add("")
},
}
nodeInformer.Informer().AddEventHandler(nodeHandler)
csiNodeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
klog.V(5).Infof("capacity topology: new CSINode: %s", obj.(*storagev1.CSINode).Name)
csiNode, ok := obj.(*storagev1.CSINode)
if !ok {
klog.Errorf("added object: expected CSINode, got %T -> ignoring it", obj)
return
}
klog.V(5).Infof("capacity topology: new CSINode: %s", csiNode.Name)
queue.Add("")
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
oldKeys := nt.driverTopologyKeys(oldObj.(*storagev1.CSINode))
newKeys := nt.driverTopologyKeys(newObj.(*storagev1.CSINode))
oldCSINode, ok := oldObj.(*storagev1.CSINode)
if !ok {
klog.Errorf("original object: expected CSINode, got %T -> ignoring it", oldObj)
return
}
newCSINode, ok := newObj.(*storagev1.CSINode)
if !ok {
klog.Errorf("updated object: expected CSINode, got %T -> ignoring it", newObj)
return
}
oldKeys := nt.driverTopologyKeys(oldCSINode)
newKeys := nt.driverTopologyKeys(newCSINode)
if reflect.DeepEqual(oldKeys, newKeys) {
// Shortcut: keys haven't changed, no need to sync.
return
}
klog.V(5).Infof("capacity topology: updated CSINode: %s", newObj.(*storagev1.CSINode).Name)
klog.V(5).Infof("capacity topology: updated CSINode: %s", newCSINode.Name)
queue.Add("")
},
DeleteFunc: func(obj interface{}) {
klog.V(5).Infof("capacity topology: removed CSINode: %s", obj.(*storagev1.CSINode).Name)
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
csiNode, ok := obj.(*storagev1.CSINode)
if !ok {
klog.Errorf("deleted object: expected CSINode, got %T -> ignoring it", obj)
return
}
klog.V(5).Infof("capacity topology: removed CSINode: %s", csiNode.Name)
queue.Add("")
},
}
Expand Down

0 comments on commit fdc3726

Please sign in to comment.