Skip to content

Commit

Permalink
Merge pull request #478 from leiyiz/stateful_csi
Browse files Browse the repository at this point in the history
stateful CSI call logic
  • Loading branch information
k8s-ci-robot authored Apr 19, 2023
2 parents 788b9f8 + 672cb50 commit 45531da
Show file tree
Hide file tree
Showing 5 changed files with 568 additions and 28 deletions.
29 changes: 26 additions & 3 deletions pkg/csi_driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type controllerServerConfig struct {
ipAllocator *util.IPAllocator
volumeLocks *util.VolumeLocks
enableMultishare bool
statefulController *MultishareStatefulController
multiShareController *MultishareController
reconciler *MultishareReconciler
metricsManager *metrics.MetricsManager
Expand All @@ -129,6 +130,10 @@ func newControllerServer(config *controllerServerConfig) csi.ControllerServer {
if config.enableMultishare {
config.multiShareController = NewMultishareController(config)
config.multiShareController.opsManager.controllerServer = cs
if config.features.FeatureStateful.Enabled {
config.statefulController = NewMultishareStatefulController(config)
config.statefulController.mc = config.multiShareController
}
}
if config.reconciler != nil {
klog.Infof("stateful reconciler enabled, setting its controller server")
Expand All @@ -152,7 +157,13 @@ func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
return nil, status.Error(codes.InvalidArgument, "multishare controller not enabled")
}
start := time.Now()
response, err := s.config.multiShareController.CreateVolume(ctx, req)
var response *csi.CreateVolumeResponse
var err error
if s.config.features.FeatureStateful.Enabled {
response, err = s.config.statefulController.CreateVolume(ctx, req)
} else {
response, err = s.config.multiShareController.CreateVolume(ctx, req)
}
duration := time.Since(start)
s.config.metricsManager.RecordOperationMetrics(err, methodCreateVolume, modeMultishare, duration)
klog.Infof("CreateVolume response %+v error %v, for request %+v", response, err, req)
Expand Down Expand Up @@ -352,7 +363,13 @@ func (s *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolu
return nil, status.Error(codes.InvalidArgument, "multishare controller not enabled")
}
start := time.Now()
response, err := s.config.multiShareController.DeleteVolume(ctx, req)
var response *csi.DeleteVolumeResponse
var err error
if s.config.features.FeatureStateful.Enabled {
response, err = s.config.statefulController.DeleteVolume(ctx, req)
} else {
response, err = s.config.multiShareController.DeleteVolume(ctx, req)
}
duration := time.Since(start)
s.config.metricsManager.RecordOperationMetrics(err, methodDeleteVolume, modeMultishare, duration)
klog.Infof("Deletevolume response %+v error %v, for request: %+v", response, err, req)
Expand Down Expand Up @@ -649,7 +666,13 @@ func (s *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.
return nil, status.Error(codes.InvalidArgument, "multishare controller not enabled")
}
start := time.Now()
response, err := s.config.multiShareController.ControllerExpandVolume(ctx, req)
var response *csi.ControllerExpandVolumeResponse
var err error
if s.config.features.FeatureStateful.Enabled {
response, err = s.config.statefulController.ControllerExpandVolume(ctx, req)
} else {
response, err = s.config.multiShareController.ControllerExpandVolume(ctx, req)
}
duration := time.Since(start)
s.config.metricsManager.RecordOperationMetrics(err, methodExpandVolume, modeMultishare, duration)
klog.Infof("ControllerExpandVolume response %+v error %v, for request: %+v", response, err, req)
Expand Down
307 changes: 307 additions & 0 deletions pkg/csi_driver/multishare_stateful_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package driver

import (
csi "github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/apis/multishare/v1alpha1"
clientset "sigs.k8s.io/gcp-filestore-csi-driver/pkg/client/clientset/versioned"
listers "sigs.k8s.io/gcp-filestore-csi-driver/pkg/client/listers/multishare/v1alpha1"
cloud "sigs.k8s.io/gcp-filestore-csi-driver/pkg/cloud_provider"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/cloud_provider/file"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/util"
)

// MultishareController handles CSI calls for volumes which use Filestore multishare instances.
type MultishareStatefulController struct {
//TODO: support variable share count per Filestore instance feature.
driver *GCFSDriver
zone string
cloud *cloud.Cloud
mc *MultishareController

clientset clientset.Interface
shareLister listers.ShareInfoLister
}

func NewMultishareStatefulController(config *controllerServerConfig) *MultishareStatefulController {
return &MultishareStatefulController{
driver: config.driver,
zone: config.cloud.Zone,
cloud: config.cloud,
clientset: config.features.FeatureStateful.DriverClientSet,
shareLister: config.features.FeatureStateful.ShareLister,
}
}

func (m *MultishareStatefulController) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
klog.Infof("CreateVolume called for multishare with request %+v", req)
pvName := req.GetName()
if len(pvName) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
}
if err := m.driver.validateVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if req.GetVolumeContentSource() != nil {
return nil, status.Error(codes.InvalidArgument, "Multishare backed volumes do not support volume content source")
}

instanceSCLabel, err := getInstanceSCLabel(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

reqBytes, err := getShareRequestCapacity(req.GetCapacityRange(), util.MinShareSizeBytes, util.MaxShareSizeBytes)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if !util.IsAligned(reqBytes, util.Gb) {
return nil, status.Errorf(codes.InvalidArgument, "requested size(bytes) %d is not a multiple of 1GiB", reqBytes)
}

shareInfo, err := m.shareLister.Get(pvName)
if err != nil {
if !errors.IsNotFound(err) {
return nil, status.Errorf(codes.Internal, "error getting shareInfo %q from informer: %s", pvName, err.Error())
}
klog.Infof("querying ShareInfo %q from api server", pvName)
shareInfo, err = m.clientset.MultishareV1alpha1().ShareInfos().Get(context.TODO(), pvName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return nil, status.Errorf(codes.Internal, "error getting shareInfo %q from api server: %s", pvName, err.Error())
}
klog.V(6).Infof("shareInfo object for share %q not found in API server", pvName)
shareInfo = nil
} else {
klog.Infof("shareInfo object for share %q not found in informer cache but found in api server", pvName)
}
}

if shareInfo == nil {
region, err := m.mc.pickRegion(req.GetAccessibilityRequirements())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
shareInfo = &v1alpha1.ShareInfo{
ObjectMeta: metav1.ObjectMeta{
Name: pvName,
Finalizers: []string{util.FilestoreResourceCleanupFinalizer},
Labels: extractShareLabels(req.Parameters),
},
Spec: v1alpha1.ShareInfoSpec{
ShareName: util.ConvertVolToShareName(pvName),
CapacityBytes: reqBytes,
InstancePoolTag: instanceSCLabel,
Region: region,
},
}
klog.V(6).Infof("trying to create shareInfo object: %v", shareInfo)
shareInfo, err = m.createShareInfo(ctx, shareInfo)
if err != nil {
return nil, status.Errorf(codes.Internal, "error creating share object: %s", err.Error())
}
}

if shareInfo.Status == nil || shareInfo.Status.InstanceHandle == "" {
return nil, status.Errorf(codes.Aborted, "share %s is not assigned to an instance yet", pvName)
}

if shareInfo.Status.ShareStatus != v1alpha1.READY {
if shareInfo.Status.Error != "" {
return nil, status.Errorf(codes.Internal, "internal error: %s", shareInfo.Status.Error)
}
return nil, status.Errorf(codes.Aborted, "share %s is not ready yet", pvName)
}

share, err := generateFileShareFromShareInfo(shareInfo)
if err != nil {
return nil, err
}
return m.mc.getShareAndGenerateCSICreateVolumeResponse(ctx, instanceSCLabel, share, util.MaxShareSizeBytes)
}

func (m *MultishareStatefulController) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
_, project, location, instanceName, shareName, err := parseMultishareVolId(req.VolumeId)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
klog.V(4).Infof("DeleteVolume called for multishare with request %+v", req)

siName := util.ShareToShareInfoName(shareName)
shareInfo, err := m.shareLister.Get(siName)
if err != nil {
if !errors.IsNotFound(err) {
return nil, status.Errorf(codes.Internal, "error deleting volume %q due to informer error: %s", req.VolumeId, err.Error())
}
// check with api if share exist
klog.V(6).Infof("shareInfo %s does not exist in cache, checking if share is already deleted", siName)
_, err := m.cloud.File.GetShare(ctx, &file.Share{
Parent: &file.MultishareInstance{
Project: project,
Location: location,
Name: instanceName,
},
Name: shareName,
})
if err != nil {
if file.IsNotFoundErr(err) {
return &csi.DeleteVolumeResponse{}, nil
}

return nil, status.Error(codes.Internal, err.Error())
}
return nil, status.Errorf(codes.Aborted, "waiting to express intent for volume %s to be deleted", req.VolumeId)
}

if shareInfo.DeletionTimestamp == nil {
if len(shareInfo.Finalizers) == 0 {
klog.Errorf("shareInfo %s shouldn't have no finalizer before deletion marking", siName)
return nil, status.Errorf(codes.Internal, "error deleting volume %s due to driver state error", req.VolumeId)
}

err := m.deleteShareInfo(ctx, siName)
if err != nil {
klog.Errorf("error marking the shareInfo object as deleted: %s", err.Error())
}
return nil, status.Errorf(codes.Aborted, "expressed intent for volume %s to be deleted, waiting.", req.VolumeId)
}

if shareInfo.Status == nil {
klog.Errorf("shareInfo %s marked to be deleted but shareInfo.Status == nil", siName)
return nil, status.Errorf(codes.Aborted, "waiting for volume %s to be deleted.", req.VolumeId)
}

if shareInfo.Status.ShareStatus == v1alpha1.DELETED {
// remove finalizer and return success
klog.V(6).Infof("trying to remove finalizer from %s because share deleted", siName)
shareInfoClone := shareInfo.DeepCopy()
shareInfoClone.Finalizers = []string{}
shareInfo, err = m.updateShareInfo(ctx, shareInfoClone)
if err != nil {
klog.Errorf("failed to remove finalizer from %s: %s", siName, err.Error())
return nil, status.Errorf(codes.Internal, "error deleting volume %s due to failed internal update", req.VolumeId)
}
return &csi.DeleteVolumeResponse{}, nil
}

if shareInfo.Status.Error != "" {
return nil, status.Errorf(codes.Internal, "internal error: %s", shareInfo.Status.Error)
}

return nil, status.Errorf(codes.Aborted, "waiting for the Filestore share supporting volume %s to be deleted", req.VolumeId)
}

func (m *MultishareStatefulController) updateShareInfo(ctx context.Context, shareInfoClone *v1alpha1.ShareInfo) (*v1alpha1.ShareInfo, error) {
result, err := m.clientset.MultishareV1alpha1().ShareInfos().Update(ctx, shareInfoClone, metav1.UpdateOptions{})
if err != nil {
return result, err
}
return result, nil
}

func (m *MultishareStatefulController) createShareInfo(ctx context.Context, shareInfo *v1alpha1.ShareInfo) (*v1alpha1.ShareInfo, error) {
result, err := m.clientset.MultishareV1alpha1().ShareInfos().Create(ctx, shareInfo, metav1.CreateOptions{})
if err != nil {
return result, err
}
return result, nil
}

func (m *MultishareStatefulController) deleteShareInfo(ctx context.Context, siName string) error {
return m.clientset.MultishareV1alpha1().ShareInfos().Delete(ctx, siName, metav1.DeleteOptions{})
}

func (m *MultishareStatefulController) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
reqBytes, err := getShareRequestCapacity(req.GetCapacityRange(), util.MinShareSizeBytes, util.MaxShareSizeBytes)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if !util.IsAligned(reqBytes, util.Gb) {
return nil, status.Errorf(codes.InvalidArgument, "requested size(bytes) %d is not a multiple of 1GiB", reqBytes)
}
_, _, _, _, shareName, err := parseMultishareVolId(req.VolumeId)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
klog.Infof("ControllerExpandVolume called for multishare with request %+v", req)

siName := util.ShareToShareInfoName(shareName)
shareInfo, err := m.shareLister.Get(siName)
if err != nil {
if !errors.IsNotFound(err) {
return nil, status.Errorf(codes.Internal, "error getting shareInfo %q from informer: %s", siName, err.Error())
}
klog.Infof("shareInfo %s does not exist in cache", siName)
return nil, status.Errorf(codes.Aborted, "waiting to express intent for volume %s to be expanded", siName)
}

if shareInfo.Spec.CapacityBytes < reqBytes {
// update Spec.CapacityBytes
shareInfoClone := shareInfo.DeepCopy()
shareInfoClone.Spec.CapacityBytes = reqBytes
shareInfo, err = m.updateShareInfo(ctx, shareInfoClone)
if err != nil {
klog.Errorf("failed to update shareInfo %s: %s", siName, err.Error())
return nil, status.Errorf(codes.Internal, "error expanding volume %s due to failed internal update", siName)
}
return nil, status.Errorf(codes.Aborted, "expressed intent for volume %s to be expanded", siName)
}

if shareInfo.Status == nil {
klog.Errorf("ControllerExpandVolume called for %s but shareInfo.Status is nil", siName)
return nil, status.Errorf(codes.Internal, "volume %s is not yet created", siName)
}

if shareInfo.Status.CapacityBytes >= reqBytes && shareInfo.Status.ShareStatus == v1alpha1.READY {
klog.Infof("Controller expand volume succeeded for volume %v, size(bytes): %v", req.VolumeId, shareInfo.Status.CapacityBytes)

share, err := generateFileShareFromShareInfo(shareInfo)
if err != nil {
return nil, err
}
return m.mc.getShareAndGenerateCSIControllerExpandVolumeResponse(ctx, share, reqBytes)
}

if shareInfo.Status.Error != "" {
return nil, status.Errorf(codes.Internal, "internal error: %s", shareInfo.Status.Error)
}

return nil, status.Errorf(codes.Aborted, "waiting for volume %s to be expanded", siName)
}

func generateFileShareFromShareInfo(shareInfo *v1alpha1.ShareInfo) (*file.Share, error) {
instanceUri := shareInfo.Status.InstanceHandle
project, location, instanceName, err := util.ParseInstanceURI(instanceUri)
if err != nil {
return nil, status.Errorf(codes.Internal, "couldn't parse instanceURI %q: %s", instanceUri, err.Error())
}
return &file.Share{
Name: shareInfo.Spec.ShareName,
Parent: &file.MultishareInstance{
Project: project,
Location: location,
Name: instanceName,
},
}, nil
}
Loading

0 comments on commit 45531da

Please sign in to comment.