Skip to content

Commit

Permalink
Merge pull request #721 from FgForrest/720-session-killer-might-kill-…
Browse files Browse the repository at this point in the history
…session-with-active-long-running-call

fix(#720): Session killer might kill session with active long running…
  • Loading branch information
novoj authored Oct 25, 2024
2 parents 324f182 + ff74325 commit 7cfad9e
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1138,5 +1138,4 @@ default boolean isTransactionOpen() {
*/
@Nonnull
ProxyFactory getProxyFactory();

}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ <S extends Serializable, T extends EvitaResponse<S>> T query(@Nonnull EvitaReque
*/
void execute(@Nonnull Consumer<EvitaSessionContract> logic) throws TransactionException;

/**
* Returns true if there is active method invocation in place. When method is running, it is not possible to
* kill session due to inactivity.
*
* @return true if there is active method invocation in place
*/
boolean methodIsRunning();

/**
* Retrieves a CompletableFuture that represents the finalization status of a session. If the catalog is in
* transactional mode, the future will respect the requested {@link CommitBehavior} bound to the current transaction.
Expand Down
17 changes: 11 additions & 6 deletions evita_engine/src/main/java/io/evitadb/core/EvitaSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -1218,14 +1218,14 @@ public Task<?, FileForFetch> backupCatalog(@Nullable OffsetDateTime pastMoment,
@Nonnull
@Override
public Optional<UUID> getOpenedTransactionId() {
return ofNullable(transactionAccessor.get())
return ofNullable(this.transactionAccessor.get())
.filter(it -> !it.isClosed())
.map(Transaction::getTransactionId);
}

@Override
public boolean isRollbackOnly() {
return ofNullable(transactionAccessor.get())
return ofNullable(this.transactionAccessor.get())
.map(Transaction::isRollbackOnly)
.orElse(false);
}
Expand All @@ -1243,22 +1243,22 @@ public void setRollbackOnly() {

@Override
public boolean isReadOnly() {
return !sessionTraits.isReadWrite();
return !this.sessionTraits.isReadWrite();
}

@Override
public boolean isBinaryFormat() {
return sessionTraits.isBinary();
return this.sessionTraits.isBinary();
}

@Override
public boolean isDryRun() {
return sessionTraits.isDryRun();
return this.sessionTraits.isDryRun();
}

@Override
public long getInactivityDurationInSeconds() {
return (System.currentTimeMillis() - lastCall) / 1000;
return (System.currentTimeMillis() - this.lastCall) / 1000;
}

@Interruptible
Expand Down Expand Up @@ -1335,6 +1335,11 @@ public void execute(@Nonnull Consumer<EvitaSessionContract> logic) {
);
}

@Override
public boolean methodIsRunning() {
return false;
}

/**
* Retrieves a CompletableFuture that represents the finalization status of a session. If the catalog is in
* transactional mode, the future will respect the requested {@link CommitBehavior} bound to the current transaction.
Expand Down
23 changes: 23 additions & 0 deletions evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down Expand Up @@ -259,9 +260,22 @@ private interface EvitaProxyFinalization {
* supports JDK proxies out-of-the-box so this shouldn't be a problem in the future.
*/
private static class EvitaSessionProxy implements InvocationHandler {
private final static Method IS_METHOD_RUNNING;
private final static Method INACTIVITY_IN_SECONDS;
private final EvitaSession evitaSession;
private final TracingContext tracingContext;
@Getter private final ClosedEvent sessionClosedEvent;
private final AtomicInteger insideInvocation = new AtomicInteger(0);
private final AtomicLong lastCall = new AtomicLong(System.currentTimeMillis());

static {
try {
IS_METHOD_RUNNING = EvitaInternalSessionContract.class.getMethod("methodIsRunning");
INACTIVITY_IN_SECONDS = EvitaInternalSessionContract.class.getMethod("getInactivityDurationInSeconds");
} catch (NoSuchMethodException ex) {
throw new GenericEvitaInternalError("Method not found.", ex);
}
}

/**
* Handles arguments printing.
Expand Down Expand Up @@ -311,6 +325,10 @@ public Object invoke(Object proxy, Method method, Object[] args) {
.finish((OffsetDateTime) args[0], (int) args[1])
.commit();
return null;
} else if (method.equals(INACTIVITY_IN_SECONDS)) {
return (System.currentTimeMillis() - this.lastCall.get()) / 1000;
} else if (method.equals(IS_METHOD_RUNNING)) {
return this.insideInvocation.get() > 0;
} else {
try {
this.evitaSession.increaseNestLevel();
Expand All @@ -320,6 +338,8 @@ public Object invoke(Object proxy, Method method, Object[] args) {
() -> {
final Supplier<Object> invocation = () -> {
try {
this.insideInvocation.incrementAndGet();
this.lastCall.set(System.currentTimeMillis());
return method.invoke(evitaSession, args);
} catch (InvocationTargetException ex) {
// handle the error
Expand Down Expand Up @@ -369,6 +389,9 @@ public Object invoke(Object proxy, Method method, Object[] args) {
"Unexpected system error occurred.",
ex
);
} finally {
this.insideInvocation.decrementAndGet();
this.lastCall.set(System.currentTimeMillis());
}
};
if (method.isAnnotationPresent(RepresentsQuery.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.evitadb.api.configuration.ServerOptions;
import io.evitadb.api.exception.InstanceTerminatedException;
import io.evitadb.core.Evita;
import io.evitadb.core.EvitaInternalSessionContract;
import io.evitadb.core.metric.event.session.KilledEvent;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -76,8 +77,13 @@ public SessionKiller(int allowedInactivityInSeconds, @Nonnull Evita evita, @Nonn
public void run() {
try {
final AtomicInteger counter = new AtomicInteger(0);
evita.getActiveSessions()
.filter(session -> session.getInactivityDurationInSeconds() >= allowedInactivityInSeconds)
this.evita.getActiveSessions()
.map(EvitaInternalSessionContract.class::cast)
.filter(session -> {
final boolean sessionOld = session.getInactivityDurationInSeconds() >= this.allowedInactivityInSeconds;
final boolean methodRunning = session.methodIsRunning();
return sessionOld && !methodRunning;
})
.forEach(session -> {
try {
final String catalogName = session.getCatalogName();
Expand All @@ -91,6 +97,7 @@ public void run() {
evita.terminateSession(session);
counter.incrementAndGet();

log.info("Killed session " + session.getId() + " (" + this.allowedInactivityInSeconds + "s of inactivity).");
// emit the event
new KilledEvent(catalogName).commit();
} catch (InstanceTerminatedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ void shouldKillInactiveSessionsAutomatically() {
final long start = System.currentTimeMillis();
do {
assertNotNull(sessionActive.getCatalogSchema());
} while (!(System.currentTimeMillis() - start > 5000 || !sessionInactive.isActive()));
} while (!(System.currentTimeMillis() - start > 2000));

assertFalse(sessionInactive.isActive());
assertTrue(sessionActive.isActive());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
*
* _ _ ____ ____
* _____ _(_) |_ __ _| _ \| __ )
* / _ \ \ / / | __/ _` | | | | _ \
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2024
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/FgForrest/evitaDB/blob/master/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.evitadb.core.async;

import io.evitadb.api.EvitaSessionContract;
import io.evitadb.api.configuration.EvitaConfiguration;
import io.evitadb.api.configuration.ServerOptions;
import io.evitadb.api.configuration.StorageOptions;
import io.evitadb.api.exception.CollectionNotFoundException;
import io.evitadb.api.query.Query;
import io.evitadb.api.query.QueryConstraints;
import io.evitadb.api.requestResponse.data.structure.EntityReference;
import io.evitadb.core.Evita;
import io.evitadb.test.EvitaTestSupport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static graphql.Assert.assertFalse;
import static graphql.Assert.assertNotNull;
import static graphql.Assert.assertTrue;
import static io.evitadb.test.TestConstants.LONG_RUNNING_TEST;

/**
* This test verifies the correct functionality of the {@link SessionKiller} class.
*
* @author Jan Novotný ([email protected]), FG Forrest a.s. (c) 2024
*/
@DisplayName("Session killer functionality")
@Tag(LONG_RUNNING_TEST)
class SessionKillerTest implements EvitaTestSupport {
public static final String SUB_DIRECTORY = "SessionKillerTest";
private Evita evita;
private SessionKiller sessionKiller;

@BeforeEach
void setUp() throws IOException, NoSuchFieldException, IllegalAccessException {
cleanTestSubDirectory(SUB_DIRECTORY);
this.evita = new Evita(
EvitaConfiguration.builder()
.storage(
StorageOptions.builder()
.storageDirectory(getTestDirectory().resolve(SUB_DIRECTORY))
.build()
)
.server(
ServerOptions.builder()
.closeSessionsAfterSecondsOfInactivity(1)
.build()
)
.build()
);
final Field sessionKillerField = Evita.class.getDeclaredField("sessionKiller");
sessionKillerField.setAccessible(true);
this.sessionKiller = (SessionKiller) sessionKillerField.get(this.evita);
}

@Test
void shouldKillSessionAfterIntervalOfInactivity() throws InterruptedException {
this.evita.defineCatalog("test");
final EvitaSessionContract session = this.evita.createReadOnlySession("test");
synchronized (this.evita) {
this.evita.wait(2000);
}
this.sessionKiller.run();
assertFalse(session.isActive());
}

@Test
void shouldNotKillSessionWhenThereAreInvocations() {
this.evita.defineCatalog("test");
final EvitaSessionContract session = this.evita.createReadOnlySession("test");
final long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2000) {
assertNotNull(session.getCatalogName());
Thread.onSpinWait();
}
this.sessionKiller.run();
assertTrue(session.isActive());
}

@Test
void shouldNotKillSessionWhenThereIsLongLastingInvocationCallActive() throws InterruptedException {
final AtomicBoolean finishMethodCall = new AtomicBoolean(false);
try {
this.evita.defineCatalog("test");
final EvitaSessionContract session = this.evita.createReadOnlySession("test");
final Runnable asyncCall = () -> {
final Query query = Mockito.mock(Query.class);
Mockito.when(query.normalizeQuery()).thenAnswer(invocation -> {
do {
Thread.onSpinWait();
} while (!finishMethodCall.get());
return Query.query(QueryConstraints.collection("unknownEntity"));
});
try {
session.query(
query, EntityReference.class
);
} catch (CollectionNotFoundException e) {
// expected
System.out.println("Async call finished");
}
};
final CompletableFuture<Void> future = CompletableFuture.runAsync(asyncCall);
synchronized (this.evita) {
this.evita.wait(2000);
}

this.sessionKiller.run();

System.out.println("Finishing async call");
finishMethodCall.set(true);
future.join();

assertTrue(session.isActive());

System.out.println("Waiting for session killer to finish");
synchronized (this.evita) {
this.evita.wait(2000);
}

this.sessionKiller.run();
assertFalse(session.isActive());
} finally {
finishMethodCall.set(true);
}
}
}

0 comments on commit 7cfad9e

Please sign in to comment.