Skip to content

Commit

Permalink
Add configuration option to set warmupParallelism parameter for con…
Browse files Browse the repository at this point in the history
…nection pool.

[#195]

Signed-off-by: cty123 <[email protected]>
  • Loading branch information
cty123 committed Jan 13, 2024
1 parent 3ee034a commit 6bf9f3b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Mono<Object> resultMono = Mono.usingWhen(pooledConnectionFactory.create(),
| `registerJmx` | Whether to register the pool to JMX.
| `validationDepth` | Validation depth used to validate an R2DBC connection. Defaults to `LOCAL`.
| `validationQuery` | Query that will be executed just before a connection is given to you from the pool to validate that the connection to the database is still alive.
| `warmupParallelism` | The concurrency level used when the allocator is subscribed to during the warmup phase. Default to `1`.

All other properties are driver-specific.

Expand Down
10 changes: 8 additions & 2 deletions src/main/java/io/r2dbc/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,15 @@ private InstrumentedPool<Connection> createConnectionPool(ConnectionPoolConfigur
.idleResourceReuseMruOrder(); // MRU to support eviction of idle

if (maxSize == -1 || initialSize > 0) {
builder.sizeBetween(Math.max(configuration.getMinIdle(), initialSize), maxSize == -1 ? Integer.MAX_VALUE : maxSize);
builder.sizeBetween(
Math.max(configuration.getMinIdle(), initialSize),
maxSize == -1 ? Integer.MAX_VALUE : maxSize,
configuration.getWarmupParallelism());
} else {
builder.sizeBetween(Math.max(configuration.getMinIdle(), initialSize), maxSize);
builder.sizeBetween(
Math.max(configuration.getMinIdle(), initialSize),
maxSize,
configuration.getWarmupParallelism());
}

Duration backgroundEvictionInterval = configuration.getBackgroundEvictionInterval();
Expand Down
35 changes: 33 additions & 2 deletions src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public final class ConnectionPoolConfiguration {
*/
public static final Duration NO_TIMEOUT = Duration.ofMillis(-1);

/**
* Constant indicating the default parallelism used during connection pool warmup.
*/
public static final int DEFAULT_WARMUP_PARALLELISM = 1;

@Nullable
private final Scheduler allocatorSubscribeOn;

Expand Down Expand Up @@ -99,12 +104,14 @@ public final class ConnectionPoolConfiguration {
@Nullable
private final String validationQuery;

private final int warmupParallelism;

private ConnectionPoolConfiguration(@Nullable Scheduler allocatorSubscribeOn, int acquireRetry, @Nullable Duration backgroundEvictionInterval, ConnectionFactory connectionFactory, Clock clock, Consumer<PoolBuilder<Connection, ?
extends PoolConfig<? extends Connection>>> customizer, int initialSize, int maxSize, int minIdle, Duration maxAcquireTime, Duration maxCreateConnectionTime, Duration maxIdleTime,
Duration maxLifeTime, Duration maxValidationTime, PoolMetricsRecorder metricsRecorder, @Nullable String name,
@Nullable Function<? super Connection, ? extends Publisher<Void>> postAllocate,
@Nullable Function<? super Connection, ? extends Publisher<Void>> preRelease, boolean registerJmx, ValidationDepth validationDepth,
@Nullable String validationQuery) {
@Nullable String validationQuery, int warmupParallelism) {
this.allocatorSubscribeOn = allocatorSubscribeOn;
this.acquireRetry = acquireRetry;
this.connectionFactory = Assert.requireNonNull(connectionFactory, "ConnectionFactory must not be null");
Expand All @@ -126,6 +133,7 @@ private ConnectionPoolConfiguration(@Nullable Scheduler allocatorSubscribeOn, in
this.validationDepth = validationDepth;
this.validationQuery = validationQuery;
this.backgroundEvictionInterval = backgroundEvictionInterval;
this.warmupParallelism = warmupParallelism;
}

/**
Expand Down Expand Up @@ -237,6 +245,10 @@ String getValidationQuery() {
return this.validationQuery;
}

int getWarmupParallelism() {
return this.warmupParallelism;
}

/**
* A builder for {@link ConnectionPoolConfiguration} instances.
* <p>
Expand Down Expand Up @@ -293,6 +305,8 @@ public static final class Builder {

private ValidationDepth validationDepth = ValidationDepth.LOCAL;

private Integer warmupParallelism = DEFAULT_WARMUP_PARALLELISM;

private Builder() {
}

Expand Down Expand Up @@ -583,6 +597,23 @@ public Builder validationQuery(String validationQuery) {
return this;
}

/**
* Configure the concurrency level used when the allocator is subscribed to during the warmup phase.
*
* @param warmupParallelism Specifies the concurrency level used when the allocator is subscribed to during the warmup phase, if any.
* During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code warmupParallelism} resources are
* subscribed to at the same time.
* @return this {@link Builder}
* @throws IllegalArgumentException if {@code warmupParallelism} is negative
*/
public Builder warmupParallelism(int warmupParallelism) {
if (warmupParallelism < 0) {
throw new IllegalArgumentException("warmupParallelism must not be negative");
}
this.warmupParallelism = warmupParallelism;
return this;
}

/**
* Returns a configured {@link ConnectionPoolConfiguration}.
*
Expand All @@ -596,7 +627,7 @@ public ConnectionPoolConfiguration build() {
this.clock, this.customizer, this.initialSize, this.maxSize, this.minIdle,
this.maxAcquireTime, this.maxCreateConnectionTime, this.maxIdleTime, this.maxLifeTime, this.maxValidationTime,
this.metricsRecorder, this.name, this.postAllocate, this.preRelease, this.registerJmx,
this.validationDepth, this.validationQuery
this.validationDepth, this.validationQuery, this.warmupParallelism
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public class PoolingConnectionFactoryProvider implements ConnectionFactoryProvid
*/
public static final Option<ValidationDepth> VALIDATION_DEPTH = Option.valueOf("validationDepth");

/**
* WarmupParallelism {@link Option}.
*/
public static final Option<Integer> WARMUP_PARALLELISM = Option.valueOf("warmupParallelism");

private static final String COLON = ":";

/**
Expand Down Expand Up @@ -210,6 +215,7 @@ static ConnectionPoolConfiguration buildConfiguration(ConnectionFactoryOptions c
mapper.from(REGISTER_JMX).as(OptionMapper::toBoolean).to(builder::registerJmx);
mapper.fromExact(VALIDATION_QUERY).to(builder::validationQuery);
mapper.from(VALIDATION_DEPTH).as(validationDepth -> OptionMapper.toEnum(validationDepth, ValidationDepth.class)).to(builder::validationDepth);
mapper.from(WARMUP_PARALLELISM).as(OptionMapper::toInteger).to(builder::warmupParallelism);

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void configuration() {
.maxSize(20)
.name("bar")
.registerJmx(true)
.warmupParallelism(99)
.build();

assertThat(configuration)
Expand All @@ -62,7 +63,8 @@ void configuration() {
.hasFieldOrPropertyWithValue("initialSize", 2)
.hasFieldOrPropertyWithValue("maxSize", 20)
.hasFieldOrPropertyWithValue("name", "bar")
.hasFieldOrPropertyWithValue("registerJmx", true);
.hasFieldOrPropertyWithValue("registerJmx", true)
.hasFieldOrPropertyWithValue("warmupParallelism", 99);
}

@Test
Expand All @@ -82,7 +84,8 @@ void configurationDefaults() {
.hasFieldOrPropertyWithValue("maxValidationTime", Duration.ofMillis(-1))
.hasFieldOrPropertyWithValue("initialSize", 10)
.hasFieldOrPropertyWithValue("maxSize", 10)
.hasFieldOrPropertyWithValue("registerJmx", false);
.hasFieldOrPropertyWithValue("registerJmx", false)
.hasFieldOrPropertyWithValue("warmupParallelism", 1);
}

@Test
Expand Down

0 comments on commit 6bf9f3b

Please sign in to comment.