From 095bb862f0b821be8679bb62729f2c0159f41c2f Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 11 Oct 2023 20:41:40 +0300 Subject: [PATCH] Implement new tasks, flush and garbagecollect --- apis/control/v1alpha1/cassandratask_types.go | 11 ++- .../control/v1alpha1/zz_generated.deepcopy.go | 5 ++ .../control.k8ssandra.io_cassandratasks.yaml | 4 + .../control/cassandratask_controller.go | 10 ++- .../control/cassandratask_controller_test.go | 82 +++++++++++++++++++ internal/controllers/control/jobs.go | 67 ++++++++++++--- pkg/httphelper/server_test_utils.go | 8 +- 7 files changed, 165 insertions(+), 22 deletions(-) diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index d7a24845..cf669cc1 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -78,6 +78,8 @@ const ( CommandCompaction CassandraCommand = "compact" CommandScrub CassandraCommand = "scrub" CommandMove CassandraCommand = "move" + CommandGarbageCollect CassandraCommand = "garbagecollect" + CommandFlush CassandraCommand = "flush" ) type CassandraJob struct { @@ -91,10 +93,11 @@ type CassandraJob struct { } type JobArguments struct { - KeyspaceName string `json:"keyspace_name,omitempty"` - SourceDatacenter string `json:"source_datacenter,omitempty"` - PodName string `json:"pod_name,omitempty"` - RackName string `json:"rack,omitempty"` + KeyspaceName string `json:"keyspace_name,omitempty"` + SourceDatacenter string `json:"source_datacenter,omitempty"` + PodName string `json:"pod_name,omitempty"` + RackName string `json:"rack,omitempty"` + Tables []string `json:"tables,omitempty"` // NewTokens is a map of pod names to their newly-assigned tokens. Required for the move // command, ignored otherwise. Pods referenced in this map must exist; any existing pod not diff --git a/apis/control/v1alpha1/zz_generated.deepcopy.go b/apis/control/v1alpha1/zz_generated.deepcopy.go index f8bf3403..7c593d05 100644 --- a/apis/control/v1alpha1/zz_generated.deepcopy.go +++ b/apis/control/v1alpha1/zz_generated.deepcopy.go @@ -181,6 +181,11 @@ func (in *CassandraTaskTemplate) DeepCopy() *CassandraTaskTemplate { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JobArguments) DeepCopyInto(out *JobArguments) { *out = *in + if in.Tables != nil { + in, out := &in.Tables, &out.Tables + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.NewTokens != nil { in, out := &in.NewTokens, &out.NewTokens *out = make(map[string]string, len(*in)) diff --git a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml index ab5c25c3..1cccff3f 100644 --- a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml +++ b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml @@ -124,6 +124,10 @@ spec: type: string source_datacenter: type: string + tables: + items: + type: string + type: array type: object command: description: Command defines what is run against Cassandra pods diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index 648a5b3e..60eb2a6d 100644 --- a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -292,16 +292,18 @@ JobDefinition: break JobDefinition case api.CommandReplaceNode: r.replace(taskConfig) - case "forceupgraderacks": - // res, failed, completed, err = r.reconcileDatacenter(ctx, &dc, forceupgrade(taskConfigProto)) case api.CommandUpgradeSSTables: upgradesstables(taskConfig) case api.CommandScrub: - // res, failed, completed, err = r.reconcileEveryPodTask(ctx, &dc, scrub(taskConfigProto)) + // scrub(taskConfig) case api.CommandCompaction: - // res, failed, completed, err = r.reconcileEveryPodTask(ctx, &dc, compact(taskConfigProto, job.Arguments)) + // compact(taskConfig) case api.CommandMove: r.move(taskConfig) + case api.CommandFlush: + flush(taskConfig) + case api.CommandGarbageCollect: + gc(taskConfig) default: err = fmt.Errorf("unknown job command: %s", job.Command) return ctrl.Result{}, err diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index 02ca3c66..b425c6fd 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -307,6 +307,54 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 3)) }) + It("Runs a flush task against the datacenter pods", func() { + By("Creating a task for flush") + + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/flush"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + It("Runs a flush task against a pod", func() { + By("Creating a task for flush") + + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/flush"]).To(Equal(1)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + + It("Runs a garbagecollect task against the datacenter pods", func() { + By("Creating a task for garbagecollect") + taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/garbagecollect"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + When("Running cleanup twice in the same datacenter", func() { It("Runs a cleanup task against the datacenter pods", func() { By("Creating a task for cleanup") @@ -461,6 +509,40 @@ var _ = Describe("CassandraTask controller tests", func() { // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) }) + + It("Runs a flush task against the datacenter pods", func() { + By("Creating a task for flush") + time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v0/ops/tables/flush"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + + It("Runs a garbagecollect task against the datacenter pods", func() { + By("Creating a task for garbagecollect") + time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new + taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v0/ops/tables/garbagecollect"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) }) Context("Task TTL", func() { var testNamespaceName string diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index 72b5d04b..e8f85b0a 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -21,15 +21,15 @@ import ( // Cleanup functionality func callCleanup(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, tables) } func callCleanupSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, tables) } func cleanup(taskConfig *TaskConfiguration) { @@ -109,15 +109,15 @@ func (r *CassandraTaskReconciler) restartSts(ctx context.Context, sts []appsv1.S // UpgradeSSTables functionality func callUpgradeSSTables(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallUpgradeSSTables(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallUpgradeSSTables(pod, -1, keyspaceName, tables) } func callUpgradeSSTablesSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, -1, keyspaceName, tables) } func upgradesstables(taskConfig *TaskConfiguration) { @@ -178,10 +178,13 @@ func (r *CassandraTaskReconciler) replaceValidator(taskConfig *TaskConfiguration return fmt.Errorf("valid pod_name to replace is required") } -func replaceFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { +func genericPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { // If pod isn't in the to be replaced pods, return false podName := taskConfig.Arguments.PodName - return pod.Name == podName + if podName != "" { + return pod.Name == podName + } + return true } // replacePreProcess adds enough information to CassandraDatacenter to ensure cass-operator knows this pod is being replaced @@ -207,7 +210,7 @@ func (r *CassandraTaskReconciler) setDatacenterCondition(dc *cassapi.CassandraDa func (r *CassandraTaskReconciler) replace(taskConfig *TaskConfiguration) { taskConfig.SyncFunc = r.replacePod taskConfig.ValidateFunc = r.replaceValidator - taskConfig.PodFilter = replaceFilter + taskConfig.PodFilter = genericPodFilter taskConfig.PreProcessFunc = r.replacePreProcess } @@ -253,6 +256,48 @@ func (r *CassandraTaskReconciler) move(taskConfig *TaskConfiguration) { taskConfig.ValidateFunc = r.moveValidator } +// Flush functionality + +func callFlushSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallFlushEndpoint(pod, keyspaceName, tables) +} + +func callFlushAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallFlush(pod, keyspaceName, tables) +} + +func flush(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncFlush + taskConfig.PodFilter = genericPodFilter + taskConfig.SyncFunc = callFlushSync + taskConfig.AsyncFunc = callFlushAsync +} + +// GarbageCollect functionality + +func callGarbageCollectAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallGarbageCollect(pod, keyspaceName, tables) +} + +func callGarbageCollectSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallGarbageCollectEndpoint(pod, keyspaceName, tables) +} + +func gc(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncGarbageCollect + taskConfig.PodFilter = genericPodFilter + taskConfig.AsyncFunc = callGarbageCollectAsync + taskConfig.SyncFunc = callGarbageCollectSync +} + // Common functions func isCassandraUp(pod *corev1.Pod) bool { diff --git a/pkg/httphelper/server_test_utils.go b/pkg/httphelper/server_test_utils.go index ae839804..8d9cfe32 100644 --- a/pkg/httphelper/server_test_utils.go +++ b/pkg/httphelper/server_test_utils.go @@ -15,7 +15,9 @@ var featuresReply = `{ "async_sstable_tasks", "rebuild", "async_upgrade_sstable_task", - "async_move_task" + "async_move_task", + "async_gc_task", + "async_flush_task" ] }` @@ -66,7 +68,7 @@ func FakeExecutorServerWithDetails(callDetails *CallDetails) (*httptest.Server, w.WriteHeader(http.StatusOK) jobId := query.Get("job_id") _, err = w.Write([]byte(fmt.Sprintf(jobDetailsCompleted, jobId))) - } else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move") { + } else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move" || r.URL.Path == "/api/v1/ops/tables/compact" || r.URL.Path == "/api/v1/ops/tables/scrub" || r.URL.Path == "/api/v1/ops/tables/flush" || r.URL.Path == "/api/v1/ops/tables/garbagecollect") { w.WriteHeader(http.StatusOK) // Write jobId jobId++ @@ -117,7 +119,7 @@ func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Ser func FakeServerWithoutFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) { return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain") { + if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain" || r.URL.Path == "/api/v0/ops/tables/flush" || r.URL.Path == "/api/v0/ops/tables/garbagecollect" || r.URL.Path == "/api/v0/ops/tables/compact") { w.WriteHeader(http.StatusOK) } else { w.WriteHeader(http.StatusNotFound)