Skip to content

Commit

Permalink
cephfs: safeguard localClusterState struct from race conditions
Browse files Browse the repository at this point in the history
This commit uses atomic.Int64 and sync.Map with members of
localClusterState and safeguards clusterAdditionalInfo map
operations with a mutex.

Signed-off-by: Rakshith R <[email protected]>
  • Loading branch information
Rakshith-R committed Oct 6, 2023
1 parent cba5402 commit 90dc22b
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 23 deletions.
45 changes: 42 additions & 3 deletions internal/cephfs/core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,62 @@ var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations ar

// supportsSubVolMetadata reports whether subvolume metadata operations may be
// used on this cluster: it returns true unless the cached state is
// "unsupported". It makes sure the per-cluster state entry exists and reads the
// state while holding that state's read lock.
//
// NOTE(review): the two return statements below look like the removed and the
// added side of a diff rendering; presumably only the second one
// (".operationState") belongs to the new code -- confirm.
func (s *subVolumeClient) supportsSubVolMetadata() bool {
newLocalClusterState(s.clusterID)
clusterAdditionalInfo[s.clusterID].subVolMetadataState.RLock()
defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.RUnlock()

return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported
return clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState != unsupported
}

// isUnsupportedSubVolMetadata updates the cached subvolume metadata state from
// err while holding the state's write lock. When err is non-nil and
// errors.As matches it against fsAdmin.NotImplementedError, the state is set
// to unsupported and false is returned; for every other err (including nil)
// the state is set to supported and true is returned.
//
// NOTE(review): the return value is the opposite of what the name suggests
// (false means "unsupported") -- confirm against callers.
// NOTE(review): this does not call newLocalClusterState; presumably the
// per-cluster entry already exists when it is called -- confirm.
// NOTE(review): each pair of assignments below looks like the removed and the
// added side of a diff rendering; presumably only the ".operationState" lines
// belong to the new code -- confirm.
func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool {
clusterAdditionalInfo[s.clusterID].subVolMetadataState.Lock()
defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.Unlock()

var invalid fsAdmin.NotImplementedError
if err != nil && errors.As(err, &invalid) {
// The error is a NotImplementedError: remember that the operation is
// unsupported. Any other error is not inspected here.
clusterAdditionalInfo[s.clusterID].subVolMetadataState = unsupported
clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = unsupported

return false
}
clusterAdditionalInfo[s.clusterID].subVolMetadataState = supported
clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = supported

return true
}

// isNotSupportedResize reports whether ResizeVolume should issue a resize call
// for this cluster, i.e. whether resize has NOT been marked unsupported (the
// cached state is still unknown, or is supported).
//
// The name is misleading: ResizeVolume treats a true result as the go-ahead to
// try the resize. The previous check (state != supported) returned false once
// a resize had succeeded, so the next request skipped the resize call and
// marked the cluster as unsupported. Comparing against unsupported restores
// the "unknown or supported" condition that ResizeVolume documents.
//
// The per-cluster state entry is created if missing, and the state is read
// while holding its read lock.
func (s *subVolumeClient) isNotSupportedResize() bool {
	newLocalClusterState(s.clusterID)

	resize := &clusterAdditionalInfo[s.clusterID].resizeState
	resize.RLock()
	defer resize.RUnlock()

	return resize.operationState != unsupported
}

// updateResizeState stores state as the cached resize state of this client's
// cluster. The write happens while holding the resize state's write lock.
func (s *subVolumeClient) updateResizeState(state operationState) {
	resize := &clusterAdditionalInfo[s.clusterID].resizeState

	resize.Lock()
	defer resize.Unlock()

	resize.operationState = state
}

// isSubVolumeGroupCreated reports whether group has been recorded as created
// for this client's cluster. A group that was never recorded yields false
// (the zero value of the map lookup).
//
// The per-cluster state entry is created if missing, and the map is read
// while holding subVolumeGroupsRWMutex for reading.
func (s *subVolumeClient) isSubVolumeGroupCreated(group string) bool {
	newLocalClusterState(s.clusterID)

	cluster := clusterAdditionalInfo[s.clusterID]
	cluster.subVolumeGroupsRWMutex.RLock()
	defer cluster.subVolumeGroupsRWMutex.RUnlock()

	return cluster.subVolumeGroupsCreated[group]
}

// updateSubVolumeGroupCreated records state as the "created" flag of group for
// this client's cluster. The map write happens while holding
// subVolumeGroupsRWMutex for writing.
func (s *subVolumeClient) updateSubVolumeGroupCreated(group string, state bool) {
	cluster := clusterAdditionalInfo[s.clusterID]

	cluster.subVolumeGroupsRWMutex.Lock()
	defer cluster.subVolumeGroupsRWMutex.Unlock()

	cluster.subVolumeGroupsCreated[group] = state
}

// setMetadata sets custom metadata on the subvolume in a volume as a
// key-value pair.
func (s *subVolumeClient) setMetadata(key, value string) error {
Expand Down
11 changes: 8 additions & 3 deletions internal/cephfs/core/snapshot_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,25 @@ var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata

// supportsSubVolSnapMetadata reports whether subvolume snapshot metadata
// operations may be used on this cluster: it returns true unless the cached
// state is "unsupported". It makes sure the per-cluster state entry exists and
// reads the state while holding that state's read lock.
//
// NOTE(review): the two return statements below look like the removed and the
// added side of a diff rendering; presumably only the second one
// (".operationState") belongs to the new code -- confirm.
func (s *snapshotClient) supportsSubVolSnapMetadata() bool {
newLocalClusterState(s.clusterID)
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RLock()
defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RUnlock()

return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported
return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState != unsupported
}

// isUnsupportedSubVolSnapMetadata updates the cached subvolume snapshot
// metadata state from err while holding the state's write lock. When err is
// non-nil and errors.As matches it against fsAdmin.NotImplementedError, the
// state is set to unsupported and false is returned; for every other err
// (including nil) the state is set to supported and true is returned.
//
// NOTE(review): the return value is the opposite of what the name suggests
// (false means "unsupported") -- confirm against callers.
// NOTE(review): this does not call newLocalClusterState; presumably the
// per-cluster entry already exists when it is called -- confirm.
// NOTE(review): each pair of assignments below looks like the removed and the
// added side of a diff rendering; presumably only the ".operationState" lines
// belong to the new code -- confirm.
func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool {
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Lock()
defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Unlock()

var invalid fsAdmin.NotImplementedError
if err != nil && errors.As(err, &invalid) {
// The error is a NotImplementedError: remember that the operation is
// unsupported. Any other error is not inspected here.
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = unsupported

return false
}
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = supported

return true
}
Expand Down
48 changes: 31 additions & 17 deletions internal/cephfs/core/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"path"
"strings"
"sync"

cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
Expand All @@ -32,12 +33,17 @@ import (
"github.com/ceph/go-ceph/rados"
)

// clusterAdditionalInfo contains information regarding if resize is
// supported in the particular cluster and subvolumegroup is
// created or not.
// Subvolumegroup creation and volume resize decisions are
// taken through this additional cluster information.
var clusterAdditionalInfo = make(map[string]*localClusterState)
var (
// clusterAdditionalInfo contains information regarding if resize is
// supported in the particular cluster and subvolumegroup is
// created or not.
// Subvolumegroup creation and volume resize decisions are
// taken through this additional cluster information.
clusterAdditionalInfo = make(map[string]*localClusterState)
// clusterAdditionalInfoMutex is used to protect clusterAdditionalInfo
// against concurrent writes.
clusterAdditionalInfoMutex = sync.Mutex{}
)

// Subvolume holds subvolume information. This includes only the needed members
// from fsAdmin.SubVolumeInfo.
Expand Down Expand Up @@ -190,7 +196,12 @@ func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, err
return &subvol, nil
}

type operationState int64
type operationState int32

// operationStateMutex pairs an operationState value with an embedded
// sync.RWMutex, so the lock methods (Lock/Unlock/RLock/RUnlock) can be called
// directly on the struct that holds the state.
type operationStateMutex struct {
sync.RWMutex
operationState operationState
}

const (
unknown operationState = iota
Expand All @@ -201,19 +212,22 @@ const (
// localClusterState holds the locally cached per-cluster information: the
// state of the resize and metadata operations and which subvolumegroups have
// been recorded as created.
//
// NOTE(review): the first three fields appear twice (typed operationState and
// operationStateMutex); this looks like the removed and the added side of a
// diff rendering -- presumably only the operationStateMutex fields belong to
// the new code -- confirm.
type localClusterState struct {
// set the enum value i.e., unknown, supported,
// unsupported as per the state of the cluster.
resizeState operationState
subVolMetadataState operationState
subVolSnapshotMetadataState operationState
resizeState operationStateMutex
subVolMetadataState operationStateMutex
subVolSnapshotMetadataState operationStateMutex
// A cluster can have multiple filesystems; for that we need to have a map of
// subvolumegroups to check whether the subvolumegroup is created or not.
// set true once a subvolumegroup is created
// for corresponding filesystem in a cluster.
subVolumeGroupsCreated map[string]bool
// subVolumeGroupsRWMutex guards subVolumeGroupsCreated.
subVolumeGroupsRWMutex sync.RWMutex
}

func newLocalClusterState(clusterID string) {
// verify if corresponding clusterID key is present in the map,
// and if not, initialize with default values(false).
clusterAdditionalInfoMutex.Lock()
defer clusterAdditionalInfoMutex.Unlock()
if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent {
clusterAdditionalInfo[clusterID] = &localClusterState{}
clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool)
Expand All @@ -232,7 +246,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
}

// create subvolumegroup if not already created for the cluster.
if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] {
if !s.isSubVolumeGroupCreated(s.SubvolumeGroup) {
opts := fsAdmin.SubVolumeGroupOptions{}
err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts)
if err != nil {
Expand All @@ -246,7 +260,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
return err
}
log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup)
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true
s.updateSubVolumeGroupCreated(s.SubvolumeGroup, true)
}

opts := fsAdmin.SubVolumeOptions{
Expand All @@ -264,7 +278,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
if errors.Is(err, rados.ErrNotFound) {
// Reset the subVolumeGroupsCreated so that we can try again to create the
// subvolumegroup in next request if the error is Not Found.
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false
s.updateSubVolumeGroupCreated(s.SubvolumeGroup, false)
}

return err
Expand Down Expand Up @@ -295,10 +309,10 @@ func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) er
// CreateVolume to resize the subvolume.
func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error {
newLocalClusterState(s.clusterID)

// resize subvolume when either it's supported, or when corresponding
// clusterID key was not present.
if clusterAdditionalInfo[s.clusterID].resizeState == unknown ||
clusterAdditionalInfo[s.clusterID].resizeState == supported {
if s.isNotSupportedResize() {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err)
Expand All @@ -307,7 +321,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
}
_, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true)
if err == nil {
clusterAdditionalInfo[s.clusterID].resizeState = supported
s.updateResizeState(supported)

return nil
}
Expand All @@ -319,7 +333,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
return err
}
}
clusterAdditionalInfo[s.clusterID].resizeState = unsupported
s.updateResizeState(unsupported)
s.Size = bytesQuota

return s.CreateVolume(ctx)
Expand Down

0 comments on commit 90dc22b

Please sign in to comment.