From 03405febc1b4b01658a109bca00b21a35a4332bd Mon Sep 17 00:00:00 2001
From: Jeeva Kandasamy
Date: Thu, 28 Sep 2023 05:12:30 +0530
Subject: [PATCH] continue the post upgrade on storage migration errors

Signed-off-by: Jeeva Kandasamy
---
 .../tektonconfig/upgrade/helper/migrator.go   | 131 ++++++++++
 .../upgrade/helper/migrator_test.go           | 228 ++++++++++++++++++
 .../helper/storage_version_migrator.go        |  27 ++-
 .../helper/storage_version_migrator_test.go   |   6 +-
 .../tektonconfig/upgrade/post_upgrade.go      |   8 +-
 5 files changed, 381 insertions(+), 19 deletions(-)
 create mode 100644 pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator.go
 create mode 100644 pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator_test.go

diff --git a/pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator.go b/pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator.go
new file mode 100644
index 0000000000..85efafaa5f
--- /dev/null
+++ b/pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator.go
@@ -0,0 +1,131 @@
+/*
+Copyright 2023 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// copied from: https://github.com/knative/pkg/blob/2783cd8cfad9ba907e6f31cafeef3eb2943424ee/apiextensions/storageversion/migrator.go
+// local changes: log and skip a resource whose patch fails instead of aborting the migration
+//---
+
+package upgrade
+
+import (
+	"context"
+	"fmt"
+
+	"go.uber.org/zap"
+	apix "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	apixclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	apierrs "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/tools/pager"
+)
+
+// Migrator will read custom resource definitions and upgrade
+// the associated resources to the latest storage version
+type Migrator struct {
+	dynamicClient dynamic.Interface
+	apixClient    apixclient.Interface
+	logger        *zap.SugaredLogger
+}
+
+// NewMigrator will return a new Migrator
+func NewMigrator(d dynamic.Interface, a apixclient.Interface, logger *zap.SugaredLogger) *Migrator {
+	return &Migrator{
+		dynamicClient: d,
+		apixClient:    a,
+		logger:        logger,
+	}
+}
+
+// Migrate takes a group resource (ie. resource.some.group.dev) and
+// updates instances of the resource to the latest storage version
+//
+// This is done by listing all the resources and performing an empty patch
+// which triggers a migration on the K8s API server
+//
+// Finally the migrator will update the CRD's status and drop older storage
+// versions, even when patching some of the resources failed (logged only)
+func (m *Migrator) Migrate(ctx context.Context, gr schema.GroupResource) error {
+	crdClient := m.apixClient.ApiextensionsV1().CustomResourceDefinitions()
+
+	crd, err := crdClient.Get(ctx, gr.String(), metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("unable to fetch crd %s - %w", gr, err)
+	}
+
+	version := storageVersion(crd)
+
+	if version == "" {
+		return fmt.Errorf("unable to determine storage version for %s", gr)
+	}
+
+	if err := m.migrateResources(ctx, gr.WithVersion(version)); err != nil {
+		return err
+	}
+
+	patch := `{"status":{"storedVersions":["` + version + `"]}}`
+	_, err = crdClient.Patch(ctx, crd.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
+	if err != nil {
+		return fmt.Errorf("unable to drop storage version definition %s - %w", gr, err)
+	}
+
+	return nil
+}
+
+func (m *Migrator) migrateResources(ctx context.Context, gvr schema.GroupVersionResource) error {
+	client := m.dynamicClient.Resource(gvr)
+
+	listFunc := func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+		return client.Namespace(metav1.NamespaceAll).List(ctx, opts)
+	}
+
+	onEach := func(obj runtime.Object) error {
+		item := obj.(metav1.Object)
+
+		_, err := client.Namespace(item.GetNamespace()).
+			Patch(ctx, item.GetName(), types.MergePatchType, []byte("{}"), metav1.PatchOptions{})
+
+		if err != nil && !apierrs.IsNotFound(err) {
+			m.logger.Errorw("unable to patch a resource",
+				"resourceName", item.GetName(),
+				"namespace", item.GetNamespace(),
+				"groupVersionResource", gvr,
+				"error", err,
+			)
+		}
+
+		return nil
+	}
+
+	resourcePager := pager.New(listFunc)
+	return resourcePager.EachListItem(ctx, metav1.ListOptions{}, onEach)
+}
+
+func storageVersion(crd *apix.CustomResourceDefinition) string {
+	var version string
+
+	for _, v := range crd.Spec.Versions {
+		if v.Storage {
+			version = v.Name
+			break
+		}
+	}
+
+	return version
+}
diff --git a/pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator_test.go b/pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator_test.go
new file mode 100644
index 0000000000..4541217479
--- /dev/null
+++ b/pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator_test.go
@@ -0,0 +1,228 @@
+/*
+Copyright 2023 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// copied from: https://github.com/knative/pkg/blob/2783cd8cfad9ba907e6f31cafeef3eb2943424ee/apiextensions/storageversion/migrator_test.go
+// local changes: aligned with local migrator.go
+// ---
+package upgrade
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	apix "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	apixFake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
+	apierrs "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	runtime "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	dynamicFake "k8s.io/client-go/dynamic/fake"
+	k8stesting "k8s.io/client-go/testing"
+	"knative.dev/pkg/logging"
+)
+
+var (
+	fakeGK = schema.GroupKind{
+		Kind:  "Fake",
+		Group: "group.dev",
+	}
+
+	fakeGR = schema.GroupResource{
+		Resource: "fakes",
+		Group:    "group.dev",
+	}
+
+	fakeCRD = &apix.CustomResourceDefinition{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: fakeGR.String(),
+		},
+		Spec: apix.CustomResourceDefinitionSpec{
+			Group: fakeGK.Group,
+			Versions: []apix.CustomResourceDefinitionVersion{
+				{Name: "v1alpha1", Served: true, Storage: false},
+				{Name: "v1", Served: true, Storage: true},
+			},
+		},
+		Status: apix.CustomResourceDefinitionStatus{
+			StoredVersions: []string{
+				"v1alpha1",
+				"v1",
+			},
+		},
+	}
+)
+
+func TestMigrate(t *testing.T) {
+	// setup
+	resources := []runtime.Object{fake("first"), fake("second")}
+	dclient := dynamicFake.NewSimpleDynamicClient(runtime.NewScheme(), resources...)
+	cclient := apixFake.NewSimpleClientset(fakeCRD)
+	ctx := context.TODO()
+	m := NewMigrator(dclient, cclient, logging.FromContext(ctx))
+
+	if err := m.Migrate(ctx, fakeGR); err != nil {
+		t.Fatal("Migrate() =", err)
+	}
+
+	assertPatches(t, dclient.Actions(),
+		// empty patch on every resource, which triggers the storage migration
+		emptyResourcePatch("first", "v1"),
+		emptyResourcePatch("second", "v1"),
+	)
+
+	assertPatches(t, cclient.Actions(),
+		// patch resource definition dropping non-storage version
+		crdStorageVersionPatch(fakeCRD.Name, "v1"),
+	)
+}
+
+// func TestMigrate_Paging(t *testing.T) {
+// 	t.Skip("client-go lacks testing pagination " +
+// 		"since list options aren't captured in actions")
+// }
+
+func TestMigrate_Errors(t *testing.T) {
+	tests := []struct {
+		name string
+		crd  func(*k8stesting.Fake)
+		dyn  func(*k8stesting.Fake)
+		pass bool
+	}{{
+		name: "failed to fetch CRD",
+		crd: func(fake *k8stesting.Fake) {
+			fake.PrependReactor("get", "*",
+				func(k8stesting.Action) (bool, runtime.Object, error) {
+					return true, nil, errors.New("failed to get crd")
+				})
+		},
+	}, {
+		name: "listing fails",
+		dyn: func(fake *k8stesting.Fake) {
+			fake.PrependReactor("list", "*",
+				func(k8stesting.Action) (bool, runtime.Object, error) {
+					return true, nil, errors.New("failed to list resources")
+				})
+		},
+	}, {
+		name: "patching resource fails",
+		dyn: func(fake *k8stesting.Fake) {
+			fake.PrependReactor("patch", "*",
+				func(k8stesting.Action) (bool, runtime.Object, error) {
+					return true, nil, errors.New("failed to patch resources")
+				})
+		},
+		// prints the error and continues the execution
+		pass: true,
+	}, {
+		name: "patching definition fails",
+		crd: func(fake *k8stesting.Fake) {
+			fake.PrependReactor("patch", "*",
+				func(k8stesting.Action) (bool, runtime.Object, error) {
+					return true, nil, errors.New("failed to patch definition")
+				})
+		},
+	}, {
+		name: "patching unexisting resource",
+		dyn: func(fake *k8stesting.Fake) {
+			fake.PrependReactor("patch", "*",
+				func(k8stesting.Action) (bool, runtime.Object, error) {
+					return true, nil, apierrs.NewNotFound(fakeGR, "resource-removed")
+				})
+		},
+		// Resource not found error should not block the storage migration.
+		pass: true,
+	},
+	// todo paging fails
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			resources := []runtime.Object{fake("first"), fake("second")}
+			dclient := dynamicFake.NewSimpleDynamicClient(runtime.NewScheme(), resources...)
+			cclient := apixFake.NewSimpleClientset(fakeCRD)
+
+			if test.crd != nil {
+				test.crd(&cclient.Fake)
+			}
+
+			if test.dyn != nil {
+				test.dyn(&dclient.Fake)
+			}
+
+			m := NewMigrator(dclient, cclient, logging.FromContext(context.TODO()))
+			if err := m.Migrate(context.Background(), fakeGR); test.pass != (err == nil) {
+				t.Errorf("Migrate() error = %v, want success = %t", err, test.pass)
+			}
+		})
+	}
+}
+
+func assertPatches(t *testing.T, actions []k8stesting.Action, want ...k8stesting.PatchAction) {
+	t.Helper()
+
+	got := getPatchActions(actions)
+	if diff := cmp.Diff(got, want); diff != "" {
+		t.Error("Unexpected patches:", diff)
+	}
+}
+
+func emptyResourcePatch(name, version string) k8stesting.PatchAction {
+	return k8stesting.NewPatchAction(
+		fakeGR.WithVersion(version),
+		"default",
+		name,
+		types.MergePatchType,
+		[]byte("{}"))
+}
+
+func crdStorageVersionPatch(name, version string) k8stesting.PatchAction {
+	return k8stesting.NewRootPatchSubresourceAction(
+		apix.SchemeGroupVersion.WithResource("customresourcedefinitions"),
+		name,
+		types.StrategicMergePatchType,
+		[]byte(`{"status":{"storedVersions":["`+version+`"]}}`),
+		"status",
+	)
+}
+
+func getPatchActions(actions []k8stesting.Action) []k8stesting.PatchAction {
+	var patches []k8stesting.PatchAction
+
+	for _, action := range actions {
+		if pa, ok := action.(k8stesting.PatchAction); ok {
+			patches = append(patches, pa)
+		}
+	}
+
+	return patches
+}
+
+func fake(name string) runtime.Object {
+	return &unstructured.Unstructured{
+		Object: map[string]interface{}{
+			"apiVersion": "group.dev/v1",
+			"kind":       "Fake",
+			"metadata": map[string]interface{}{
+				"name":      name,
+				"namespace": "default",
+			},
+		},
+	}
+}
diff --git a/pkg/reconciler/shared/tektonconfig/upgrade/helper/storage_version_migrator.go b/pkg/reconciler/shared/tektonconfig/upgrade/helper/storage_version_migrator.go
index 046effec63..83998cae0d 100644
--- a/pkg/reconciler/shared/tektonconfig/upgrade/helper/storage_version_migrator.go
+++ b/pkg/reconciler/shared/tektonconfig/upgrade/helper/storage_version_migrator.go
@@ -18,38 +18,41 @@ package upgrade
 
 import (
 	"context"
-	"fmt"
 
 	"go.uber.org/zap"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"knative.dev/pkg/apiextensions/storageversion"
 )
 
 // performs crd storage version upgrade
 // lists all the resources and,
 // keeps only one storage version on the crd
-func MigrateStorageVersion(ctx context.Context, logger *zap.SugaredLogger, migrator *storageversion.Migrator, crdGroups []string) error {
+// failures are logged and the remaining group resources are still processed
+func MigrateStorageVersion(ctx context.Context, logger *zap.SugaredLogger, migrator *Migrator, crdGroups []string) {
 	logger.Infof("migrating %d group resources", len(crdGroups))
 
 	for _, crdGroupString := range crdGroups {
 		crdGroup := schema.ParseGroupResource(crdGroupString)
 		if crdGroup.Empty() {
-			logger.Errorf("unable to parse group version: %s", crdGroupString)
-			return fmt.Errorf("unable to parse group version: %s", crdGroupString)
+			logger.Errorf("unable to parse group version: '%s'", crdGroupString)
+			continue
 		}
-		logger.Info("migrating group resource ", crdGroup)
+		logger.Infow("migrating group resource", "crdGroup", crdGroup)
 		if err := migrator.Migrate(ctx, crdGroup); err != nil {
 			if apierrs.IsNotFound(err) {
-				logger.Infow("ignoring resource migration - unable to fetch a crd",
-					"crd", crdGroup, err,
+				logger.Infow("ignoring resource migration - unable to fetch a crdGroup",
+					"crdGroup", crdGroup,
+					"error", err,
 				)
 				continue
 			}
-			logger.Errorw("failed to migrate: ", err)
-			return err
+			logger.Errorw("failed to migrate a crdGroup",
+				"crdGroup", crdGroup,
+				"error", err,
+			)
+			// keep going: a failed crd migration must not block the remaining ones
+		} else {
+			logger.Infow("migration completed", "crdGroup", crdGroup)
 		}
 	}
-
-	return nil
 }
diff --git a/pkg/reconciler/shared/tektonconfig/upgrade/helper/storage_version_migrator_test.go b/pkg/reconciler/shared/tektonconfig/upgrade/helper/storage_version_migrator_test.go
index 193d2a81da..f0e9fc58a9 100644
--- a/pkg/reconciler/shared/tektonconfig/upgrade/helper/storage_version_migrator_test.go
+++ b/pkg/reconciler/shared/tektonconfig/upgrade/helper/storage_version_migrator_test.go
@@ -28,7 +28,6 @@ import (
 	runtime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	dynamicFake "k8s.io/client-go/dynamic/fake"
-	"knative.dev/pkg/apiextensions/storageversion"
 	"knative.dev/pkg/logging"
 )
 
@@ -74,13 +73,12 @@ func TestMigrateStorageVersion(t *testing.T) {
 	dclient := dynamicFake.NewSimpleDynamicClient(runtime.NewScheme(), resources...)
 	cclient := apixFake.NewSimpleClientset(fakeCRD)
 
-	migrator := storageversion.NewMigrator(dclient, cclient)
+	migrator := NewMigrator(dclient, cclient, logging.FromContext(ctx))
 	logger := logging.FromContext(ctx)
 
 	// TEST
 	// expects only "v1"
-	err := MigrateStorageVersion(ctx, logger, migrator, []string{"fakes.group.dev", "unknown.group.dev"})
-	assert.NoError(t, err)
+	MigrateStorageVersion(ctx, logger, migrator, []string{"fakes.group.dev", "unknown.group.dev"})
 	crd, err := cclient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, fakeCRD.GetName(), metav1.GetOptions{})
 	assert.NoError(t, err)
 	storageVersions := crd.Status.StoredVersions
diff --git a/pkg/reconciler/shared/tektonconfig/upgrade/post_upgrade.go b/pkg/reconciler/shared/tektonconfig/upgrade/post_upgrade.go
index c3ed10558d..18a746afe5 100644
--- a/pkg/reconciler/shared/tektonconfig/upgrade/post_upgrade.go
+++ b/pkg/reconciler/shared/tektonconfig/upgrade/post_upgrade.go
@@ -26,7 +26,6 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"knative.dev/pkg/apiextensions/storageversion"
 )
 
 // performs storage versions upgrade
@@ -61,10 +60,13 @@ func upgradeStorageVersion(ctx context.Context, logger *zap.SugaredLogger, k8sCl
 		"triggertemplates.triggers.tekton.dev",
 	}
 
-	migrator := storageversion.NewMigrator(
+	migrator := upgrade.NewMigrator(
 		dynamic.NewForConfigOrDie(restConfig),
 		apixclient.NewForConfigOrDie(restConfig),
+		logger,
 	)
 
-	return upgrade.MigrateStorageVersion(ctx, logger, migrator, crdGroups)
+	upgrade.MigrateStorageVersion(ctx, logger, migrator, crdGroups)
+
+	return nil
 }