-
Notifications
You must be signed in to change notification settings - Fork 200
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
continue the post upgrade on storage migration errors
Signed-off-by: Jeeva Kandasamy <[email protected]>
- Loading branch information
Showing
5 changed files
with
381 additions
and
19 deletions.
There are no files selected for viewing
131 changes: 131 additions & 0 deletions
131
pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
/* | ||
Copyright 2023 The Tekton Authors | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
// copied from: https://github.com/knative/pkg/blob/2783cd8cfad9ba907e6f31cafeef3eb2943424ee/apiextensions/storageversion/migrator.go | ||
// local changes: continue the execution even though error happens on patching a resource | ||
//--- | ||
|
||
package upgrade | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"go.uber.org/zap" | ||
apix "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" | ||
apixclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" | ||
apierrs "k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/apimachinery/pkg/types" | ||
"k8s.io/client-go/dynamic" | ||
"k8s.io/client-go/tools/pager" | ||
) | ||
|
||
// Migrator will read custom resource definitions and upgrade | ||
// the associated resources to the latest storage version | ||
type Migrator struct { | ||
dynamicClient dynamic.Interface | ||
apixClient apixclient.Interface | ||
logger *zap.SugaredLogger | ||
} | ||
|
||
// NewMigrator will return a new Migrator | ||
func NewMigrator(d dynamic.Interface, a apixclient.Interface, logger *zap.SugaredLogger) *Migrator { | ||
return &Migrator{ | ||
dynamicClient: d, | ||
apixClient: a, | ||
logger: logger, | ||
} | ||
} | ||
|
||
// Migrate takes a group resource (ie. resource.some.group.dev) and | ||
// updates instances of the resource to the latest storage version | ||
// | ||
// This is done by listing all the resources and performing an empty patch | ||
// which triggers a migration on the K8s API server | ||
// | ||
// Finally the migrator will update the CRD's status and drop older storage | ||
// versions | ||
func (m *Migrator) Migrate(ctx context.Context, gr schema.GroupResource) error { | ||
crdClient := m.apixClient.ApiextensionsV1().CustomResourceDefinitions() | ||
|
||
crd, err := crdClient.Get(ctx, gr.String(), metav1.GetOptions{}) | ||
if err != nil { | ||
return fmt.Errorf("unable to fetch crd %s - %w", gr, err) | ||
} | ||
|
||
version := storageVersion(crd) | ||
|
||
if version == "" { | ||
return fmt.Errorf("unable to determine storage version for %s", gr) | ||
} | ||
|
||
if err := m.migrateResources(ctx, gr.WithVersion(version)); err != nil { | ||
return err | ||
} | ||
|
||
patch := `{"status":{"storedVersions":["` + version + `"]}}` | ||
_, err = crdClient.Patch(ctx, crd.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status") | ||
if err != nil { | ||
return fmt.Errorf("unable to drop storage version definition %s - %w", gr, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (m *Migrator) migrateResources(ctx context.Context, gvr schema.GroupVersionResource) error { | ||
client := m.dynamicClient.Resource(gvr) | ||
|
||
listFunc := func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { | ||
return client.Namespace(metav1.NamespaceAll).List(ctx, opts) | ||
} | ||
|
||
onEach := func(obj runtime.Object) error { | ||
item := obj.(metav1.Object) | ||
|
||
_, err := client.Namespace(item.GetNamespace()). | ||
Patch(ctx, item.GetName(), types.MergePatchType, []byte("{}"), metav1.PatchOptions{}) | ||
|
||
if err != nil && !apierrs.IsNotFound(err) { | ||
m.logger.Errorw("unable to patch a resource", | ||
"resourceName", item.GetName(), | ||
"namespace", item.GetNamespace(), | ||
"groupVersionResource", gvr, | ||
err, | ||
) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
pager := pager.New(listFunc) | ||
return pager.EachListItem(ctx, metav1.ListOptions{}, onEach) | ||
} | ||
|
||
func storageVersion(crd *apix.CustomResourceDefinition) string { | ||
var version string | ||
|
||
for _, v := range crd.Spec.Versions { | ||
if v.Storage { | ||
version = v.Name | ||
break | ||
} | ||
} | ||
|
||
return version | ||
} |
228 changes: 228 additions & 0 deletions
228
pkg/reconciler/shared/tektonconfig/upgrade/helper/migrator_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,228 @@ | ||
/* | ||
Copyright 2023 The Tekton Authors | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
// copied from: https://github.com/knative/pkg/blob/2783cd8cfad9ba907e6f31cafeef3eb2943424ee/apiextensions/storageversion/migrator_test.go | ||
// local changes: aligned with local migrator.go | ||
// --- | ||
package upgrade | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
|
||
"github.com/google/go-cmp/cmp" | ||
apix "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" | ||
apixFake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" | ||
apierrs "k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
runtime "k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/apimachinery/pkg/types" | ||
dynamicFake "k8s.io/client-go/dynamic/fake" | ||
k8stesting "k8s.io/client-go/testing" | ||
"knative.dev/pkg/logging" | ||
) | ||
|
||
var ( | ||
fakeGK = schema.GroupKind{ | ||
Kind: "Fake", | ||
Group: "group.dev", | ||
} | ||
|
||
fakeGR = schema.GroupResource{ | ||
Resource: "fakes", | ||
Group: "group.dev", | ||
} | ||
|
||
fakeCRD = &apix.CustomResourceDefinition{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: fakeGR.String(), | ||
}, | ||
Spec: apix.CustomResourceDefinitionSpec{ | ||
Group: fakeGK.Group, | ||
Versions: []apix.CustomResourceDefinitionVersion{ | ||
{Name: "v1alpha1", Served: true, Storage: false}, | ||
{Name: "v1", Served: true, Storage: true}, | ||
}, | ||
}, | ||
Status: apix.CustomResourceDefinitionStatus{ | ||
StoredVersions: []string{ | ||
"v1alpha1", | ||
"v1", | ||
}, | ||
}, | ||
} | ||
) | ||
|
||
func TestMigrate(t *testing.T) { | ||
// setup | ||
resources := []runtime.Object{fake("first"), fake("second")} | ||
dclient := dynamicFake.NewSimpleDynamicClient(runtime.NewScheme(), resources...) | ||
cclient := apixFake.NewSimpleClientset(fakeCRD) | ||
ctx := context.TODO() | ||
m := NewMigrator(dclient, cclient, logging.FromContext(ctx)) | ||
|
||
if err := m.Migrate(context.Background(), fakeGR); err != nil { | ||
t.Fatal("Migrate() =", err) | ||
} | ||
|
||
assertPatches(t, dclient.Actions(), | ||
// patch resource definition dropping non-storage version | ||
emptyResourcePatch("first", "v1"), | ||
emptyResourcePatch("second", "v1"), | ||
) | ||
|
||
assertPatches(t, cclient.Actions(), | ||
// patch resource definition dropping non-storage version | ||
crdStorageVersionPatch(fakeCRD.Name, "v1"), | ||
) | ||
} | ||
|
||
// func TestMigrate_Paging(t *testing.T) { | ||
// t.Skip("client-go lacks testing pagination " + | ||
// "since list options aren't captured in actions") | ||
// } | ||
|
||
func TestMigrate_Errors(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
crd func(*k8stesting.Fake) | ||
dyn func(*k8stesting.Fake) | ||
pass bool | ||
}{{ | ||
name: "failed to fetch CRD", | ||
crd: func(fake *k8stesting.Fake) { | ||
fake.PrependReactor("get", "*", | ||
func(k8stesting.Action) (bool, runtime.Object, error) { | ||
return true, nil, errors.New("failed to get crd") | ||
}) | ||
}, | ||
}, { | ||
name: "listing fails", | ||
dyn: func(fake *k8stesting.Fake) { | ||
fake.PrependReactor("list", "*", | ||
func(k8stesting.Action) (bool, runtime.Object, error) { | ||
return true, nil, errors.New("failed to list resources") | ||
}) | ||
}, | ||
}, { | ||
name: "patching resource fails", | ||
dyn: func(fake *k8stesting.Fake) { | ||
fake.PrependReactor("patch", "*", | ||
func(k8stesting.Action) (bool, runtime.Object, error) { | ||
return true, nil, errors.New("failed to patch resources") | ||
}) | ||
}, | ||
// prints the error and continues the execution | ||
pass: true, | ||
}, { | ||
name: "patching definition fails", | ||
crd: func(fake *k8stesting.Fake) { | ||
fake.PrependReactor("patch", "*", | ||
func(k8stesting.Action) (bool, runtime.Object, error) { | ||
return true, nil, errors.New("failed to patch definition") | ||
}) | ||
}, | ||
}, { | ||
name: "patching unexisting resource", | ||
dyn: func(fake *k8stesting.Fake) { | ||
fake.PrependReactor("patch", "*", | ||
func(k8stesting.Action) (bool, runtime.Object, error) { | ||
return true, nil, apierrs.NewNotFound(fakeGR, "resource-removed") | ||
}) | ||
}, | ||
// Resouce not found error should not block the storage migration. | ||
pass: true, | ||
}, | ||
// todo paging fails | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
resources := []runtime.Object{fake("first"), fake("second")} | ||
dclient := dynamicFake.NewSimpleDynamicClient(runtime.NewScheme(), resources...) | ||
cclient := apixFake.NewSimpleClientset(fakeCRD) | ||
|
||
if test.crd != nil { | ||
test.crd(&cclient.Fake) | ||
} | ||
|
||
if test.dyn != nil { | ||
test.dyn(&dclient.Fake) | ||
} | ||
|
||
m := NewMigrator(dclient, cclient, logging.FromContext(context.TODO())) | ||
if err := m.Migrate(context.Background(), fakeGR); test.pass != (err == nil) { | ||
t.Error("Migrate should have returned an error") | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func assertPatches(t *testing.T, actions []k8stesting.Action, want ...k8stesting.PatchAction) { | ||
t.Helper() | ||
|
||
got := getPatchActions(actions) | ||
if diff := cmp.Diff(got, want); diff != "" { | ||
t.Error("Unexpected patches:", diff) | ||
} | ||
} | ||
|
||
func emptyResourcePatch(name, version string) k8stesting.PatchAction { | ||
return k8stesting.NewPatchAction( | ||
fakeGR.WithVersion(version), | ||
"default", | ||
name, | ||
types.MergePatchType, | ||
[]byte("{}")) | ||
} | ||
|
||
func crdStorageVersionPatch(name, version string) k8stesting.PatchAction { | ||
return k8stesting.NewRootPatchSubresourceAction( | ||
apix.SchemeGroupVersion.WithResource("customresourcedefinitions"), | ||
name, | ||
types.StrategicMergePatchType, | ||
[]byte(`{"status":{"storedVersions":["`+version+`"]}}`), | ||
"status", | ||
) | ||
} | ||
|
||
func getPatchActions(actions []k8stesting.Action) []k8stesting.PatchAction { | ||
var patches []k8stesting.PatchAction | ||
|
||
for _, action := range actions { | ||
if pa, ok := action.(k8stesting.PatchAction); ok { | ||
patches = append(patches, pa) | ||
} | ||
} | ||
|
||
return patches | ||
} | ||
|
||
func fake(name string) runtime.Object { | ||
return &unstructured.Unstructured{ | ||
Object: map[string]interface{}{ | ||
"apiVersion": "group.dev/v1", | ||
"kind": "Fake", | ||
"metadata": map[string]interface{}{ | ||
"name": name, | ||
"namespace": "default", | ||
}, | ||
}, | ||
} | ||
} |
Oops, something went wrong.