Skip to content

Commit

Permalink
Implement new tasks, flush and garbagecollect
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Oct 11, 2023
1 parent 8963678 commit 095bb86
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 22 deletions.
11 changes: 7 additions & 4 deletions apis/control/v1alpha1/cassandratask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ const (
CommandCompaction CassandraCommand = "compact"
CommandScrub CassandraCommand = "scrub"
CommandMove CassandraCommand = "move"
CommandGarbageCollect CassandraCommand = "garbagecollect"
CommandFlush CassandraCommand = "flush"
)

type CassandraJob struct {
Expand All @@ -91,10 +93,11 @@ type CassandraJob struct {
}

type JobArguments struct {
KeyspaceName string `json:"keyspace_name,omitempty"`
SourceDatacenter string `json:"source_datacenter,omitempty"`
PodName string `json:"pod_name,omitempty"`
RackName string `json:"rack,omitempty"`
KeyspaceName string `json:"keyspace_name,omitempty"`
SourceDatacenter string `json:"source_datacenter,omitempty"`
PodName string `json:"pod_name,omitempty"`
RackName string `json:"rack,omitempty"`
Tables []string `json:"tables,omitempty"`

// NewTokens is a map of pod names to their newly-assigned tokens. Required for the move
// command, ignored otherwise. Pods referenced in this map must exist; any existing pod not
Expand Down
5 changes: 5 additions & 0 deletions apis/control/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/crd/bases/control.k8ssandra.io_cassandratasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ spec:
type: string
source_datacenter:
type: string
tables:
items:
type: string
type: array
type: object
command:
description: Command defines what is run against Cassandra pods
Expand Down
10 changes: 6 additions & 4 deletions internal/controllers/control/cassandratask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,18 @@ JobDefinition:
break JobDefinition
case api.CommandReplaceNode:
r.replace(taskConfig)
case "forceupgraderacks":
// res, failed, completed, err = r.reconcileDatacenter(ctx, &dc, forceupgrade(taskConfigProto))
case api.CommandUpgradeSSTables:
upgradesstables(taskConfig)
case api.CommandScrub:
// res, failed, completed, err = r.reconcileEveryPodTask(ctx, &dc, scrub(taskConfigProto))
// scrub(taskConfig)
case api.CommandCompaction:
// res, failed, completed, err = r.reconcileEveryPodTask(ctx, &dc, compact(taskConfigProto, job.Arguments))
// compact(taskConfig)
case api.CommandMove:
r.move(taskConfig)
case api.CommandFlush:
flush(taskConfig)
case api.CommandGarbageCollect:
gc(taskConfig)
default:
err = fmt.Errorf("unknown job command: %s", job.Command)
return ctrl.Result{}, err
Expand Down
82 changes: 82 additions & 0 deletions internal/controllers/control/cassandratask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,54 @@ var _ = Describe("CassandraTask controller tests", func() {

Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 3))
})
It("Runs a flush task against the datacenter pods", func() {
By("Creating a task for flush")

taskKey, task := buildTask(api.CommandFlush, testNamespaceName)
task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1"
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v1/ops/tables/flush"]).To(Equal(nodeCount))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))

Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})
It("Runs a flush task against a pod", func() {
By("Creating a task for flush")

taskKey, task := buildTask(api.CommandFlush, testNamespaceName)
task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1"
task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName)
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v1/ops/tables/flush"]).To(Equal(1))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1))

Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})

It("Runs a garbagecollect task against the datacenter pods", func() {
By("Creating a task for garbagecollect")
taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName)
task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1"
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v1/ops/tables/garbagecollect"]).To(Equal(nodeCount))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})

When("Running cleanup twice in the same datacenter", func() {
It("Runs a cleanup task against the datacenter pods", func() {
By("Creating a task for cleanup")
Expand Down Expand Up @@ -461,6 +509,40 @@ var _ = Describe("CassandraTask controller tests", func() {
// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})

It("Runs a flush task against the datacenter pods", func() {
By("Creating a task for flush")
time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new
taskKey, task := buildTask(api.CommandFlush, testNamespaceName)
task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1"
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v0/ops/tables/flush"]).To(Equal(nodeCount))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1))

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})

It("Runs a garbagecollect task against the datacenter pods", func() {
By("Creating a task for garbagecollect")
time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new
taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName)
task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1"
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v0/ops/tables/garbagecollect"]).To(Equal(nodeCount))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1))

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})
})
Context("Task TTL", func() {
var testNamespaceName string
Expand Down
67 changes: 56 additions & 11 deletions internal/controllers/control/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
// Cleanup functionality

func callCleanup(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) {
// TODO Add more arguments configurations
keyspaceName := taskConfig.Arguments.KeyspaceName
return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, nil)
tables := taskConfig.Arguments.Tables
return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, tables)
}

func callCleanupSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error {
// TODO Add more arguments configurations
keyspaceName := taskConfig.Arguments.KeyspaceName
return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, nil)
tables := taskConfig.Arguments.Tables
return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, tables)
}

func cleanup(taskConfig *TaskConfiguration) {
Expand Down Expand Up @@ -109,15 +109,15 @@ func (r *CassandraTaskReconciler) restartSts(ctx context.Context, sts []appsv1.S
// UpgradeSSTables functionality

func callUpgradeSSTables(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) {
// TODO Add more arguments configurations
keyspaceName := taskConfig.Arguments.KeyspaceName
return nodeMgmtClient.CallUpgradeSSTables(pod, -1, keyspaceName, nil)
tables := taskConfig.Arguments.Tables
return nodeMgmtClient.CallUpgradeSSTables(pod, -1, keyspaceName, tables)
}

func callUpgradeSSTablesSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error {
// TODO Add more arguments configurations
keyspaceName := taskConfig.Arguments.KeyspaceName
return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, -1, keyspaceName, nil)
tables := taskConfig.Arguments.Tables
return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, -1, keyspaceName, tables)
}

func upgradesstables(taskConfig *TaskConfiguration) {
Expand Down Expand Up @@ -178,10 +178,13 @@ func (r *CassandraTaskReconciler) replaceValidator(taskConfig *TaskConfiguration
return fmt.Errorf("valid pod_name to replace is required")
}

func replaceFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool {
func genericPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool {
// If pod isn't in the to be replaced pods, return false
podName := taskConfig.Arguments.PodName
return pod.Name == podName
if podName != "" {
return pod.Name == podName
}
return true
}

// replacePreProcess adds enough information to CassandraDatacenter to ensure cass-operator knows this pod is being replaced
Expand All @@ -207,7 +210,7 @@ func (r *CassandraTaskReconciler) setDatacenterCondition(dc *cassapi.CassandraDa
func (r *CassandraTaskReconciler) replace(taskConfig *TaskConfiguration) {
taskConfig.SyncFunc = r.replacePod
taskConfig.ValidateFunc = r.replaceValidator
taskConfig.PodFilter = replaceFilter
taskConfig.PodFilter = genericPodFilter
taskConfig.PreProcessFunc = r.replacePreProcess
}

Expand Down Expand Up @@ -253,6 +256,48 @@ func (r *CassandraTaskReconciler) move(taskConfig *TaskConfiguration) {
taskConfig.ValidateFunc = r.moveValidator
}

// Flush functionality

func callFlushSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error {
keyspaceName := taskConfig.Arguments.KeyspaceName
tables := taskConfig.Arguments.Tables
return nodeMgmtClient.CallFlushEndpoint(pod, keyspaceName, tables)
}

func callFlushAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) {
keyspaceName := taskConfig.Arguments.KeyspaceName
tables := taskConfig.Arguments.Tables
return nodeMgmtClient.CallFlush(pod, keyspaceName, tables)
}

func flush(taskConfig *TaskConfiguration) {
taskConfig.AsyncFeature = httphelper.AsyncFlush
taskConfig.PodFilter = genericPodFilter
taskConfig.SyncFunc = callFlushSync
taskConfig.AsyncFunc = callFlushAsync
}

// GarbageCollect functionality

func callGarbageCollectAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) {
keyspaceName := taskConfig.Arguments.KeyspaceName
tables := taskConfig.Arguments.Tables
return nodeMgmtClient.CallGarbageCollect(pod, keyspaceName, tables)
}

func callGarbageCollectSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error {
keyspaceName := taskConfig.Arguments.KeyspaceName
tables := taskConfig.Arguments.Tables
return nodeMgmtClient.CallGarbageCollectEndpoint(pod, keyspaceName, tables)
}

func gc(taskConfig *TaskConfiguration) {
taskConfig.AsyncFeature = httphelper.AsyncGarbageCollect
taskConfig.PodFilter = genericPodFilter
taskConfig.AsyncFunc = callGarbageCollectAsync
taskConfig.SyncFunc = callGarbageCollectSync
}

// Common functions

func isCassandraUp(pod *corev1.Pod) bool {
Expand Down
8 changes: 5 additions & 3 deletions pkg/httphelper/server_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ var featuresReply = `{
"async_sstable_tasks",
"rebuild",
"async_upgrade_sstable_task",
"async_move_task"
"async_move_task",
"async_gc_task",
"async_flush_task"
]
}`

Expand Down Expand Up @@ -66,7 +68,7 @@ func FakeExecutorServerWithDetails(callDetails *CallDetails) (*httptest.Server,
w.WriteHeader(http.StatusOK)
jobId := query.Get("job_id")
_, err = w.Write([]byte(fmt.Sprintf(jobDetailsCompleted, jobId)))
} else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move") {
} else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move" || r.URL.Path == "/api/v1/ops/tables/compact" || r.URL.Path == "/api/v1/ops/tables/scrub" || r.URL.Path == "/api/v1/ops/tables/flush" || r.URL.Path == "/api/v1/ops/tables/garbagecollect") {
w.WriteHeader(http.StatusOK)
// Write jobId
jobId++
Expand Down Expand Up @@ -117,7 +119,7 @@ func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Ser

func FakeServerWithoutFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) {
return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain") {
if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain" || r.URL.Path == "/api/v0/ops/tables/flush" || r.URL.Path == "/api/v0/ops/tables/garbagecollect" || r.URL.Path == "/api/v0/ops/tables/compact") {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusNotFound)
Expand Down

0 comments on commit 095bb86

Please sign in to comment.