Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23747 Support timeouts for RO transactions #4902

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
* Ignite transaction options.
*/
public class TransactionOptions {
/** Transaction timeout. */
/** Transaction timeout. 0 means 'use default timeout'. */
private long timeoutMillis = 0;

/** Read-only transaction. */
private boolean readOnly = false;

/**
* Returns transaction timeout, in milliseconds.
* Returns transaction timeout, in milliseconds. 0 means 'use default timeout'.
*
* @return Transaction timeout, in milliseconds.
*/
Expand All @@ -39,10 +39,14 @@ public long timeoutMillis() {
/**
* Sets transaction timeout, in milliseconds.
*
* @param timeoutMillis Transaction timeout, in milliseconds.
* @param timeoutMillis Transaction timeout, in milliseconds. Cannot be negative; 0 means 'use default timeout'.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
* @return {@code this} for chaining.
*/
public TransactionOptions timeoutMillis(long timeoutMillis) {
if (timeoutMillis < 0) {
throw new IllegalArgumentException("Negative timeoutMillis: " + timeoutMillis);
}

this.timeoutMillis = timeoutMillis;

return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.tx;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

class TransactionOptionsTest {
@Test
void readOnlyIsFalseByDefault() {
assertThat(new TransactionOptions().readOnly(), is(false));
}

@Test
void readOnlyStatusIsSet() {
var options = new TransactionOptions();

options.readOnly(true);

assertThat(options.readOnly(), is(true));
}

@Test
void readOnlySetterReturnsSameObject() {
var options = new TransactionOptions();

TransactionOptions afterSetting = options.readOnly(true);

assertSame(options, afterSetting);
}

@Test
void timeoutIsZeroByDefault() {
assertThat(new TransactionOptions().timeoutMillis(), is(0L));
}

@Test
void timeoutIsSet() {
var options = new TransactionOptions();

options.timeoutMillis(3333);

assertThat(options.timeoutMillis(), is(3333L));
}

@Test
void timeoutSetterReturnsSameObject() {
var options = new TransactionOptions();

TransactionOptions afterSetting = options.timeoutMillis(3333);

assertSame(options, afterSetting);
}

@Test
void positiveTimeoutIsAllowed() {
assertDoesNotThrow(() -> new TransactionOptions().timeoutMillis(0));
}

@Test
void zeroTimeoutIsAllowed() {
assertDoesNotThrow(() -> new TransactionOptions().timeoutMillis(0));
}

@Test
void negativeTimeoutIsRejected() {
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> new TransactionOptions().timeoutMillis(-1));

assertThat(ex.getMessage(), is("Negative timeoutMillis: -1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
Expand Down Expand Up @@ -130,7 +129,7 @@ public class ClientHandlerModule implements IgniteComponent {

@TestOnly
@SuppressWarnings("unused")
private volatile ChannelHandler handler;
private volatile ClientInboundMessageHandler handler;

/**
* Constructor.
Expand Down Expand Up @@ -395,4 +394,9 @@ private ClientInboundMessageHandler createInboundMessageHandler(ClientConnectorV
partitionOperationsExecutor
);
}

@TestOnly
public ClientInboundMessageHandler handler() {
return handler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.apache.ignite.sql.SqlBatchException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
* Handles messages from thin clients.
Expand Down Expand Up @@ -1091,4 +1092,9 @@ private static Set<AuthenticationEvent> authenticationEventsToSubscribe() {
AuthenticationEvent.USER_REMOVED
);
}

@TestOnly
public ClientResourceRegistry resources() {
return resources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ public class ClientTransactionBeginRequest {
IgniteTransactionsImpl transactions,
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics) throws IgniteInternalCheckedException {
TransactionOptions options = null;
TransactionOptions options = new TransactionOptions();
HybridTimestamp observableTs = null;

boolean readOnly = in.unpackBoolean();
if (readOnly) {
options = new TransactionOptions().readOnly(true);
options.readOnly(readOnly);

long timeoutMillis = in.unpackLong();
options.timeoutMillis(timeoutMillis);

if (readOnly) {
// Timestamp makes sense only for read-only transactions.
observableTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ private synchronized CompletableFuture<ClientTransaction> ensureStarted(
return tx0;
}

ClientTransaction startedTx() {
/**
* Returns actual {@link ClientTransaction} started by this transaction or throws an exception if no transaction was started yet.
*/
public ClientTransaction startedTx() {
var tx0 = tx;

assert tx0 != null : "Transaction is not started";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ static CompletableFuture<ClientTransaction> beginAsync(
@Nullable String preferredNodeName,
@Nullable TransactionOptions options,
long observableTimestamp) {
if (options != null && options.timeoutMillis() != 0) {
if (options != null && options.timeoutMillis() != 0 && !options.readOnly()) {
// TODO: IGNITE-16193
throw new UnsupportedOperationException("Timeouts are not supported yet");
throw new UnsupportedOperationException("Timeouts are not supported yet for RW transactions");
}

boolean readOnly = options != null && options.readOnly();
Expand All @@ -73,6 +73,7 @@ static CompletableFuture<ClientTransaction> beginAsync(
ClientOp.TX_BEGIN,
w -> {
w.out().packBoolean(readOnly);
w.out().packLong(options == null ? 0 : options.timeoutMillis());
w.out().packLong(observableTimestamp);
},
r -> readTx(r, readOnly),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxPriority;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.network.ClusterNode;
Expand Down Expand Up @@ -63,17 +63,16 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
}

@Override
public InternalTransaction begin(HybridTimestampTracker tracker, boolean implicit) {
return begin(tracker, implicit, false);
public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly) {
return begin(timestampTracker, true, readOnly, InternalTxOptions.defaults());
}

@Override
public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean implicit, boolean readOnly) {
return begin(timestampTracker, implicit, readOnly, TxPriority.NORMAL);
public InternalTransaction beginExplicit(HybridTimestampTracker timestampTracker, boolean readOnly, InternalTxOptions txOptions) {
return begin(timestampTracker, false, readOnly, txOptions);
}

@Override
public InternalTransaction begin(HybridTimestampTracker tracker, boolean implicit, boolean readOnly, TxPriority priority) {
private InternalTransaction begin(HybridTimestampTracker tracker, boolean implicit, boolean readOnly, InternalTxOptions options) {
return new InternalTransaction() {
private final UUID id = UUID.randomUUID();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.testframework;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.thread.NamedThreadFactory;

/**
* Provides access to a single-thread scheduler for light-weight non-blocking operations. It doesn't need to be stopped.
*/
public class CommonTestScheduler {
sashapolo marked this conversation as resolved.
Show resolved Hide resolved
private static final IgniteLogger LOG = Loggers.forClass(CommonTestScheduler.class);

private static final ScheduledExecutorService INSTANCE = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("test-common-scheduler-", true, LOG)
);

/**
* Returns a single-thread scheduler for light-weight non-blocking operations. It doesn't need to be shut down.
*/
public static ScheduledExecutorService instance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
import org.apache.ignite.internal.schema.configuration.GcExtensionConfigurationSchema;
import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import org.apache.ignite.internal.schema.configuration.StorageUpdateExtensionConfiguration;
import org.apache.ignite.internal.schema.configuration.StorageUpdateExtensionConfigurationSchema;
Expand Down Expand Up @@ -249,6 +250,9 @@ public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest {
@InjectConfiguration
private static TransactionConfiguration txConfiguration;

@InjectConfiguration
private static LowWatermarkConfiguration lowWatermarkConfiguration;

@InjectConfiguration
private static RaftConfiguration raftConfiguration;

Expand Down Expand Up @@ -1226,6 +1230,7 @@ public CompletableFuture<Boolean> invoke(

txManager = new TxManagerImpl(
txConfiguration,
lowWatermarkConfiguration,
clusterService,
replicaSvc,
lockManager,
Expand All @@ -1236,7 +1241,8 @@ public CompletableFuture<Boolean> invoke(
new TestLocalRwTxCounter(),
resourcesRegistry,
transactionInflights,
lowWatermark
lowWatermark,
threadPoolsManager.commonScheduler()
);

replicaManager = new ReplicaManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class transactions_impl {
IGNITE_API void begin_async(ignite_callback<transaction> callback) {
auto writer_func = [this](protocol::writer &writer) {
writer.write_bool(false); // readOnly.
writer.write((std::int64_t) 0); // timeoutMillis.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
writer.write((std::int64_t) 0); // timeoutMillis.
writer.write<std::int64_t>(0); // timeoutMillis.

writer.write(m_connection->get_observable_timestamp());
};

Expand Down
1 change: 1 addition & 0 deletions modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ protected override void Handle(Socket handler, CancellationToken cancellationTok

case ClientOp.TxBegin:
reader.Skip(); // Read only.
reader.Skip(); // TimeoutMillis.
LastClientObservableTimestamp = reader.ReadInt64();

Send(handler, requestId, new byte[] { 0 }.AsMemory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ void Write()
{
var w = writer.MessageWriter;
w.Write(_options.ReadOnly);
w.Write(_options.TimeoutMillis);
w.Write(failoverSocket.ObservableTimestamp);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ namespace Apache.Ignite.Transactions;
/// Read-only transactions provide a snapshot view of data at a certain point in time.
/// They are lock-free and perform better than normal transactions, but do not permit data modifications.
/// </param>
public readonly record struct TransactionOptions(bool ReadOnly);
/// <param name="TimeoutMillis">
/// Transaction timeout. 0 means 'use default timeout'.
/// </param>
public readonly record struct TransactionOptions(bool ReadOnly, long TimeoutMillis = 0);
Loading