Skip to content

Commit

Permalink
cephfs: safeguard localClusterState struct members from race condition
Browse files Browse the repository at this point in the history
Multiple go-routines may simultaneously check for
presence of a clusterID's metadata such as
subvolumegroup created state, resize state,
metadata state and snapshot metadata state in the
clusterAdditionalInfo map and update an entry after creation
if it is absent. This set of operation needs to be serialized.

Therefore, this commit safeguards localClusterState's members
with a RWMutex to prevent the above problem.

Signed-off-by: Rakshith R <[email protected]>
  • Loading branch information
Rakshith-R committed Oct 9, 2023
1 parent 21b10fb commit 7e6309f
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 18 deletions.
55 changes: 52 additions & 3 deletions internal/cephfs/core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,72 @@ var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations ar

func (s *subVolumeClient) supportsSubVolMetadata() bool {
newLocalClusterState(s.clusterID)
clusterAdditionalInfo[s.clusterID].subVolMetadataState.RLock()
defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.RUnlock()

return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported
return clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState != unsupported
}

func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool {
clusterAdditionalInfo[s.clusterID].subVolMetadataState.Lock()
defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.Unlock()

var invalid fsAdmin.NotImplementedError
if err != nil && errors.As(err, &invalid) {
// In case the error is other than invalid command return error to the caller.
clusterAdditionalInfo[s.clusterID].subVolMetadataState = unsupported
clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = unsupported

return false
}
clusterAdditionalInfo[s.clusterID].subVolMetadataState = supported
clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = supported

return true
}

// isNotSupportedResize returns true if resize is not supported.
func (s *subVolumeClient) isNotSupportedResize() bool {
newLocalClusterState(s.clusterID)
clusterAdditionalInfo[s.clusterID].resizeState.RLock()
defer clusterAdditionalInfo[s.clusterID].resizeState.RUnlock()

return clusterAdditionalInfo[s.clusterID].resizeState.operationState == unknown ||
clusterAdditionalInfo[s.clusterID].resizeState.operationState == unsupported
}

// updateResizeState updates resize state.
func (s *subVolumeClient) updateResizeState(state operationState) {
clusterAdditionalInfo[s.clusterID].resizeState.Lock()
defer clusterAdditionalInfo[s.clusterID].resizeState.Unlock()

clusterAdditionalInfo[s.clusterID].resizeState.operationState = state
}

// isSubVolumeGroupCreated returns true if subvolume group is created.
func (s *subVolumeClient) isSubVolumeGroupCreated(group string) bool {
newLocalClusterState(s.clusterID)
clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RLock()
defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RUnlock()

if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil {
return false
}

return clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[group]
}

// updateSubVolumeGroupCreated updates subvolume group created map.
// If the map is nil, it creates a new map.
func (s *subVolumeClient) updateSubVolumeGroupCreated(group string, state bool) {
clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Lock()
defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Unlock()

if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil {
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated = make(map[string]bool)
}

clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[group] = state
}

// setMetadata sets custom metadata on the subvolume in a volume as a
// key-value pair.
func (s *subVolumeClient) setMetadata(key, value string) error {
Expand Down
11 changes: 8 additions & 3 deletions internal/cephfs/core/snapshot_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,25 @@ var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata

func (s *snapshotClient) supportsSubVolSnapMetadata() bool {
newLocalClusterState(s.clusterID)
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RLock()
defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RUnlock()

return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported
return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState != unsupported
}

func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool {
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Lock()
defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Unlock()

var invalid fsAdmin.NotImplementedError
if err != nil && errors.As(err, &invalid) {
// In case the error is other than invalid command return error to
// the caller.
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = unsupported

return false
}
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = supported

return true
}
Expand Down
31 changes: 19 additions & 12 deletions internal/cephfs/core/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@ func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, err
return &subvol, nil
}

type operationState int64
type operationState int32

type operationStateMutex struct {
sync.RWMutex
operationState operationState
}

const (
unknown operationState = iota
Expand All @@ -207,14 +212,17 @@ const (
type localClusterState struct {
// set the enum value i.e., unknown, supported,
// unsupported as per the state of the cluster.
resizeState operationState
subVolMetadataState operationState
subVolSnapshotMetadataState operationState
resizeState operationStateMutex
subVolMetadataState operationStateMutex
subVolSnapshotMetadataState operationStateMutex
// A cluster can have multiple filesystem for that we need to have a map of
// subvolumegroups to check filesystem is created nor not.
// set true once a subvolumegroup is created
// for corresponding filesystem in a cluster.
subVolumeGroupsCreated map[string]bool
// subVolumeGroupsRWMutex is used to protect subVolumeGroupsCreated map
// against concurrent writes while allowing multiple readers.
subVolumeGroupsRWMutex sync.RWMutex
}

func newLocalClusterState(clusterID string) {
Expand All @@ -224,7 +232,6 @@ func newLocalClusterState(clusterID string) {
defer clusterAdditionalInfoMutex.Unlock()
if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent {
clusterAdditionalInfo[clusterID] = &localClusterState{}
clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool)
}
}

Expand All @@ -240,7 +247,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
}

// create subvolumegroup if not already created for the cluster.
if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] {
if !s.isSubVolumeGroupCreated(s.SubvolumeGroup) {
opts := fsAdmin.SubVolumeGroupOptions{}
err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts)
if err != nil {
Expand All @@ -254,7 +261,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
return err
}
log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup)
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true
s.updateSubVolumeGroupCreated(s.SubvolumeGroup, true)
}

opts := fsAdmin.SubVolumeOptions{
Expand All @@ -272,7 +279,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
if errors.Is(err, rados.ErrNotFound) {
// Reset the subVolumeGroupsCreated so that we can try again to create the
// subvolumegroup in next request if the error is Not Found.
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false
s.updateSubVolumeGroupCreated(s.SubvolumeGroup, false)
}

return err
Expand Down Expand Up @@ -303,10 +310,10 @@ func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) er
// CreateVolume to resize the subvolume.
func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error {
newLocalClusterState(s.clusterID)

// resize subvolume when either it's supported, or when corresponding
// clusterID key was not present.
if clusterAdditionalInfo[s.clusterID].resizeState == unknown ||
clusterAdditionalInfo[s.clusterID].resizeState == supported {
if s.isNotSupportedResize() {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err)
Expand All @@ -315,7 +322,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
}
_, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true)
if err == nil {
clusterAdditionalInfo[s.clusterID].resizeState = supported
s.updateResizeState(supported)

return nil
}
Expand All @@ -327,7 +334,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
return err
}
}
clusterAdditionalInfo[s.clusterID].resizeState = unsupported
s.updateResizeState(unsupported)
s.Size = bytesQuota

return s.CreateVolume(ctx)
Expand Down

0 comments on commit 7e6309f

Please sign in to comment.