diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go index 9e2b90d5f468..a23c867d2ba5 100644 --- a/internal/cephfs/core/metadata.go +++ b/internal/cephfs/core/metadata.go @@ -35,18 +35,18 @@ var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations ar func (s *subVolumeClient) supportsSubVolMetadata() bool { newLocalClusterState(s.clusterID) - return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported + return clusterAdditionalInfo[s.clusterID].subVolMetadataState.Load() != unsupported } func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool { var invalid fsAdmin.NotImplementedError if err != nil && errors.As(err, &invalid) { // In case the error is other than invalid command return error to the caller. - clusterAdditionalInfo[s.clusterID].subVolMetadataState = unsupported + clusterAdditionalInfo[s.clusterID].subVolMetadataState.Store(unsupported) return false } - clusterAdditionalInfo[s.clusterID].subVolMetadataState = supported + clusterAdditionalInfo[s.clusterID].subVolMetadataState.Store(supported) return true } diff --git a/internal/cephfs/core/snapshot_metadata.go b/internal/cephfs/core/snapshot_metadata.go index f168fbf8cd41..479a6d697f23 100644 --- a/internal/cephfs/core/snapshot_metadata.go +++ b/internal/cephfs/core/snapshot_metadata.go @@ -31,7 +31,7 @@ var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata func (s *snapshotClient) supportsSubVolSnapMetadata() bool { newLocalClusterState(s.clusterID) - return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported + return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Load() != unsupported } func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool { @@ -39,11 +39,11 @@ func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool { if err != nil && errors.As(err, &invalid) { // In case the error is other than invalid command return error to // the caller. - clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Store(unsupported) return false } - clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Store(supported) return true } diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index c95a7cc8639e..fe89b87bf087 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -22,6 +22,8 @@ import ( "fmt" "path" "strings" + "sync" + "sync/atomic" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" @@ -32,12 +34,17 @@ import ( "github.com/ceph/go-ceph/rados" ) -// clusterAdditionalInfo contains information regarding if resize is -// supported in the particular cluster and subvolumegroup is -// created or not. -// Subvolumegroup creation and volume resize decisions are -// taken through this additional cluster information. -var clusterAdditionalInfo = make(map[string]*localClusterState) +var ( + // clusterAdditionalInfo contains information regarding if resize is + // supported in the particular cluster and subvolumegroup is + // created or not. + // Subvolumegroup creation and volume resize decisions are + // taken through this additional cluster information. + clusterAdditionalInfo = make(map[string]*localClusterState) + // clusterAdditionalInfoMutex is used to synchronize access to + // clusterAdditionalInfo map. + clusterAdditionalInfoMutex = sync.Mutex{} +) // Subvolume holds subvolume information. This includes only the needed members // from fsAdmin.SubVolumeInfo. @@ -190,10 +197,8 @@ func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, err return &subvol, nil } -type operationState int64 - const ( - unknown operationState = iota + unknown int64 = iota supported unsupported ) @@ -201,22 +206,23 @@ const ( type localClusterState struct { // set the enum value i.e., unknown, supported, // unsupported as per the state of the cluster. - resizeState operationState - subVolMetadataState operationState - subVolSnapshotMetadataState operationState + resizeState atomic.Int64 + subVolMetadataState atomic.Int64 + subVolSnapshotMetadataState atomic.Int64 // A cluster can have multiple filesystem for that we need to have a map of // subvolumegroups to check filesystem is created nor not. // set true once a subvolumegroup is created // for corresponding filesystem in a cluster. - subVolumeGroupsCreated map[string]bool + subVolumeGroupsCreated sync.Map } func newLocalClusterState(clusterID string) { // verify if corresponding clusterID key is present in the map, // and if not, initialize with default values(false). + clusterAdditionalInfoMutex.Lock() + defer clusterAdditionalInfoMutex.Unlock() if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent { clusterAdditionalInfo[clusterID] = &localClusterState{} - clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool) } } @@ -232,7 +238,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { } // create subvolumegroup if not already created for the cluster. - if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] { + if _, found := clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Load(s.FsName); !found { opts := fsAdmin.SubVolumeGroupOptions{} err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) if err != nil { @@ -246,7 +252,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { return err } log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Store(s.FsName, true) } opts := fsAdmin.SubVolumeOptions{ @@ -264,7 +270,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { if errors.Is(err, rados.ErrNotFound) { // Reset the subVolumeGroupsCreated so that we can try again to create the // subvolumegroup in next request if the error is Not Found. - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Delete(s.FsName) } return err @@ -297,8 +303,8 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er newLocalClusterState(s.clusterID) // resize subvolume when either it's supported, or when corresponding // clusterID key was not present. - if clusterAdditionalInfo[s.clusterID].resizeState == unknown || - clusterAdditionalInfo[s.clusterID].resizeState == supported { + if clusterAdditionalInfo[s.clusterID].resizeState.Load() == unknown || + clusterAdditionalInfo[s.clusterID].resizeState.Load() == supported { fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err) @@ -307,7 +313,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er } _, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true) if err == nil { - clusterAdditionalInfo[s.clusterID].resizeState = supported + clusterAdditionalInfo[s.clusterID].resizeState.Store(supported) return nil } @@ -319,7 +325,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er return err } } - clusterAdditionalInfo[s.clusterID].resizeState = unsupported + clusterAdditionalInfo[s.clusterID].resizeState.Store(unsupported) s.Size = bytesQuota return s.CreateVolume(ctx)