Skip to content

Commit

Permalink
cephfs: safeguard localClusterState struct from race conditions
Browse files Browse the repository at this point in the history
This commit uses atomic.Int64 and sync.Map with members of
localClusterState and safeguards clusterAdditionalInfo map
operations with a mutex.

Signed-off-by: Rakshith R <[email protected]>
  • Loading branch information
Rakshith-R committed Oct 6, 2023
1 parent cba5402 commit 61b19fa
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 27 deletions.
6 changes: 3 additions & 3 deletions internal/cephfs/core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations ar
// supportsSubVolMetadata reports whether the cluster identified by
// s.clusterID is not known to lack subvolume metadata support, i.e. the
// cached state is unknown or supported.
func (s *subVolumeClient) supportsSubVolMetadata() bool {
	// Ensure the cache entry for this cluster exists.
	newLocalClusterState(s.clusterID)

	// Read the map entry under the mutex: concurrent first-time calls
	// for other clusterIDs may be inserting into the map right now.
	clusterAdditionalInfoMutex.Lock()
	state := clusterAdditionalInfo[s.clusterID]
	clusterAdditionalInfoMutex.Unlock()

	return state.subVolMetadataState.Load() != unsupported
}

// isUnsupportedSubVolMetadata inspects err after a subvolume metadata
// operation. When err is fsAdmin.NotImplementedError the cluster is
// marked unsupported and false is returned (the caller treats the
// failure as non-fatal); otherwise the cluster is marked supported and
// true is returned so the caller handles the original error.
func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool {
	// Fetch the cached state under the mutex; other goroutines may be
	// inserting new clusterID entries concurrently.
	clusterAdditionalInfoMutex.Lock()
	state := clusterAdditionalInfo[s.clusterID]
	clusterAdditionalInfoMutex.Unlock()

	var invalid fsAdmin.NotImplementedError
	if err != nil && errors.As(err, &invalid) {
		// The cluster rejected the command as not implemented:
		// remember that so metadata operations are skipped from now on.
		state.subVolMetadataState.Store(unsupported)

		return false
	}
	state.subVolMetadataState.Store(supported)

	return true
}
Expand Down
6 changes: 3 additions & 3 deletions internal/cephfs/core/snapshot_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata
// supportsSubVolSnapMetadata reports whether the cluster identified by
// s.clusterID is not known to lack subvolume snapshot metadata support,
// i.e. the cached state is unknown or supported.
func (s *snapshotClient) supportsSubVolSnapMetadata() bool {
	// Ensure the cache entry for this cluster exists.
	newLocalClusterState(s.clusterID)

	// Read the map entry under the mutex: concurrent first-time calls
	// for other clusterIDs may be inserting into the map right now.
	clusterAdditionalInfoMutex.Lock()
	state := clusterAdditionalInfo[s.clusterID]
	clusterAdditionalInfoMutex.Unlock()

	return state.subVolSnapshotMetadataState.Load() != unsupported
}

// isUnsupportedSubVolSnapMetadata inspects err after a subvolume
// snapshot metadata operation. When err is fsAdmin.NotImplementedError
// the cluster is marked unsupported and false is returned (the caller
// treats the failure as non-fatal); otherwise the cluster is marked
// supported and true is returned so the caller handles the original
// error.
func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool {
	// Fetch the cached state under the mutex; other goroutines may be
	// inserting new clusterID entries concurrently.
	clusterAdditionalInfoMutex.Lock()
	state := clusterAdditionalInfo[s.clusterID]
	clusterAdditionalInfoMutex.Unlock()

	var invalid fsAdmin.NotImplementedError
	if err != nil && errors.As(err, &invalid) {
		// The cluster rejected the command as not implemented:
		// remember that so snapshot metadata operations are skipped
		// from now on.
		state.subVolSnapshotMetadataState.Store(unsupported)

		return false
	}
	state.subVolSnapshotMetadataState.Store(supported)

	return true
}
Expand Down
48 changes: 27 additions & 21 deletions internal/cephfs/core/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"path"
"strings"
"sync"
"sync/atomic"

cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
Expand All @@ -32,12 +34,17 @@ import (
"github.com/ceph/go-ceph/rados"
)

// clusterAdditionalInfo contains information regarding if resize is
// supported in the particular cluster and subvolumegroup is
// created or not.
// Subvolumegroup creation and volume resize decisions are
// taken through this additional cluster information.
var clusterAdditionalInfo = make(map[string]*localClusterState)
var (
// clusterAdditionalInfo contains information regarding if resize is
// supported in the particular cluster and subvolumegroup is
// created or not.
// Subvolumegroup creation and volume resize decisions are
// taken through this additional cluster information.
clusterAdditionalInfo = make(map[string]*localClusterState)
// clusterAdditionalInfoMutex is used to synchronize access to
// clusterAdditionalInfo map.
clusterAdditionalInfoMutex = sync.Mutex{}
)

// Subvolume holds subvolume information. This includes only the needed members
// from fsAdmin.SubVolumeInfo.
Expand Down Expand Up @@ -190,33 +197,32 @@ func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, err
return &subvol, nil
}

// Feature-support states cached per cluster. Declared as plain int64
// values so they can be stored in atomic.Int64 fields directly; the
// zero value (unknown) is the initial state.
const (
	unknown int64 = iota
	supported
	unsupported
)

// localClusterState caches per-cluster feature-support information.
// All fields are safe for concurrent use, so a single instance can be
// shared by concurrent RPCs without extra locking.
type localClusterState struct {
	// Hold one of unknown, supported or unsupported (see the const
	// block above); accessed atomically. The zero value is unknown.
	resizeState                 atomic.Int64
	subVolMetadataState         atomic.Int64
	subVolSnapshotMetadataState atomic.Int64
	// A cluster can have multiple filesystems; subVolumeGroupsCreated
	// records, keyed by filesystem name, whether the subvolumegroup has
	// been created for that filesystem (entry present => created).
	subVolumeGroupsCreated sync.Map
}

// newLocalClusterState makes sure clusterAdditionalInfo has an entry
// for clusterID, creating a zero-valued localClusterState on first use.
// The zero value is fully usable: the atomic.Int64 fields start at
// unknown and the sync.Map needs no initialization.
func newLocalClusterState(clusterID string) {
	// Serialize map access so concurrent first-time callers, for the
	// same or different clusterIDs, do not race on the map.
	clusterAdditionalInfoMutex.Lock()
	defer clusterAdditionalInfoMutex.Unlock()
	if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent {
		clusterAdditionalInfo[clusterID] = &localClusterState{}
	}
}

Expand All @@ -232,7 +238,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
}

// create subvolumegroup if not already created for the cluster.
if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] {
if _, found := clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Load(s.FsName); !found {
opts := fsAdmin.SubVolumeGroupOptions{}
err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts)
if err != nil {
Expand All @@ -246,7 +252,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
return err
}
log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup)
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Store(s.FsName, true)
}

opts := fsAdmin.SubVolumeOptions{
Expand All @@ -264,7 +270,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
if errors.Is(err, rados.ErrNotFound) {
// Reset the subVolumeGroupsCreated so that we can try again to create the
// subvolumegroup in next request if the error is Not Found.
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Delete(s.FsName)
}

return err
Expand Down Expand Up @@ -297,8 +303,8 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
newLocalClusterState(s.clusterID)
// resize subvolume when either it's supported, or when corresponding
// clusterID key was not present.
if clusterAdditionalInfo[s.clusterID].resizeState == unknown ||
clusterAdditionalInfo[s.clusterID].resizeState == supported {
if clusterAdditionalInfo[s.clusterID].resizeState.Load() == unknown ||
clusterAdditionalInfo[s.clusterID].resizeState.Load() == supported {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err)
Expand All @@ -307,7 +313,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
}
_, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true)
if err == nil {
clusterAdditionalInfo[s.clusterID].resizeState = supported
clusterAdditionalInfo[s.clusterID].resizeState.Store(supported)

return nil
}
Expand All @@ -319,7 +325,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
return err
}
}
clusterAdditionalInfo[s.clusterID].resizeState = unsupported
clusterAdditionalInfo[s.clusterID].resizeState.Store(unsupported)
s.Size = bytesQuota

return s.CreateVolume(ctx)
Expand Down

0 comments on commit 61b19fa

Please sign in to comment.