From 73c8d15727c68c2f2d2114f7f647e9c4417fe096 Mon Sep 17 00:00:00 2001 From: Nikhil-Ladha Date: Thu, 24 Oct 2024 13:05:19 +0530 Subject: [PATCH] cephfs: add modfify volume support add ControllerModifyVolume RPC support for cephfs Signed-off-by: Nikhil-Ladha --- internal/cephfs/controllerserver.go | 61 ++++++++++++++++++++++++++ internal/cephfs/driver.go | 1 + internal/cephfs/store/volumeoptions.go | 2 +- internal/cephfs/validator.go | 18 ++++++++ internal/util/idlocker.go | 42 +++++++++++++++++- 5 files changed, 121 insertions(+), 3 deletions(-) diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index f41537b5713..f0382f5ed1a 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -752,6 +752,67 @@ func (cs *ControllerServer) ControllerExpandVolume( }, nil } +// ControllerExpandVolume expands CephFS Volumes on demand based on resizer request. +func (cs *ControllerServer) ControllerModifyVolume( + ctx context.Context, + req *csi.ControllerModifyVolumeRequest, +) (*csi.ControllerModifyVolumeResponse, error) { + if err := cs.validateModifyVolumeRequest(req); err != nil { + log.ErrorLog(ctx, "ControllerModifyVolumeRequest validation failed: %v", err) + + return nil, err + } + + volID := req.GetVolumeId() + secret := req.GetSecrets() + params := req.GetMutableParameters() + + // lock out parallel delete operations + if acquired := cs.VolumeLocks.TryAcquire(volID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID) + } + defer cs.VolumeLocks.Release(volID) + + // lock out volumeID for clone, delete, expand and restore operation + if err := cs.OperationLocks.GetModifyLock(volID); err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseModifyLock(volID) + + cr, err := util.NewAdminCredentials(secret) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + defer cr.DeleteCredentials() + + volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, params, secret, + cs.ClusterName, cs.SetMetadata) + if err != nil { + log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) + + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + defer volOptions.Destroy() + + if volOptions.BackingSnapshot { + return nil, status.Error(codes.InvalidArgument, "cannot modify snapshot-backed volume") + } + + volClient := core.NewSubVolume(volOptions.GetConnection(), + &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata) + if err = volClient.CreateVolume(ctx); err != nil { + log.ErrorLog(ctx, "failed to modify volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err) + + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.ControllerModifyVolumeResponse{}, nil +} + // CreateSnapshot creates the snapshot in backend and stores metadata // in store // diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 8023bb1a798..084884f29b5 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -139,6 +139,7 @@ func (fs *Driver) Run(conf *util.Config) { csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_CLONE_VOLUME, csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + csi.ControllerServiceCapability_RPC_MODIFY_VOLUME, }) fs.cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go index 285b15ffcdb..61ef9292ecb 100644 --- a/internal/cephfs/store/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -221,7 +221,7 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error { optName, actual, expected) } -// getVolumeOptions validates the basic required basic options provided in the +// getVolumeOptions validates the required basic options provided in the // volume parameters and extract the volumeOptions from volume parameters. // It contains the following checks: // - clusterID must be set diff --git a/internal/cephfs/validator.go b/internal/cephfs/validator.go index 74099fc9dbe..8db8272cc47 100644 --- a/internal/cephfs/validator.go +++ b/internal/cephfs/validator.go @@ -111,3 +111,21 @@ func (cs *ControllerServer) validateExpandVolumeRequest(req *csi.ControllerExpan return nil } + +// validateModifyVolumeRequest validates the Controller ModifyVolume request. +func (cs *ControllerServer) validateModifyVolumeRequest(req *csi.ControllerModifyVolumeRequest) error { + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_MODIFY_VOLUME); err != nil { + return fmt.Errorf("invalid ModifyVolumeRequest: %w", err) + } + + if req.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "Volume ID cannot be empty") + } + + mutableParam := req.GetMutableParameters() + if mutableParam == nil { + return status.Error(codes.InvalidArgument, "MutableParameters cannot be empty") + } + + return nil +} diff --git a/internal/util/idlocker.go b/internal/util/idlocker.go index 92733c19c9c..65caa30b127 100644 --- a/internal/util/idlocker.go +++ b/internal/util/idlocker.go @@ -72,6 +72,7 @@ const ( cloneOpt operation = "clone" restoreOp operation = "restore" expandOp operation = "expand" + modifyOp operation = "modify" ) // OperationLock implements a map with atomic operations. @@ -99,6 +100,7 @@ func NewOperationLock() *OperationLock { lock[cloneOpt] = make(map[string]int) lock[restoreOp] = make(map[string]int) lock[expandOp] = make(map[string]int) + lock[modifyOp] = make(map[string]int) return &OperationLock{ locks: lock, @@ -162,12 +164,37 @@ func (ol *OperationLock) tryAcquire(op operation, volumeID string) error { if _, ok := ol.locks[cloneOpt][volumeID]; ok { return fmt.Errorf("a Clone operation with given id %s already exists", volumeID) } - // check any delete operation is going on for given volume ID + // check any create operation is going on for given volume ID if _, ok := ol.locks[createOp][volumeID]; ok { return fmt.Errorf("a Create operation with given id %s already exists", volumeID) } ol.locks[expandOp][volumeID] = 1 + case modifyOp: + // During modify operation the volume should not be deleted, cloned, + // resized and restored and there should not be a create operation also. + // check any delete operation is going on for given volume ID + if _, ok := ol.locks[deleteOp][volumeID]; ok { + return fmt.Errorf("a Delete operation with given id %s already exists", volumeID) + } + // check any clone operation is going on for given volume ID + if _, ok := ol.locks[cloneOpt][volumeID]; ok { + return fmt.Errorf("a Clone operation with given id %s already exists", volumeID) + } + // check any expand operation is going on for given volume ID + if _, ok := ol.locks[expandOp][volumeID]; ok { + return fmt.Errorf("a Expand operation with given id %s already exists", volumeID) + } + // check any restore operation is going on for given volume ID + if _, ok := ol.locks[restoreOp][volumeID]; ok { + return fmt.Errorf("a Restore operation with given id %s already exists", volumeID) + } + // check any create operation is going on for given volume ID + if _, ok := ol.locks[createOp][volumeID]; ok { + return fmt.Errorf("a Expand operation with given id %s already exists", volumeID) + } + + ol.locks[modifyOp][volumeID] = 1 default: return fmt.Errorf("%v operation not supported", op) } @@ -203,6 +230,12 @@ func (ol *OperationLock) GetExpandLock(volumeID string) error { return ol.tryAcquire(expandOp, volumeID) } +// GetModifyLock gets the modify lock on given volumeID,ensures that there is +// no create, delete, clone, expand, and restore operation on given volumeID. +func (ol *OperationLock) GetModifyLock(volumeID string) error { + return ol.tryAcquire(modifyOp, volumeID) +} + // ReleaseSnapshotCreateLock releases the create lock on given volumeID. func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) { ol.release(createOp, volumeID) @@ -228,12 +261,17 @@ func (ol *OperationLock) ReleaseExpandLock(volumeID string) { ol.release(expandOp, volumeID) } +// ReleaseModifyLock releases the modify lock on given volumeID. +func (ol *OperationLock) ReleaseModifyLock(volumeID string) { + ol.release(modifyOp, volumeID) +} + // release deletes the lock on volumeID. func (ol *OperationLock) release(op operation, volumeID string) { ol.mux.Lock() defer ol.mux.Unlock() switch op { - case cloneOpt, createOp, expandOp, restoreOp, deleteOp: + case cloneOpt, createOp, expandOp, restoreOp, deleteOp, modifyOp: if val, ok := ol.locks[op][volumeID]; ok { // decrement the counter for operation ol.locks[op][volumeID] = val - 1