Skip to content

Commit

Permalink
[BACKPORT 2.14][PLAT-9814] Do not garbage collect the customer task w…
Browse files Browse the repository at this point in the history
…hich is still being referred in the universe task or the last task for a provider

Summary:
Original diff - https://phorge.dev.yugabyte.com/D28457 (0bd8593)

This change makes sure that the task garbage collector does not delete the lastest task for provider or if the task UUID is being referenced in a universe.

No change in API.

Test Plan: Unit tests

Reviewers: cwang, amalyshev, sanketh

Reviewed By: amalyshev

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D28596
  • Loading branch information
nkhogen committed Sep 18, 2023
1 parent c4c953a commit ed06d25
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

package com.yugabyte.yw.commissioner;

import static play.mvc.Http.Status.BAD_REQUEST;

import akka.actor.ActorSystem;
import akka.actor.Scheduler;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -21,6 +23,23 @@
import com.yugabyte.yw.models.TaskInfo;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.helpers.TaskType;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,15 +48,6 @@
import play.libs.Json;
import scala.concurrent.ExecutionContext;

import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static play.mvc.Http.Status.BAD_REQUEST;

@Singleton
public class Commissioner {

Expand Down Expand Up @@ -114,7 +124,12 @@ public UUID submit(TaskType taskType, ITaskParams taskParams) {
} catch (Throwable t) {
if (taskRunnable != null) {
// Destroy the task initialization in case of failure.
taskRunnable.task.terminate();
taskRunnable.getTask().terminate();
TaskInfo taskInfo = taskRunnable.getTaskInfo();
if (taskInfo.getTaskState() != TaskInfo.State.Failure) {
taskInfo.setTaskState(TaskInfo.State.Failure);
taskInfo.save();
}
}
String msg = "Error processing " + taskType + " task for " + taskParams.toString();
LOG.error(msg, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public Optional<TaskInfo> abort(UUID taskUUID) {
return Optional.empty();
}
RunnableTask runnableTask = optional.get();
ITask task = runnableTask.task;
ITask task = runnableTask.getTask();
if (!isTaskAbortable(task.getClass())) {
throw new RuntimeException("Task " + task.getName() + " is not abortable");
}
Expand All @@ -415,7 +415,7 @@ public Optional<TaskInfo> abort(UUID taskUUID) {
// Update the task state in the memory and DB.
runnableTask.compareAndSetTaskState(
Sets.immutableEnumSet(State.Initializing, State.Created, State.Running), State.Abort);
return Optional.of(runnableTask.taskInfo);
return Optional.of(runnableTask.getTaskInfo());
}

/**
Expand Down Expand Up @@ -598,7 +598,7 @@ private void removeCompletedSubTask(
RunnableSubTask runnableSubTask,
Throwable throwable) {
if (throwable != null) {
log.error("Error occurred in subtask " + runnableSubTask.taskInfo, throwable);
log.error("Error occurred in subtask " + runnableSubTask.getTaskInfo(), throwable);
}
taskIterator.remove();
numTasksCompleted.incrementAndGet();
Expand Down Expand Up @@ -651,7 +651,7 @@ private void waitForSubTasks() {
} else if (abortTime != null
&& Duration.between(abortTime, Instant.now()).compareTo(defaultAbortTaskTimeout) > 0
&& (skipSubTaskAbortableCheck
|| isTaskAbortable(runnableSubTask.task.getClass()))) {
|| isTaskAbortable(runnableSubTask.getTask().getClass()))) {
future.cancel(true);
// Report aborted to the parent task.
// Update the subtask state to aborted if the execution timed out.
Expand Down Expand Up @@ -719,15 +719,15 @@ public String toString() {
* started running. Synchronization is on the this object for taskInfo.
*/
public abstract class AbstractRunnableTask implements Runnable {
final ITask task;
final TaskInfo taskInfo;
private final ITask task;
private final TaskInfo taskInfo;
// Timeout limit for this task.
final Duration timeLimit;
final String[] creatorCallstack;
private final Duration timeLimit;
private final String[] creatorCallstack;

Instant taskScheduledTime;
Instant taskStartTime;
Instant taskCompletionTime;
private Instant taskScheduledTime;
private Instant taskStartTime;
private Instant taskCompletionTime;

// Future of the task that is set after it is submitted to the ExecutorService.
Future<?> future = null;
Expand Down Expand Up @@ -761,6 +761,14 @@ protected AbstractRunnableTask(ITask task, TaskInfo taskInfo) {
}
}

public ITask getTask() {
return task;
}

public TaskInfo getTaskInfo() {
return taskInfo;
}

@VisibleForTesting
String[] getCreatorCallstack() {
return creatorCallstack;
Expand Down Expand Up @@ -957,9 +965,9 @@ public void setTaskExecutionListener(TaskExecutionListener taskExecutionListener
/** Invoked by the ExecutorService. Do not invoke this directly. */
@Override
public void run() {
UUID taskUUID = taskInfo.getTaskUUID();
UUID taskUUID = getTaskInfo().getTaskUUID();
try {
task.setUserTaskUUID(taskUUID);
getTask().setUserTaskUUID(taskUUID);
super.run();
} catch (Exception e) {
Throwables.propagate(e);
Expand Down Expand Up @@ -1127,7 +1135,7 @@ private void executeWith(ExecutorService executorService) {
@Override
public void run() {
// Sets the top-level user task UUID.
task.setUserTaskUUID(parentRunnableTask.getTaskUUID());
getTask().setUserTaskUUID(parentRunnableTask.getTaskUUID());
super.run();
}

Expand All @@ -1142,18 +1150,18 @@ protected synchronized TaskExecutionListener getTaskExecutionListener() {
}

public synchronized void setSubTaskGroupType(SubTaskGroupType subTaskGroupType) {
if (taskInfo.getSubTaskGroupType() != subTaskGroupType) {
taskInfo.setSubTaskGroupType(subTaskGroupType);
taskInfo.save();
if (getTaskInfo().getSubTaskGroupType() != subTaskGroupType) {
getTaskInfo().setSubTaskGroupType(subTaskGroupType);
getTaskInfo().save();
}
}

private synchronized void setRunnableTaskContext(
RunnableTask parentRunnableTask, int position) {
this.parentRunnableTask = parentRunnableTask;
taskInfo.setParentUuid(parentRunnableTask.getTaskUUID());
taskInfo.setPosition(position);
taskInfo.save();
getTaskInfo().setParentUuid(parentRunnableTask.getTaskUUID());
getTaskInfo().setPosition(position);
getTaskInfo().save();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void start() {
} else {
log.info("Scheduling TaskGC every " + gcInterval);
scheduler.schedule(
Duration.ZERO, // InitialDelay
Duration.ofMinutes(5), // InitialDelay
gcInterval,
this::scheduleRunner,
this.executionContext);
Expand All @@ -108,17 +108,22 @@ private void checkCustomer(Customer c) {
@VisibleForTesting
void purgeStaleTasks(Customer c, List<CustomerTask> staleTasks) {
NUM_TASK_GC_RUNS_COUNT.inc();
int numRowsGCdInThisRun = 0;
for (CustomerTask customerTask : staleTasks) {
int numRowsDeleted = customerTask.cascadeDeleteCompleted();
numRowsGCdInThisRun += numRowsDeleted;
if (numRowsDeleted > 0) {
PURGED_CUSTOMER_TASK_COUNT.labels(c.getUuid().toString()).inc();
PURGED_TASK_INFO_COUNT.labels(c.getUuid().toString()).inc(numRowsDeleted - 1);
} else {
NUM_TASK_GC_ERRORS_COUNT.inc();
}
}
int numRowsGCdInThisRun =
staleTasks
.stream()
.filter(CustomerTask::isDeletable)
.map(
customerTask -> {
int numRowsDeleted = customerTask.cascadeDeleteCompleted();
if (numRowsDeleted > 0) {
PURGED_CUSTOMER_TASK_COUNT.labels(c.getUuid().toString()).inc();
PURGED_TASK_INFO_COUNT.labels(c.getUuid().toString()).inc(numRowsDeleted - 1);
} else {
NUM_TASK_GC_ERRORS_COUNT.inc();
}
return numRowsDeleted;
})
.reduce(0, Integer::sum);
log.info("Garbage collected {} rows", numRowsGCdInThisRun);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@

package com.yugabyte.yw.common;

import static com.yugabyte.yw.models.CustomerTask.TargetType;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.yugabyte.yw.commissioner.tasks.subtasks.LoadBalancerStateChange;
import com.yugabyte.yw.forms.RestoreBackupParams;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams;
import com.yugabyte.yw.common.services.YBClientService;
import org.yb.client.ChangeLoadBalancerStateResponse;
import org.yb.client.YBClient;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams;
import com.yugabyte.yw.models.Backup;
import com.yugabyte.yw.models.CustomerTask;
import com.yugabyte.yw.models.CustomerTask.TargetType;
import com.yugabyte.yw.models.ScheduleTask;
import com.yugabyte.yw.models.TaskInfo;
import com.yugabyte.yw.models.Universe;
Expand All @@ -25,11 +20,12 @@
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Singleton;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import play.api.Play;
import org.yb.client.ChangeLoadBalancerStateResponse;
import org.yb.client.YBClient;

@Singleton
public class CustomerTaskManager {
Expand Down Expand Up @@ -67,13 +63,16 @@ public void failPendingTask(CustomerTask customerTask, TaskInfo taskInfo) {
subtask.save();
});

Optional<Universe> optUniv =
customerTask.getTarget().isUniverseTarget()
? Universe.maybeGet(customerTask.getTargetUUID())
: Optional.empty();
if (LOAD_BALANCER_TASK_TYPES.contains(taskInfo.getTaskType())) {
Boolean isLoadBalanceAltered = false;
JsonNode node = taskInfo.getTaskDetails();
if (node.has(ALTER_LOAD_BALANCER)) {
isLoadBalanceAltered = node.path(ALTER_LOAD_BALANCER).asBoolean(false);
}
Optional<Universe> optUniv = Universe.maybeGet(customerTask.getTargetUUID());
if (optUniv.isPresent() && isLoadBalanceAltered) {
enableLoadBalancer(optUniv.get());
}
Expand Down Expand Up @@ -110,25 +109,23 @@ public void failPendingTask(CustomerTask customerTask, TaskInfo taskInfo) {

if (unlockUniverse) {
// Unlock the universe for future operations.
Universe.maybeGet(customerTask.getTargetUUID())
.ifPresent(
u -> {
UniverseDefinitionTaskParams details = u.getUniverseDetails();
if (details.backupInProgress || details.updateInProgress) {
// Create the update lambda.
Universe.UniverseUpdater updater =
universe -> {
UniverseDefinitionTaskParams universeDetails =
universe.getUniverseDetails();
universeDetails.backupInProgress = false;
universeDetails.updateInProgress = false;
universe.setUniverseDetails(universeDetails);
};

Universe.saveDetails(customerTask.getTargetUUID(), updater, false);
LOG.debug("Unlocked universe {}.", customerTask.getTargetUUID());
}
});
optUniv.ifPresent(
u -> {
UniverseDefinitionTaskParams details = u.getUniverseDetails();
if (details.backupInProgress || details.updateInProgress) {
// Create the update lambda.
Universe.UniverseUpdater updater =
universe -> {
UniverseDefinitionTaskParams universeDetails = universe.getUniverseDetails();
universeDetails.backupInProgress = false;
universeDetails.updateInProgress = false;
universe.setUniverseDetails(universeDetails);
};

Universe.saveDetails(customerTask.getTargetUUID(), updater, false);
LOG.debug("Unlocked universe {}.", customerTask.getTargetUUID());
}
});
}

// Mark task as a failure after the universe is unlocked.
Expand All @@ -153,9 +150,18 @@ public void failAllPendingTasks() {
.stream()
.map(Objects::toString)
.collect(Collectors.joining("','"));
// Retrieve all incomplete customer tasks or task in incomplete state. Task state update and
// customer completion time update are not transactional.

// Delete orphaned parent tasks which do not have any associated customer task.
// It is rare but can happen as a customer task and task info are not saved in transaction.
// TODO It can be handled better with transaction but involves bigger change.
String query =
"DELETE FROM task_info WHERE parent_uuid IS NULL AND uuid NOT IN "
+ "(SELECT task_uuid FROM customer_task)";
Ebean.createSqlUpdate(query).execute();

// Retrieve all incomplete customer tasks or task in incomplete state. Task state update
// and customer completion time update are not transactional.
query =
"SELECT ti.uuid AS task_uuid, ct.id AS customer_task_id "
+ "FROM task_info ti, customer_task ct "
+ "WHERE ti.uuid = ct.task_uuid "
Expand All @@ -168,7 +174,7 @@ public void failAllPendingTasks() {
.findList()
.forEach(
row -> {
TaskInfo taskInfo = TaskInfo.get(row.getUUID("task_uuid"));
TaskInfo taskInfo = TaskInfo.getOrBadRequest(row.getUUID("task_uuid"));
CustomerTask customerTask = CustomerTask.get(row.getLong("customer_task_id"));
failPendingTask(customerTask, taskInfo);
});
Expand Down
Loading

0 comments on commit ed06d25

Please sign in to comment.