diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go index 9e2b90d5f468..f7f85591e039 100644 --- a/internal/cephfs/core/metadata.go +++ b/internal/cephfs/core/metadata.go @@ -34,23 +34,72 @@ var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations ar func (s *subVolumeClient) supportsSubVolMetadata() bool { newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolMetadataState.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.RUnlock() - return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported + return clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState != unsupported } func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool { + clusterAdditionalInfo[s.clusterID].subVolMetadataState.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.Unlock() + var invalid fsAdmin.NotImplementedError if err != nil && errors.As(err, &invalid) { // In case the error is other than invalid command return error to the caller. - clusterAdditionalInfo[s.clusterID].subVolMetadataState = unsupported + clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = unsupported return false } - clusterAdditionalInfo[s.clusterID].subVolMetadataState = supported + clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = supported return true } +// isNotSupportedResize returns true if resize is not supported. +func (s *subVolumeClient) isNotSupportedResize() bool { + newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].resizeState.RLock() + defer clusterAdditionalInfo[s.clusterID].resizeState.RUnlock() + + return clusterAdditionalInfo[s.clusterID].resizeState.operationState == unknown || + clusterAdditionalInfo[s.clusterID].resizeState.operationState == unsupported +} + +// updateResizeState updates resize state. +func (s *subVolumeClient) updateResizeState(state operationState) { + clusterAdditionalInfo[s.clusterID].resizeState.Lock() + defer clusterAdditionalInfo[s.clusterID].resizeState.Unlock() + + clusterAdditionalInfo[s.clusterID].resizeState.operationState = state +} + +// isSubVolumeGroupCreated returns true if subvolume group is created. +func (s *subVolumeClient) isSubVolumeGroupCreated(group string) bool { + newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RUnlock() + + if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil { + return false + } + + return clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[group] +} + +// updateSubVolumeGroupCreated updates subvolume group created map. +// If the map is nil, it creates a new map. +func (s *subVolumeClient) updateSubVolumeGroupCreated(group string, state bool) { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Unlock() + + if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated = make(map[string]bool) + } + + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[group] = state +} + // setMetadata sets custom metadata on the subvolume in a volume as a // key-value pair. func (s *subVolumeClient) setMetadata(key, value string) error { diff --git a/internal/cephfs/core/snapshot_metadata.go b/internal/cephfs/core/snapshot_metadata.go index f168fbf8cd41..123a4eac892c 100644 --- a/internal/cephfs/core/snapshot_metadata.go +++ b/internal/cephfs/core/snapshot_metadata.go @@ -30,20 +30,25 @@ var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata func (s *snapshotClient) supportsSubVolSnapMetadata() bool { newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RUnlock() - return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported + return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState != unsupported } func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool { + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Unlock() + var invalid fsAdmin.NotImplementedError if err != nil && errors.As(err, &invalid) { // In case the error is other than invalid command return error to // the caller. - clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = unsupported return false } - clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = supported return true } diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index 54c3dd4eb216..3d3470fb532a 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -196,7 +196,12 @@ func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, err return &subvol, nil } -type operationState int64 +type operationState int32 + +type operationStateMutex struct { + sync.RWMutex + operationState operationState +} const ( unknown operationState = iota @@ -207,14 +212,17 @@ const ( type localClusterState struct { // set the enum value i.e., unknown, supported, // unsupported as per the state of the cluster. - resizeState operationState - subVolMetadataState operationState - subVolSnapshotMetadataState operationState + resizeState operationStateMutex + subVolMetadataState operationStateMutex + subVolSnapshotMetadataState operationStateMutex // A cluster can have multiple filesystem for that we need to have a map of // subvolumegroups to check filesystem is created nor not. // set true once a subvolumegroup is created // for corresponding filesystem in a cluster. subVolumeGroupsCreated map[string]bool + // subVolumeGroupsRWMutex is used to protect subVolumeGroupsCreated map + // against concurrent writes while allowing multiple readers. + subVolumeGroupsRWMutex sync.RWMutex } func newLocalClusterState(clusterID string) { @@ -224,7 +232,6 @@ func newLocalClusterState(clusterID string) { defer clusterAdditionalInfoMutex.Unlock() if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent { clusterAdditionalInfo[clusterID] = &localClusterState{} - clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool) } } @@ -240,7 +247,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { } // create subvolumegroup if not already created for the cluster. - if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] { + if !s.isSubVolumeGroupCreated(s.SubvolumeGroup) { opts := fsAdmin.SubVolumeGroupOptions{} err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) if err != nil { @@ -254,7 +261,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { return err } log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true + s.updateSubVolumeGroupCreated(s.SubvolumeGroup, true) } opts := fsAdmin.SubVolumeOptions{ @@ -272,7 +279,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { if errors.Is(err, rados.ErrNotFound) { // Reset the subVolumeGroupsCreated so that we can try again to create the // subvolumegroup in next request if the error is Not Found. - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false + s.updateSubVolumeGroupCreated(s.SubvolumeGroup, false) } return err @@ -303,10 +310,10 @@ func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) er // CreateVolume to resize the subvolume. func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error { newLocalClusterState(s.clusterID) + // resize subvolume when either it's supported, or when corresponding // clusterID key was not present. - if clusterAdditionalInfo[s.clusterID].resizeState == unknown || - clusterAdditionalInfo[s.clusterID].resizeState == supported { + if s.isNotSupportedResize() { fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err) @@ -315,7 +322,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er } _, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true) if err == nil { - clusterAdditionalInfo[s.clusterID].resizeState = supported + s.updateResizeState(supported) return nil } @@ -327,7 +334,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er return err } } - clusterAdditionalInfo[s.clusterID].resizeState = unsupported + s.updateResizeState(unsupported) s.Size = bytesQuota return s.CreateVolume(ctx)