Skip to content

Commit

Permalink
Add validation if PVC expansion is allowed, modify the behavior to keep the existing PersistentVolumeClaims in the StS, but modifying the sizes only
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Jul 3, 2024
1 parent be1b2a8 commit 60bc169
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 96 deletions.
23 changes: 13 additions & 10 deletions apis/cassandra/v1beta1/cassandradatacenter_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
"fmt"
"strings"

"github.com/google/go-cmp/cmp"
"github.com/k8ssandra/cass-operator/pkg/images"

apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -172,17 +173,19 @@ func ValidateDatacenterFieldChanges(oldDc CassandraDatacenter, newDc CassandraDa
return attemptedTo("change serviceAccount")
}

// CassandraDataVolumeClaimSpec changes are disallowed
oldClaimSpec := oldDc.Spec.StorageConfig.CassandraDataVolumeClaimSpec.DeepCopy()
newClaimSpec := newDc.Spec.StorageConfig.CassandraDataVolumeClaimSpec.DeepCopy()

// if !reflect.DeepEqual(oldDc.Spec.StorageConfig.CassandraDataVolumeClaimSpec, newDc.Spec.StorageConfig.CassandraDataVolumeClaimSpec) {
// return attemptedTo("change storageConfig.CassandraDataVolumeClaimSpec")
// }
// CassandraDataVolumeClaimSpec changes are disallowed, except for Resources.Requests changes when AllowStorageChangesAnnotation is "true"
if metav1.HasAnnotation(newDc.ObjectMeta, AllowStorageChangesAnnotation) && newDc.Annotations[AllowStorageChangesAnnotation] == "true" {
// If the AllowStorageChangesAnnotation is set, we allow changes to the CassandraDataVolumeClaimSpec sizes, but not other fields
oldClaimSpec.Resources.Requests = newClaimSpec.Resources.Requests
}

// if oldDc.Spec.StorageConfig.CassandraDataVolumeClaimSpec != nil {
// if !reflect.DeepEqual(*oldDc.Spec.StorageConfig.CassandraDataVolumeClaimSpec, *newDc.Spec.StorageConfig.CassandraDataVolumeClaimSpec) {
// return attemptedTo("change storageConfig.CassandraDataVolumeClaimSpec")
// }
// }
if !apiequality.Semantic.DeepEqual(oldClaimSpec, newClaimSpec) {
pvcSourceDiff := cmp.Diff(oldClaimSpec, newClaimSpec)
return attemptedTo("change storageConfig.CassandraDataVolumeClaimSpec, diff: %s", pvcSourceDiff)
}

// Topology changes - Racks
// - Rack Name and Zone changes are disallowed.
Expand Down
91 changes: 85 additions & 6 deletions apis/cassandra/v1beta1/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/ptr"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -398,7 +399,7 @@ func Test_ValidateSingleDatacenter(t *testing.T) {

func Test_ValidateDatacenterFieldChanges(t *testing.T) {
storageSize := resource.MustParse("1Gi")
storageName := "server-data"
storageName := ptr.To[string]("server-data")

tests := []struct {
name string
Expand All @@ -419,7 +420,7 @@ func Test_ValidateDatacenterFieldChanges(t *testing.T) {
DeprecatedServiceAccount: "admin",
StorageConfig: StorageConfig{
CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: &storageName,
StorageClassName: storageName,
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{"storage": storageSize},
Expand All @@ -446,7 +447,7 @@ func Test_ValidateDatacenterFieldChanges(t *testing.T) {
DeprecatedServiceAccount: "admin",
StorageConfig: StorageConfig{
CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: &storageName,
StorageClassName: storageName,
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{"storage": storageSize},
Expand Down Expand Up @@ -573,7 +574,7 @@ func Test_ValidateDatacenterFieldChanges(t *testing.T) {
Spec: CassandraDatacenterSpec{
StorageConfig: StorageConfig{
CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: &storageName,
StorageClassName: storageName,
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{"storage": storageSize},
Expand All @@ -589,7 +590,7 @@ func Test_ValidateDatacenterFieldChanges(t *testing.T) {
Spec: CassandraDatacenterSpec{
StorageConfig: StorageConfig{
CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: &storageName,
StorageClassName: storageName,
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteMany"},
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{"storage": storageSize},
Expand All @@ -600,6 +601,84 @@ func Test_ValidateDatacenterFieldChanges(t *testing.T) {
},
errString: "change storageConfig.CassandraDataVolumeClaimSpec",
},
{
name: "StorageClassName changes with storageConfig changes allowed",
oldDc: &CassandraDatacenter{
ObjectMeta: metav1.ObjectMeta{
Name: "exampleDC",
},
Spec: CassandraDatacenterSpec{
StorageConfig: StorageConfig{
CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: storageName,
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{"storage": storageSize},
},
},
},
},
},
newDc: &CassandraDatacenter{
ObjectMeta: metav1.ObjectMeta{
Name: "exampleDC",
Annotations: map[string]string{
AllowStorageChangesAnnotation: "true",
},
},
Spec: CassandraDatacenterSpec{
StorageConfig: StorageConfig{
CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: ptr.To[string]("new-server-data"),
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{"storage": storageSize},
},
},
},
},
},
errString: "change storageConfig.CassandraDataVolumeClaimSpec",
},
{
name: "storage requests size changes with storageConfig changes allowed",
oldDc: &CassandraDatacenter{
ObjectMeta: metav1.ObjectMeta{
Name: "exampleDC",
},
Spec: CassandraDatacenterSpec{
StorageConfig: StorageConfig{
CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: storageName,
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{"storage": storageSize},
},
},
},
},
},
newDc: &CassandraDatacenter{
ObjectMeta: metav1.ObjectMeta{
Name: "exampleDC",
Annotations: map[string]string{
AllowStorageChangesAnnotation: "true",
},
},
Spec: CassandraDatacenterSpec{
StorageConfig: StorageConfig{
CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: storageName,
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{"storage": resource.MustParse("2Gi")},
},
},
},
},
},
errString: "",
},
{
name: "Removing a rack",
oldDc: &CassandraDatacenter{
Expand Down Expand Up @@ -836,7 +915,7 @@ func Test_ValidateDatacenterFieldChanges(t *testing.T) {
} else {
if tt.errString == "" {
t.Errorf("ValidateDatacenterFieldChanges() err = %v, should be valid", err)
} else if !strings.HasSuffix(err.Error(), tt.errString) {
} else if !strings.Contains(err.Error(), tt.errString) {
t.Errorf("ValidateDatacenterFieldChanges() err = %v, want suffix %v", err, tt.errString)
}
}
Expand Down
137 changes: 70 additions & 67 deletions pkg/reconciliation/reconcile_racks.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (rc *ReconciliationContext) CheckPVCResizing() result.ReconcileResult {
}

for _, pvc := range pvcList {
if isPVCResizing(pvc) {
if isPVCResizing(&pvc) {
rc.ReqLogger.Info("Waiting for PVC resize to complete",
"pvc", pvc.Name)
return result.RequeueSoon(10)
Expand All @@ -197,12 +197,12 @@ func (rc *ReconciliationContext) CheckPVCResizing() result.ReconcileResult {
return result.Continue()
}

func isPVCResizing(pvc corev1.PersistentVolumeClaim) bool {
func isPVCResizing(pvc *corev1.PersistentVolumeClaim) bool {
return isPVCStatusConditionTrue(pvc, corev1.PersistentVolumeClaimResizing) ||
isPVCStatusConditionTrue(pvc, corev1.PersistentVolumeClaimFileSystemResizePending)
}

func isPVCStatusConditionTrue(pvc corev1.PersistentVolumeClaim, conditionType corev1.PersistentVolumeClaimConditionType) bool {
func isPVCStatusConditionTrue(pvc *corev1.PersistentVolumeClaim, conditionType corev1.PersistentVolumeClaimConditionType) bool {
for _, condition := range pvc.Status.Conditions {
if condition.Type == conditionType && condition.Status == corev1.ConditionTrue {
return true
Expand All @@ -214,89 +214,88 @@ func isPVCStatusConditionTrue(pvc corev1.PersistentVolumeClaim, conditionType co
func (rc *ReconciliationContext) CheckVolumeClaimSizes(statefulSet, desiredSts *appsv1.StatefulSet) result.ReconcileResult {
rc.ReqLogger.Info("reconcile_racks::CheckVolumeClaims")

current := make(map[string]corev1.PersistentVolumeClaim, len(statefulSet.Spec.VolumeClaimTemplates))
desired := make(map[string]corev1.PersistentVolumeClaim, len(desiredSts.Spec.VolumeClaimTemplates))

for _, claim := range statefulSet.Spec.VolumeClaimTemplates {
current[claim.Name] = claim
}

for _, claim := range desiredSts.Spec.VolumeClaimTemplates {
desired[claim.Name] = claim
}

supportsExpansion, err := rc.storageExpansion()
if err != nil {
return result.Error(err)
}

for name, claim := range current {
if _, ok := desired[name]; ok {
currentSize := claim.Spec.Resources.Requests[corev1.ResourceStorage]
createdSize := desired[name].Spec.Resources.Requests[corev1.ResourceStorage]
for i, claim := range statefulSet.Spec.VolumeClaimTemplates {
// Find the desired one
desiredClaim := desiredSts.Spec.VolumeClaimTemplates[i]
if claim.Name != desiredClaim.Name {
return result.Error(fmt.Errorf("statefulSet and desiredSts have different volume claim templates"))
}

// TODO This code is a bit repetitive with all the Status patches. Needs a refactoring in cass-operator since this is a known
// pattern. https://github.com/k8ssandra/cass-operator/issues/669
if currentSize.Cmp(createdSize) > 0 {
dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterValid, corev1.ConditionFalse)); updated {
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil {
rc.ReqLogger.Error(err, "error patching datacenter status for updating")
return result.Error(err)
}
currentSize := claim.Spec.Resources.Requests[corev1.ResourceStorage]
createdSize := desiredClaim.Spec.Resources.Requests[corev1.ResourceStorage]

// TODO This code is a bit repetitive with all the Status patches. Needs a refactoring in cass-operator since this is a known
// pattern. https://github.com/k8ssandra/cass-operator/issues/669
if currentSize.Cmp(createdSize) > 0 {
dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterValid, corev1.ConditionFalse)); updated {
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil {
rc.ReqLogger.Error(err, "error patching datacenter status for updating")
return result.Error(err)
}
return result.Error(fmt.Errorf("shrinking PVC %s is not supported", name))
}
return result.Error(fmt.Errorf("shrinking PVC %s is not supported", claim.Name))
}

if currentSize.Cmp(createdSize) < 0 {
rc.ReqLogger.Info("PVC resize request detected", "pvc", name)
if currentSize.Cmp(createdSize) < 0 {
rc.ReqLogger.Info("PVC resize request detected", "pvc", claim.Name)

if !supportsExpansion {
rc.ReqLogger.Info("PVC resize requested, but StorageClass does not support expansion", "pvc", name)

dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterValid, corev1.ConditionFalse)); updated {
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil {
rc.ReqLogger.Error(err, "error patching datacenter status for updating")
return result.Error(err)
}
}
return result.Error(fmt.Errorf("PVC resize requested, but StorageClass does not support expansion"))
}
if !supportsExpansion {
rc.ReqLogger.Info("PVC resize requested, but StorageClass does not support expansion", "pvc", claim.Name)

dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterResizingVolumes, corev1.ConditionTrue)); updated {
if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterValid, corev1.ConditionFalse)); updated {
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil {
rc.ReqLogger.Error(err, "error patching datacenter status for updating")
return result.Error(err)
}
}
return result.Error(fmt.Errorf("PVC resize requested, but StorageClass does not support expansion"))
}

claims, err := rc.listPVCs(claim.Labels)
if err != nil {
dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterResizingVolumes, corev1.ConditionTrue)); updated {
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch); err != nil {
rc.ReqLogger.Error(err, "error patching datacenter status for updating")
return result.Error(err)
}
}

pvcNamePrefix := fmt.Sprintf("%s-%s-", claim.Name, statefulSet.Name)
targetPVCs := make([]corev1.PersistentVolumeClaim, 0)
for _, pvc := range claims {
if strings.HasPrefix(pvc.Name, pvcNamePrefix) {
targetPVCs = append(targetPVCs, pvc)
}
claims, err := rc.listPVCs(claim.Labels)
if err != nil {
return result.Error(err)
}

pvcNamePrefix := fmt.Sprintf("%s-%s-", claim.Name, statefulSet.Name)
targetPVCs := make([]*corev1.PersistentVolumeClaim, 0)
for _, pvc := range claims {
if strings.HasPrefix(pvc.Name, pvcNamePrefix) {
targetPVCs = append(targetPVCs, &pvc)
}
}

for _, pvc := range targetPVCs {
if isPVCResizing(pvc) {
return result.RequeueSoon(10)
}
patch := client.MergeFrom(pvc.DeepCopy())
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = createdSize
if err := rc.Client.Patch(rc.Ctx, &pvc, patch); err != nil {
return result.Error(err)
}
for _, pvc := range targetPVCs {
if isPVCResizing(pvc) {
return result.RequeueSoon(10)
}

patch := client.MergeFrom(pvc.DeepCopy())
pvc.Spec.Resources.Requests["storage"] = createdSize
if err := rc.Client.Patch(rc.Ctx, pvc, patch); err != nil {
return result.Error(err)
}
return result.Continue()
}

// Update the StatefulSet to reflect the new PVC size
claim.Spec.Resources.Requests[corev1.ResourceStorage] = createdSize
statefulSet.Spec.VolumeClaimTemplates[i] = claim

return result.Continue()
}
}

Expand Down Expand Up @@ -385,12 +384,13 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
desiredSts.Annotations = utils.MergeMap(map[string]string{}, statefulSet.Annotations, desiredSts.Annotations)

// copy the stuff that can't be updated

// TODO Add an annotation check for "experimental-feature" here and if not enabled, use old method
if res := rc.CheckVolumeClaimSizes(statefulSet, desiredSts); res.Completed() {
return res
if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.AllowStorageChangesAnnotation) && rc.Datacenter.Annotations[api.AllowStorageChangesAnnotation] == "true" {
if res := rc.CheckVolumeClaimSizes(statefulSet, desiredSts); res.Completed() {
return res
}
}
// desiredSts.Spec.VolumeClaimTemplates = statefulSet.Spec.VolumeClaimTemplates
desiredSts.Spec.VolumeClaimTemplates = statefulSet.Spec.VolumeClaimTemplates

// selector must match podTemplate.Labels, those can't be updated either
desiredSts.Spec.Selector = statefulSet.Spec.Selector

Expand Down Expand Up @@ -1111,6 +1111,7 @@ func (rc *ReconciliationContext) UpdateCassandraNodeStatus(force bool) error {
}
}

logger.Info("Setting state", "Running", isMgmtApiRunning(pod), "pod", pod.Name)
if pod.Status.PodIP != "" && isMgmtApiRunning(pod) {
ip := getRpcAddress(dc, pod)
nodeStatus.IP = ip
Expand Down Expand Up @@ -2045,7 +2046,9 @@ func (rc *ReconciliationContext) hasAdditionalSeeds() bool {
func (rc *ReconciliationContext) startNode(pod *corev1.Pod, labelSeedBeforeStart bool, endpointData httphelper.CassMetadataEndpoints) (bool, error) {
if pod == nil {
return true, nil
} else if !isServerReady(pod) {
}

if !isServerReady(pod) {
if isServerReadyToStart(pod) && isMgmtApiRunning(pod) {

// this is the one exception to all seed labelling happening in labelSeedPods()
Expand Down
Loading

0 comments on commit 60bc169

Please sign in to comment.