Skip to content

Commit

Permalink
fix(#717): Client and server timeouts are not aligned
Browse files Browse the repository at this point in the history
Some operations may take quite a while - for example the `goLive` operation (which reloads the entire catalog in memory) or `replaceCatalog`. These operations randomly time out when executed from the gRPC client because the client and server timeouts are not aligned, and either side might cancel the operation at any time.
  • Loading branch information
novoj committed Oct 25, 2024
1 parent 2836e3d commit 13bcd2d
Show file tree
Hide file tree
Showing 14 changed files with 694 additions and 408 deletions.
6 changes: 3 additions & 3 deletions evita_engine/src/main/java/io/evitadb/core/Evita.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import io.evitadb.api.requestResponse.schema.mutation.catalog.RemoveCatalogSchemaMutation;
import io.evitadb.api.task.ServerTask;
import io.evitadb.core.async.ClientRunnableTask;
import io.evitadb.core.async.ObservableExecutorService;
import io.evitadb.core.async.ObservableExecutorServiceWithHardDeadline;
import io.evitadb.core.async.ObservableThreadExecutor;
import io.evitadb.core.async.Scheduler;
import io.evitadb.core.async.SessionKiller;
Expand Down Expand Up @@ -165,12 +165,12 @@ public final class Evita implements EvitaContract {
* Executor service that handles all requests to the Evita instance.
*/
@Getter
private final ObservableExecutorService requestExecutor;
private final ObservableExecutorServiceWithHardDeadline requestExecutor;
/**
* Executor service that handles transaction handling, once transaction gets committed.
*/
@Getter
private final ObservableExecutorService transactionExecutor;
private final ObservableExecutorServiceWithHardDeadline transactionExecutor;
/**
* Scheduler service for executing asynchronous service tasks.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
*
* _ _ ____ ____
* _____ _(_) |_ __ _| _ \| __ )
* / _ \ \ / / | __/ _` | | | | _ \
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2024
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/FgForrest/evitaDB/blob/master/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.evitadb.core.async;


import javax.annotation.Nonnull;
import java.util.concurrent.Callable;

/**
 * This interface extends {@link ObservableExecutorService} and marks a service that actively cancels tasks that exceed
 * their specified timeout duration. The default timeout duration for tasks submitted without an explicit timeout is
 * specified by {@link #getDefaultTimeoutInMilliseconds()}.
 *
 * The {@code createTask} overloads come in two flavours ({@link Runnable} and {@link Callable}) and differ in two
 * independent aspects: whether the caller provides the task name and whether the caller provides the timeout.
 * When the timeout is omitted, the default timeout of the service is used. When the name is omitted, the
 * implementation is expected to come up with a name on its own.
 *
 * @author Jan Novotný ([email protected]), FG Forrest a.s. (c) 2024
 */
public interface ObservableExecutorServiceWithHardDeadline extends ObservableExecutorService {

	/**
	 * Retrieves the default timeout value in milliseconds for all tasks submitted without explicit timeout.
	 *
	 * @return the default timeout duration in milliseconds
	 */
	long getDefaultTimeoutInMilliseconds();

	/**
	 * Creates a task with the given name and lambda function to be executed. No timeout is passed, so the default
	 * timeout (see {@link #getDefaultTimeoutInMilliseconds()}) applies.
	 *
	 * @param name the name of the task
	 * @param lambda the task to be executed
	 * @return a Runnable representing the task
	 */
	@Nonnull
	Runnable createTask(@Nonnull String name, @Nonnull Runnable lambda);

	/**
	 * Creates a task to be executed from the given lambda. Neither name nor timeout is passed, so the name is chosen
	 * by the implementation and the default timeout (see {@link #getDefaultTimeoutInMilliseconds()}) applies.
	 *
	 * @param lambda the task to be executed
	 * @return a Runnable representing the task
	 */
	@Nonnull
	Runnable createTask(@Nonnull Runnable lambda);

	/**
	 * Creates a task with the given name and lambda function, to be executed with a specified timeout.
	 *
	 * @param name the name of the task
	 * @param lambda the task to be executed
	 * @param timeoutInMilliseconds the timeout duration in milliseconds, used instead of the default timeout
	 * @return a Runnable representing the task
	 */
	@Nonnull
	Runnable createTask(@Nonnull String name, @Nonnull Runnable lambda, long timeoutInMilliseconds);

	/**
	 * Creates a task with the given lambda function, to be executed with a specified timeout. The name of the task
	 * is chosen by the implementation.
	 *
	 * @param lambda the task to be executed
	 * @param timeoutInMilliseconds the timeout duration in milliseconds, used instead of the default timeout
	 * @return a Runnable representing the task
	 */
	@Nonnull
	Runnable createTask(@Nonnull Runnable lambda, long timeoutInMilliseconds);

	/**
	 * Creates a task with the given name and lambda function to be executed. No timeout is passed, so the default
	 * timeout (see {@link #getDefaultTimeoutInMilliseconds()}) applies.
	 *
	 * @param name the name of the task
	 * @param lambda the task to be executed
	 * @param <V> the result type of method call
	 * @return a Callable representing the task
	 */
	@Nonnull
	<V> Callable<V> createTask(@Nonnull String name, @Nonnull Callable<V> lambda);

	/**
	 * Creates a task to be executed from the given lambda. Neither name nor timeout is passed, so the name is chosen
	 * by the implementation and the default timeout (see {@link #getDefaultTimeoutInMilliseconds()}) applies.
	 *
	 * @param lambda the task to be executed
	 * @param <V> the result type of method call
	 * @return a Callable representing the task
	 */
	@Nonnull
	<V> Callable<V> createTask(@Nonnull Callable<V> lambda);

	/**
	 * Creates a task with the given name and lambda function, to be executed with a specified timeout.
	 *
	 * @param name the name of the task
	 * @param lambda the task to be executed
	 * @param timeoutInMilliseconds the timeout duration in milliseconds, used instead of the default timeout
	 * @param <V> the result type of method call
	 * @return a Callable representing the task
	 */
	@Nonnull
	<V> Callable<V> createTask(@Nonnull String name, @Nonnull Callable<V> lambda, long timeoutInMilliseconds);

	/**
	 * Creates a task from the given lambda function, to be executed with a specified timeout. The name of the task
	 * is chosen by the implementation.
	 *
	 * @param lambda the task to be executed
	 * @param timeoutInMilliseconds the timeout duration in milliseconds, used instead of the default timeout
	 * @param <V> the result type of method call
	 * @return a Callable representing the task
	 */
	@Nonnull
	<V> Callable<V> createTask(@Nonnull Callable<V> lambda, long timeoutInMilliseconds);

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* @author Jan Novotný ([email protected]), FG Forrest a.s. (c) 2024
*/
@Slf4j
public class ObservableThreadExecutor implements ObservableExecutorService {
public class ObservableThreadExecutor implements ObservableExecutorServiceWithHardDeadline {
private static final int BUFFER_CAPACITY = 512;
/**
* Buffer used for purging finished tasks.
Expand Down Expand Up @@ -133,6 +133,11 @@ public ForkJoinPool getForkJoinPoolInternal() {
return forkJoinPool;
}

/**
 * Exposes the timeout this executor applies to tasks that were created without an explicit timeout.
 *
 * @return the default timeout in milliseconds
 */
@Override
public long getDefaultTimeoutInMilliseconds() {
	return timeoutInMilliseconds;
}

@Override
public long getSubmittedTaskCount() {
return this.submittedTaskCount.get();
Expand Down Expand Up @@ -357,6 +362,7 @@ private void cancelTimedOutTasks() {
// if task is running / waiting longer than the threshold, cancel it and remove it from the queue
if (task.isTimedOut(threshold)) {
timedOutTasks++;
log.info("Cancelling timed out task: {}", task);
task.cancel();
it.remove();
} else {
Expand Down Expand Up @@ -409,10 +415,76 @@ private interface ObservableTask {

}

/**
 * Wraps the lambda in a named observable task that is bound to the default timeout of this executor.
 *
 * @param name the name of the task
 * @param lambda the task to be executed
 * @return a Runnable representing the task
 */
@Override
@Nonnull
public Runnable createTask(@Nonnull String name, @Nonnull Runnable lambda) {
	final long defaultTimeout = this.timeoutInMilliseconds;
	return new ObservableRunnable(name, lambda, defaultTimeout);
}

/**
 * Wraps the lambda in an observable task that is bound to the default timeout of this executor. Because no name
 * is provided, the task is named after the stack frame of the code that called this method.
 *
 * @param lambda the task to be executed
 * @return a Runnable representing the task
 */
@Override
@Nonnull
public Runnable createTask(@Nonnull Runnable lambda) {
	final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
	// index 0 is Thread#getStackTrace and index 1 is this very method - the caller we want to name the task
	// after is at index 2 (using index 1 would give every task the same, useless name)
	return new ObservableRunnable(
		stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown",
		lambda, this.timeoutInMilliseconds
	);
}

/**
 * Wraps the lambda in a named observable task that is bound to the timeout passed by the caller.
 *
 * @param name the name of the task
 * @param lambda the task to be executed
 * @param timeoutInMilliseconds the timeout duration in milliseconds
 * @return a Runnable representing the task
 */
@Override
@Nonnull
public Runnable createTask(@Nonnull String name, @Nonnull Runnable lambda, long timeoutInMilliseconds) {
	final ObservableRunnable namedTask = new ObservableRunnable(name, lambda, timeoutInMilliseconds);
	return namedTask;
}

/**
 * Wraps the lambda in an observable task that is bound to the timeout passed by the caller. Because no name
 * is provided, the task is named after the stack frame of the code that called this method.
 *
 * @param lambda the task to be executed
 * @param timeoutInMilliseconds the timeout duration in milliseconds
 * @return a Runnable representing the task
 */
@Override
@Nonnull
public Runnable createTask(@Nonnull Runnable lambda, long timeoutInMilliseconds) {
	final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
	// index 0 is Thread#getStackTrace and index 1 is this very method - the caller we want to name the task
	// after is at index 2 (using index 1 would give every task the same, useless name)
	return new ObservableRunnable(
		stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown",
		lambda, timeoutInMilliseconds
	);
}

/**
 * Wraps the lambda in a named observable task that is bound to the default timeout of this executor.
 *
 * @param name the name of the task
 * @param lambda the task to be executed
 * @param <V> the result type of method call
 * @return a Callable representing the task
 */
@Override
@Nonnull
public <V> Callable<V> createTask(@Nonnull String name, @Nonnull Callable<V> lambda) {
	final long defaultTimeout = this.timeoutInMilliseconds;
	return new ObservableCallable<>(name, lambda, defaultTimeout);
}

/**
 * Wraps the lambda in an observable task that is bound to the default timeout of this executor. Because no name
 * is provided, the task is named after the stack frame of the code that called this method.
 *
 * @param lambda the task to be executed
 * @param <V> the result type of method call
 * @return a Callable representing the task
 */
@Override
@Nonnull
public <V> Callable<V> createTask(@Nonnull Callable<V> lambda) {
	final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
	// index 0 is Thread#getStackTrace and index 1 is this very method - the caller we want to name the task
	// after is at index 2 (using index 1 would give every task the same, useless name)
	return new ObservableCallable<>(
		stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown",
		lambda, this.timeoutInMilliseconds
	);
}

/**
 * Wraps the lambda in a named observable task that is bound to the timeout passed by the caller.
 *
 * @param name the name of the task
 * @param lambda the task to be executed
 * @param timeoutInMilliseconds the timeout duration in milliseconds
 * @param <V> the result type of method call
 * @return a Callable representing the task
 */
@Override
@Nonnull
public <V> Callable<V> createTask(@Nonnull String name, @Nonnull Callable<V> lambda, long timeoutInMilliseconds) {
	final ObservableCallable<V> namedTask = new ObservableCallable<>(name, lambda, timeoutInMilliseconds);
	return namedTask;
}

/**
 * Wraps the lambda in an observable task that is bound to the timeout passed by the caller. Because no name
 * is provided, the task is named after the stack frame of the code that called this method.
 *
 * @param lambda the task to be executed
 * @param timeoutInMilliseconds the timeout duration in milliseconds
 * @param <V> the result type of method call
 * @return a Callable representing the task
 */
@Override
@Nonnull
public <V> Callable<V> createTask(@Nonnull Callable<V> lambda, long timeoutInMilliseconds) {
	final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
	// index 0 is Thread#getStackTrace and index 1 is this very method - the caller we want to name the task
	// after is at index 2 (using index 1 would give every task the same, useless name)
	return new ObservableCallable<>(
		stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown",
		lambda, timeoutInMilliseconds
	);
}

/**
* Wrapper around a {@link Runnable} that implements the {@link ObservableTask} interface.
*/
private static class ObservableRunnable implements Runnable, ObservableTask {
/**
* Name / description of the task.
*/
private final String name;
/**
* Delegate runnable that is being wrapped.
*/
Expand All @@ -427,6 +499,14 @@ private static class ObservableRunnable implements Runnable, ObservableTask {
private final CompletableFuture<Void> future = new CompletableFuture<>();

/**
 * Creates an unnamed task. The name is derived from the stack frame of the code that invoked this constructor
 * and the deadline is computed from the current time at the moment of construction.
 *
 * @param delegate the runnable that is being wrapped
 * @param timeoutInMilliseconds the timeout in milliseconds, counted from the moment the task is created
 */
public ObservableRunnable(@Nonnull Runnable delegate, long timeoutInMilliseconds) {
	final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
	// index 0 is Thread#getStackTrace and index 1 is this constructor - the code that created the task
	// is at index 2 (index 1 would always resolve to the constructor frame itself)
	this.name = stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown";
	this.delegate = delegate;
	this.timedOutAt = System.currentTimeMillis() + timeoutInMilliseconds;
}

public ObservableRunnable(@Nonnull String name, @Nonnull Runnable delegate, long timeoutInMilliseconds) {
this.name = name;
this.delegate = delegate;
this.timedOutAt = System.currentTimeMillis() + timeoutInMilliseconds;
}
Expand Down Expand Up @@ -457,13 +537,22 @@ public void run() {
throw e;
}
}

/**
 * Describes the task by its name.
 *
 * @return the name of the task
 */
@Override
public String toString() {
	return name;
}
}

/**
* Wrapper around a {@link Callable} that implements the {@link ObservableTask} interface.
* @param <V> the type of the result
*/
private static class ObservableCallable<V> implements Callable<V>, ObservableTask {
/**
* Name / description of the task.
*/
private final String name;
/**
* Delegate callable that is being wrapped.
*/
Expand All @@ -478,6 +567,14 @@ private static class ObservableCallable<V> implements Callable<V>, ObservableTas
private final CompletableFuture<V> future = new CompletableFuture<>();

/**
 * Creates an unnamed task. The name is derived from the stack frame of the code that invoked this constructor
 * and the deadline is computed from the current time at the moment of construction.
 *
 * @param delegate the callable that is being wrapped
 * @param timeout the timeout in milliseconds, counted from the moment the task is created
 */
public ObservableCallable(@Nonnull Callable<V> delegate, long timeout) {
	final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
	// index 0 is Thread#getStackTrace and index 1 is this constructor - the code that created the task
	// is at index 2 (index 1 would always resolve to the constructor frame itself)
	this.name = stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown";
	this.delegate = delegate;
	this.timedOutAt = System.currentTimeMillis() + timeout;
}

public ObservableCallable(@Nonnull String name, @Nonnull Callable<V> delegate, long timeout) {
this.name = name;
this.delegate = delegate;
this.timedOutAt = System.currentTimeMillis() + timeout;
}
Expand Down Expand Up @@ -509,6 +606,11 @@ public V call() throws Exception {
throw e;
}
}

/**
 * Describes the task by its name.
 *
 * @return the name of the task
 */
@Override
public String toString() {
	return name;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ private long purgeFinishedTasks() {
final TaskSimplifiedState taskState = status.simplifiedState();
if (taskState == TaskSimplifiedState.FINISHED || taskState == TaskSimplifiedState.FAILED) {
// if task is finished, remove it from the queue
log.info("Task {} is waiting for precondition for too long, removing it from the queue.", status.taskId());
it.remove();
// if its defense period hasn't perished add it to list, that might end up in the queue again
if (status.finished().isAfter(threshold)) {
Expand Down
Loading

0 comments on commit 13bcd2d

Please sign in to comment.