Commit

save the look up of enveloped work
Ryan Zhang committed Nov 6, 2024
1 parent 03caf65 commit c4d687d
Showing 2 changed files with 248 additions and 81 deletions.
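In short: getConfigMapEnvelopWorkObj drops its ctx parameter and no longer lists existing Work objects for the envelope; it now simply builds and returns the Work for the enveloped configMap. The create-or-update decision moves entirely to the upsert path, where upsertWork is restructured to log, rather than abort on, an invalid parent resource snapshot index label, and to tolerate an existing work whose labels map is nil. A new table-driven TestUpsertWork exercises those cases.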
132 changes: 51 additions & 81 deletions pkg/controllers/workgenerator/controller.go
@@ -440,7 +440,7 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be
 			if uResource.GetObjectKind().GroupVersionKind() == utils.ConfigMapGVK &&
 				len(uResource.GetAnnotations()[fleetv1beta1.EnvelopeConfigMapAnnotation]) != 0 {
 				// get a work object for the enveloped configMap
-				work, err := r.getConfigMapEnvelopWorkObj(ctx, workNamePrefix, resourceBinding, snapshot, &uResource, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
+				work, err := r.getConfigMapEnvelopWorkObj(workNamePrefix, resourceBinding, snapshot, &uResource, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
 				if err != nil {
 					return true, false, err
 				}
@@ -547,7 +547,7 @@ func (r *Reconciler) fetchAllResourceSnapshots(ctx context.Context, resourceBind

 // getConfigMapEnvelopWorkObj first tries to locate a work object for the corresponding envelopObj of type configMap.
 // We create a new one if the work object doesn't exist. We do this to avoid repeatedly deleting and creating the same work object.
-func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePrefix string, resourceBinding *fleetv1beta1.ClusterResourceBinding,
+func (r *Reconciler) getConfigMapEnvelopWorkObj(workNamePrefix string, resourceBinding *fleetv1beta1.ClusterResourceBinding,
 	resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, envelopeObj *unstructured.Unstructured, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash string) (*fleetv1beta1.Work, error) {
 	// we group all the resources in one configMap to one work
 	manifest, err := extractResFromConfigMap(envelopeObj)
@@ -559,75 +559,42 @@ func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePre
 	klog.V(2).InfoS("Successfully extract the enveloped resources from the configMap", "numOfResources", len(manifest),
 		"snapshot", klog.KObj(resourceSnapshot), "resourceBinding", klog.KObj(resourceBinding), "configMapWrapper", klog.KObj(envelopeObj))

-	// Try to see if we already have a work represent the same enveloped object for this CRP in the same cluster
-	// The ParentResourceSnapshotIndexLabel can change between snapshots so we have to exclude that label in the match
-	envelopWorkLabelMatcher := client.MatchingLabels{
-		fleetv1beta1.ParentBindingLabel:     resourceBinding.Name,
-		fleetv1beta1.CRPTrackingLabel:       resourceBinding.Labels[fleetv1beta1.CRPTrackingLabel],
-		fleetv1beta1.EnvelopeTypeLabel:      string(fleetv1beta1.ConfigMapEnvelopeType),
-		fleetv1beta1.EnvelopeNameLabel:      envelopeObj.GetName(),
-		fleetv1beta1.EnvelopeNamespaceLabel: envelopeObj.GetNamespace(),
-	}
-	workList := &fleetv1beta1.WorkList{}
-	if err := r.Client.List(ctx, workList, envelopWorkLabelMatcher); err != nil {
-		return nil, controller.NewAPIServerError(true, err)
-	}
-	// we need to create a new work object
-	if len(workList.Items) == 0 {
-		// we limit the CRP name length to be 63 (DNS1123LabelMaxLength) characters,
-		// so we have plenty of characters left to fit into 253 (DNS1123SubdomainMaxLength) characters for a CR
-		workName := fmt.Sprintf(fleetv1beta1.WorkNameWithConfigEnvelopeFmt, workNamePrefix, uuid.NewUUID())
-		return &fleetv1beta1.Work{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      workName,
-				Namespace: fmt.Sprintf(utils.NamespaceNameFormat, resourceBinding.Spec.TargetCluster),
-				Labels: map[string]string{
-					fleetv1beta1.ParentBindingLabel:               resourceBinding.Name,
-					fleetv1beta1.CRPTrackingLabel:                 resourceBinding.Labels[fleetv1beta1.CRPTrackingLabel],
-					fleetv1beta1.ParentResourceSnapshotIndexLabel: resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel],
-					fleetv1beta1.EnvelopeTypeLabel:                string(fleetv1beta1.ConfigMapEnvelopeType),
-					fleetv1beta1.EnvelopeNameLabel:                envelopeObj.GetName(),
-					fleetv1beta1.EnvelopeNamespaceLabel:           envelopeObj.GetNamespace(),
-				},
-				Annotations: map[string]string{
-					fleetv1beta1.ParentResourceSnapshotNameAnnotation:                resourceBinding.Spec.ResourceSnapshotName,
-					fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation:        resourceOverrideSnapshotHash,
-					fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: clusterResourceOverrideSnapshotHash,
-				},
-				OwnerReferences: []metav1.OwnerReference{
-					{
-						APIVersion:         fleetv1beta1.GroupVersion.String(),
-						Kind:               resourceBinding.Kind,
-						Name:               resourceBinding.Name,
-						UID:                resourceBinding.UID,
-						BlockOwnerDeletion: ptr.To(true), // make sure that the k8s will call work delete when the binding is deleted
-					},
-				},
-			},
-			Spec: fleetv1beta1.WorkSpec{
-				Workload: fleetv1beta1.WorkloadTemplate{
-					Manifests: manifest,
-				},
-				ApplyStrategy: resourceBinding.Spec.ApplyStrategy,
-			},
-		}, nil
-	}
-	if len(workList.Items) > 1 {
-		// return error here won't get us out of this
-		klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("find %d work representing configMap", len(workList.Items))),
-			"snapshot", klog.KObj(resourceSnapshot), "resourceBinding", klog.KObj(resourceBinding), "configMapWrapper", klog.KObj(envelopeObj))
-	}
-	work := workList.Items[0]
-	work.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel] = resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel]
-	if work.Annotations == nil {
-		work.Annotations = make(map[string]string)
-	}
-	work.Annotations[fleetv1beta1.ParentResourceSnapshotNameAnnotation] = resourceBinding.Spec.ResourceSnapshotName
-	work.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] = resourceOverrideSnapshotHash
-	work.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] = clusterResourceOverrideSnapshotHash
-	work.Spec.Workload.Manifests = manifest
-	work.Spec.ApplyStrategy = resourceBinding.Spec.ApplyStrategy
-	return &work, nil
+	// we limit the CRP name length to be 63 (DNS1123LabelMaxLength) characters,
+	// so we have plenty of characters left to fit into 253 (DNS1123SubdomainMaxLength) characters for a CR
+	return &fleetv1beta1.Work{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      fmt.Sprintf(fleetv1beta1.WorkNameWithConfigEnvelopeFmt, workNamePrefix, uuid.NewUUID()),
+			Namespace: fmt.Sprintf(utils.NamespaceNameFormat, resourceBinding.Spec.TargetCluster),
+			Labels: map[string]string{
+				fleetv1beta1.ParentBindingLabel:               resourceBinding.Name,
+				fleetv1beta1.CRPTrackingLabel:                 resourceBinding.Labels[fleetv1beta1.CRPTrackingLabel],
+				fleetv1beta1.ParentResourceSnapshotIndexLabel: resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel],
+				fleetv1beta1.EnvelopeTypeLabel:                string(fleetv1beta1.ConfigMapEnvelopeType),
+				fleetv1beta1.EnvelopeNameLabel:                envelopeObj.GetName(),
+				fleetv1beta1.EnvelopeNamespaceLabel:           envelopeObj.GetNamespace(),
+			},
+			Annotations: map[string]string{
+				fleetv1beta1.ParentResourceSnapshotNameAnnotation:                resourceBinding.Spec.ResourceSnapshotName,
+				fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation:        resourceOverrideSnapshotHash,
+				fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: clusterResourceOverrideSnapshotHash,
+			},
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					APIVersion:         fleetv1beta1.GroupVersion.String(),
+					Kind:               resourceBinding.Kind,
+					Name:               resourceBinding.Name,
+					UID:                resourceBinding.UID,
+					BlockOwnerDeletion: ptr.To(true), // make sure that the k8s will call work delete when the binding is deleted
+				},
+			},
+		},
+		Spec: fleetv1beta1.WorkSpec{
+			Workload: fleetv1beta1.WorkloadTemplate{
+				Manifests: manifest,
+			},
+			ApplyStrategy: resourceBinding.Spec.ApplyStrategy,
+		},
+	}, nil
 }
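With the List gone, getConfigMapEnvelopWorkObj is a pure constructor; finding a previously created work for the same envelope is left to the caller, which can match against works it has already fetched for the binding. The helper below is a minimal sketch of such an in-memory match over the same labels the deleted matcher used (ParentResourceSnapshotIndexLabel excluded, since it changes between snapshots). matchEnvelopeWork is a hypothetical name, not code from this commit:

package workgenerator

import (
	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// matchEnvelopeWork returns the already-created work wrapping the given
// envelope configMap, or nil if no candidate matches. Sketch only: the
// caller is assumed to have listed the candidate works for the binding once.
func matchEnvelopeWork(candidates []fleetv1beta1.Work, bindingName, crpName, envelopeName, envelopeNamespace string) *fleetv1beta1.Work {
	for i := range candidates {
		l := candidates[i].Labels // indexing a nil map is safe and yields ""
		if l[fleetv1beta1.ParentBindingLabel] == bindingName &&
			l[fleetv1beta1.CRPTrackingLabel] == crpName &&
			l[fleetv1beta1.EnvelopeTypeLabel] == string(fleetv1beta1.ConfigMapEnvelopeType) &&
			l[fleetv1beta1.EnvelopeNameLabel] == envelopeName &&
			l[fleetv1beta1.EnvelopeNamespaceLabel] == envelopeNamespace {
			return &candidates[i] // reuse; upsertWork copies the new labels, annotations, and spec over
		}
	}
	return nil // none found; create the freshly built work with its uuid-based name
}

How the caller performs the match is not visible in this hunk; the sketch only illustrates the label-based identity the deleted matcher encoded.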

 // generateSnapshotWorkObj generates the work object for the corresponding snapshot
@@ -684,22 +651,25 @@ func (r *Reconciler) upsertWork(ctx context.Context, newWork, existingWork *flee
 	// check if we need to update the existing work object
 	workResourceIndex, err := labels.ExtractResourceSnapshotIndexFromWork(existingWork)
 	if err != nil {
-		klog.ErrorS(err, "work has invalid parent resource index", "work", workObj)
-		return false, controller.NewUnexpectedBehaviorError(err)
-	}
-	// we already checked the label in fetchAllResourceSnapShots function so no need to check again
-	resourceIndex, _ := labels.ExtractResourceIndexFromClusterResourceSnapshot(resourceSnapshot)
-	if workResourceIndex == resourceIndex {
-		// no need to do anything if the work is generated from the same resource/override snapshots
-		if existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] &&
-			existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] {
-			klog.V(2).InfoS("Work is associated with the desired override snapshots", "existingROHash", existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation],
-				"existingCROHash", existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation], "work", workObj)
-			return false, nil
-		}
-		klog.V(2).InfoS("Work is already associated with the desired resourceSnapshot but still not having the right override snapshots", "resourceIndex", resourceIndex, "work", workObj, "resourceSnapshot", resourceSnapshotObj)
+		klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "work has invalid parent resource index", "work", workObj)
+	} else {
+		// we already checked the label in fetchAllResourceSnapShots function so no need to check again
+		resourceIndex, _ := labels.ExtractResourceIndexFromClusterResourceSnapshot(resourceSnapshot)
+		if workResourceIndex == resourceIndex {
+			// no need to do anything if the work is generated from the same resource/override snapshots
+			if existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] &&
+				existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] {
+				klog.V(2).InfoS("Work is associated with the desired override snapshots", "existingROHash", existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation],
+					"existingCROHash", existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation], "work", workObj)
+				return false, nil
+			}
+			klog.V(2).InfoS("Work is already associated with the desired resourceSnapshot but still not having the right override snapshots", "resourceIndex", resourceIndex, "work", workObj, "resourceSnapshot", resourceSnapshotObj)
+		}
 	}
 	// need to copy the new work to the existing work, only 5 possible changes:
+	if existingWork.Labels == nil {
+		existingWork.Labels = make(map[string]string)
+	}
 	existingWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel] = newWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel]
 	if existingWork.Annotations == nil {
 		existingWork.Annotations = make(map[string]string)
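The short-circuit in upsertWork now fires only when the existing work carries the desired resource snapshot index and both override hashes; anything else, including a missing or unparsable index label, falls through to the update path. The same predicate in isolation, as a minimal sketch (workUpToDate is our name for it, not the commit's):

package workgenerator

import (
	"strconv"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// workUpToDate reports whether the existing work already points at the desired
// resource snapshot index and both override snapshot hashes, in which case the
// upsert can return without issuing an update.
func workUpToDate(existing, desired *fleetv1beta1.Work, resourceIndex int) bool {
	workIndex, err := strconv.Atoi(existing.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel])
	if err != nil || workIndex != resourceIndex {
		return false // missing or stale parent resource snapshot index
	}
	for _, key := range []string{
		fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation,
		fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation,
	} {
		if existing.Annotations[key] != desired.Annotations[key] {
			return false // an override snapshot hash changed
		}
	}
	return true
}

Note that reading from a nil map is safe in Go, which is why the new nil-Labels guard in the diff is needed only on the write path.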
197 changes: 197 additions & 0 deletions pkg/controllers/workgenerator/controller_test.go
@@ -11,7 +11,10 @@ import (
 	"testing"
 	"time"

+	appsv1 "k8s.io/api/apps/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"

 	"github.com/google/go-cmp/cmp"
@@ -24,6 +27,7 @@ import (

 	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
 	"go.goms.io/fleet/pkg/controllers/work"
+	"go.goms.io/fleet/pkg/utils"
 	"go.goms.io/fleet/pkg/utils/condition"
 	"go.goms.io/fleet/pkg/utils/controller"
 	"go.goms.io/fleet/test/utils/informer"
@@ -143,6 +147,199 @@ func TestGetWorkNamePrefixFromSnapshotName(t *testing.T) {
 	}
 }

+func TestUpsertWork(t *testing.T) {
+	workName := "work"
+	namespace := "default"
+
+	var cmpOptions = []cmp.Option{
+		// ignore the status and the server-populated metadata fields as they may change between runs
+		cmpopts.IgnoreFields(fleetv1beta1.Work{}, "Status"),
+		cmpopts.IgnoreFields(metav1.ObjectMeta{}, "CreationTimestamp"),
+		cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"),
+		cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ManagedFields"),
+		cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
+		cmpopts.IgnoreFields(fleetv1beta1.WorkloadTemplate{}, "Manifests"),
+	}
+
+	testDeployment := appsv1.Deployment{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       utils.DeploymentKind,
+			APIVersion: utils.DeploymentGVK.GroupVersion().String(),
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "testDeployment",
+		},
+		Spec: appsv1.DeploymentSpec{
+			Replicas:        ptr.To(int32(2)),
+			MinReadySeconds: 5,
+		},
+	}
+	newWork := &fleetv1beta1.Work{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      workName,
+			Namespace: namespace,
+			Labels: map[string]string{
+				fleetv1beta1.ParentResourceSnapshotIndexLabel: "1",
+			},
+			Annotations: map[string]string{
+				fleetv1beta1.ParentResourceSnapshotNameAnnotation:                "snapshot-1",
+				fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: "hash1",
+				fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation:        "hash2",
+			},
+		},
+		Spec: fleetv1beta1.WorkSpec{
+			Workload: fleetv1beta1.WorkloadTemplate{
+				Manifests: []fleetv1beta1.Manifest{{RawExtension: runtime.RawExtension{Object: &testDeployment}}},
+			},
+		},
+	}
+
+	resourceSnapshot := &fleetv1beta1.ClusterResourceSnapshot{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "snapshot-1",
+			Labels: map[string]string{
+				fleetv1beta1.ResourceIndexLabel: "1",
+			},
+		},
+	}
+
+	tests := []struct {
+		name          string
+		existingWork  *fleetv1beta1.Work
+		expectChanged bool
+	}{
+		{
+			name:          "Create new work when existing work is nil",
+			existingWork:  nil,
+			expectChanged: true,
+		},
+		{
+			name: "Update existing work with new annotations",
+			existingWork: &fleetv1beta1.Work{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      workName,
+					Namespace: namespace,
+					Labels: map[string]string{
+						fleetv1beta1.ParentResourceSnapshotIndexLabel: "1",
+					},
+				},
+				Spec: fleetv1beta1.WorkSpec{
+					Workload: fleetv1beta1.WorkloadTemplate{
+						Manifests: []fleetv1beta1.Manifest{{RawExtension: runtime.RawExtension{Raw: []byte("{}")}}},
+					},
+				},
+			},
+			expectChanged: true,
+		},
+		{
+			name: "Update existing work even if it does not have the resource snapshot label",
+			existingWork: &fleetv1beta1.Work{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      workName,
+					Namespace: namespace,
+					Annotations: map[string]string{
+						fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: "hash1",
+					},
+				},
+				Spec: fleetv1beta1.WorkSpec{
+					Workload: fleetv1beta1.WorkloadTemplate{
+						Manifests: []fleetv1beta1.Manifest{{RawExtension: runtime.RawExtension{Raw: []byte("{}")}}},
+					},
+				},
+			},
+			expectChanged: true,
+		},
+		{
+			name: "Do not update the existing work if it already points to the same resource and override snapshots",
+			existingWork: &fleetv1beta1.Work{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      workName,
+					Namespace: namespace,
+					Labels: map[string]string{
+						fleetv1beta1.ParentResourceSnapshotIndexLabel: "1",
+					},
+					Annotations: map[string]string{
+						fleetv1beta1.ParentResourceSnapshotNameAnnotation:                "snapshot-1",
+						fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: "hash1",
+						fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation:        "hash2",
+					},
+				},
+				Spec: fleetv1beta1.WorkSpec{
+					Workload: fleetv1beta1.WorkloadTemplate{
+						Manifests: []fleetv1beta1.Manifest{{RawExtension: runtime.RawExtension{Raw: []byte("{}")}}},
+					},
+				},
+			},
+			expectChanged: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			scheme := serviceScheme(t)
+			objects := []client.Object{resourceSnapshot}
+			if tt.existingWork != nil {
+				objects = append(objects, tt.existingWork)
+			}
+			fakeClient := fake.NewClientBuilder().
+				WithStatusSubresource(objects...).
+				WithScheme(scheme).
+				WithObjects(objects...).
+				Build()
+			// Create reconciler with custom client
+			reconciler := &Reconciler{
+				Client:          fakeClient,
+				recorder:        record.NewFakeRecorder(10),
+				InformerManager: &informer.FakeManager{},
+			}
+			changed, _ := reconciler.upsertWork(ctx, newWork, tt.existingWork, resourceSnapshot)
+			if changed != tt.expectChanged {
+				t.Fatalf("expected changed: %v, got: %v", tt.expectChanged, changed)
+			}
+			upsertedWork := &fleetv1beta1.Work{}
+			if fakeClient.Get(ctx, client.ObjectKeyFromObject(newWork), upsertedWork) != nil {
+				t.Fatalf("failed to get upserted work")
+			}
+			if diff := cmp.Diff(newWork, upsertedWork, cmpOptions...); diff != "" {
+				t.Errorf("upsertWork didn't update the work, diff = %s", diff)
+			}
+			if tt.expectChanged {
+				// check if the deployment is applied
+				var u unstructured.Unstructured
+				if err := u.UnmarshalJSON(upsertedWork.Spec.Workload.Manifests[0].Raw); err != nil {
+					t.Fatalf("Failed to unmarshal the result: %v, want nil", err)
+				}
+				var deployment appsv1.Deployment
+				if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &deployment); err != nil {
+					t.Fatalf("Failed to convert the result to deployment: %v, want nil", err)
+				}
+				if diff := cmp.Diff(testDeployment, deployment); diff != "" {
+					t.Errorf("upsertWork() deployment mismatch (-want, +got):\n%s", diff)
+				}
+			}
+		})
+	}
+}
+
 func TestBuildAllWorkAppliedCondition(t *testing.T) {
 	tests := map[string]struct {
 		works map[string]*fleetv1beta1.Work
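The tail of the new test round-trips a manifest: the raw bytes produced by the upsert are decoded into an unstructured object and then converted back into a typed Deployment before comparing. That decode step is plain apimachinery and works the same outside the test; a compact, self-contained sketch (the helper name decodeDeployment is ours):

package workgenerator

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
)

// decodeDeployment unmarshals a manifest's raw JSON into an unstructured
// object and converts it back into a typed Deployment with the default
// converter, mirroring the assertion at the end of TestUpsertWork.
func decodeDeployment(raw []byte) (*appsv1.Deployment, error) {
	var u unstructured.Unstructured
	if err := u.UnmarshalJSON(raw); err != nil {
		return nil, fmt.Errorf("unmarshal manifest: %w", err)
	}
	var deployment appsv1.Deployment
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &deployment); err != nil {
		return nil, fmt.Errorf("convert to Deployment: %w", err)
	}
	return &deployment, nil
}

In the test this would be invoked as decodeDeployment(upsertedWork.Spec.Workload.Manifests[0].Raw).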