From bbd464ca1b59fbeb352cf454bbd699179a6cfbb8 Mon Sep 17 00:00:00 2001 From: Anton Zukovskij Date: Tue, 14 May 2024 17:51:04 +0300 Subject: [PATCH 1/2] added UT to reproduce duplicate connection issue --- .../r2dbc/pool/ConnectionPoolUnitTests.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java b/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java index fdddae1..27c34bd 100644 --- a/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java +++ b/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java @@ -31,6 +31,7 @@ import org.reactivestreams.Subscription; import org.springframework.util.ReflectionUtils; import reactor.core.Disposable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; @@ -47,8 +48,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -60,8 +63,10 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -1119,6 +1124,38 @@ public void cancel() { assertThat(closed).isTrue(); } + + @Test + void shouldNotDuplicateObjectsDuringCancel() { + Set uniqueConnections = ConcurrentHashMap.newKeySet(); + ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class); + + when(connectionFactoryMock.create()).thenReturn((Publisher)Mono.fromCallable(() -> { + ConnectionWithLifecycle connectionMock = mock(ConnectionWithLifecycle.class); + when(connectionMock.validate(any())).thenReturn(Mono.just(true)); + when(connectionMock.postAllocate()).thenReturn(Mono.empty()); + when(connectionMock.preRelease()).thenReturn(Mono.fromRunnable(() -> + uniqueConnections.remove(connectionMock)) + ); + return connectionMock; + })); + + ConnectionPool pool = new ConnectionPool(ConnectionPoolConfiguration.builder(connectionFactoryMock) + .build()); + CompletableFuture future = Flux.range(0, 32) + .flatMap(i -> Mono.fromCallable(() -> true) + .then(Mono.defer(() -> Mono.usingWhen(Flux.defer(pool::create), + c -> { + assertFalse(!uniqueConnections.add(((PooledConnection)c).unwrap()), + "duplicate connections returned from pool"); + return Mono.delay(Duration.ofMillis(1)); + }, + Connection::close)))) + .ignoreElements() + .toFuture(); + + assertThatNoException().isThrownBy(future::join); + } interface ConnectionWithLifecycle extends Connection, Lifecycle { From 7fcfc4c118bfd5e5a872a1d895204226acbc3c8a Mon Sep 17 00:00:00 2001 From: Anton Zukovskij Date: Tue, 14 May 2024 17:56:40 +0300 Subject: [PATCH 2/2] fix connection returned to pool prematurely --- src/main/java/io/r2dbc/pool/ConnectionPool.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/r2dbc/pool/ConnectionPool.java b/src/main/java/io/r2dbc/pool/ConnectionPool.java index 5c57f9c..85edcda 100644 --- a/src/main/java/io/r2dbc/pool/ConnectionPool.java +++ b/src/main/java/io/r2dbc/pool/ConnectionPool.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; @@ -110,7 +111,7 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { Function> allocateValidation = getValidationFunction(configuration); Mono create = Mono.defer(() -> { - + AtomicBoolean emitting = new AtomicBoolean(true); Mono mono = this.connectionPool.acquire() .flatMap(ref -> { @@ -140,7 +141,10 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { conn = conn.onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable))); return Operators.discardOnCancel(conn, () -> { - ref.release().subscribe(); + if (emitting.compareAndSet(true, false)) { + logger.debug("Discarding pooled reference as creation was disposed"); + ref.release().subscribe(); + } return false; }); }) @@ -167,7 +171,8 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { return context.put(HOOK_ON_DROPPED, onNextDropped); }).onErrorMap(TimeoutException.class, e -> new R2dbcTimeoutException(timeoutMessage, e)); } - return mono; + return mono + .doOnTerminate(() -> emitting.compareAndSet(true, false)); }); this.create = configuration.getAcquireRetry() > 0 ? create.retry(configuration.getAcquireRetry()) : create; }