Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23305 Get rid of client HybridTimestampTracker #4929

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.TxManager;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInfo;
import org.mockito.Mockito;
Expand Down Expand Up @@ -125,7 +125,7 @@ ClientHandlerModule start(TestInfo testInfo) {
var module = new ClientHandlerModule(
mock(QueryProcessor.class),
mock(IgniteTablesInternal.class),
mock(IgniteTransactionsImpl.class),
mock(TxManager.class),
mock(IgniteComputeInternal.class),
clusterService,
bootstrapFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.apache.ignite.internal.security.authentication.AuthenticationManager;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
Expand All @@ -82,8 +82,8 @@ public class ClientHandlerModule implements IgniteComponent {
/** Ignite tables API. */
private final IgniteTablesInternal igniteTables;

/** Ignite transactions API. */
private final IgniteTransactionsImpl igniteTransactions;
/** Transaction manager. */
private final TxManager txManager;

/** Cluster ID supplier. */
private final Supplier<ClusterInfo> clusterInfoSupplier;
Expand Down Expand Up @@ -137,7 +137,7 @@ public class ClientHandlerModule implements IgniteComponent {
*
* @param queryProcessor Sql query processor.
* @param igniteTables Ignite.
* @param igniteTransactions Transactions.
* @param txManager Transaction manager.
* @param igniteCompute Compute.
* @param clusterService Cluster.
* @param bootstrapFactory Bootstrap factory.
Expand All @@ -152,7 +152,7 @@ public class ClientHandlerModule implements IgniteComponent {
public ClientHandlerModule(
QueryProcessor queryProcessor,
IgniteTablesInternal igniteTables,
IgniteTransactionsImpl igniteTransactions,
TxManager txManager,
IgniteComputeInternal igniteCompute,
ClusterService clusterService,
NettyBootstrapFactory bootstrapFactory,
Expand All @@ -170,6 +170,7 @@ public ClientHandlerModule(
) {
assert igniteTables != null;
assert queryProcessor != null;
assert txManager != null;
assert igniteCompute != null;
assert clusterService != null;
assert bootstrapFactory != null;
Expand All @@ -187,7 +188,7 @@ public ClientHandlerModule(

this.queryProcessor = queryProcessor;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
this.txManager = txManager;
this.igniteCompute = igniteCompute;
this.clusterService = clusterService;
this.bootstrapFactory = bootstrapFactory;
Expand Down Expand Up @@ -379,7 +380,7 @@ protected void initChannel(Channel ch) {
private ClientInboundMessageHandler createInboundMessageHandler(ClientConnectorView configuration, long connectionId) {
return new ClientInboundMessageHandler(
igniteTables,
igniteTransactions,
txManager,
queryProcessor,
configuration,
igniteCompute,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand All @@ -131,7 +130,7 @@
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TraceableException;
Expand Down Expand Up @@ -160,11 +159,11 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
/** Ignite tables API. */
private final IgniteTablesInternal igniteTables;

/** Ignite transactions API. */
private final IgniteTransactionsImpl igniteTransactions;
/** Transaction manager. */
private final TxManager txManager;

/** JDBC Handler. */
private final JdbcQueryEventHandler jdbcQueryEventHandler;
private final JdbcQueryEventHandlerImpl jdbcQueryEventHandler;

/** Connection resources. */
private final ClientResourceRegistry resources = new ClientResourceRegistry();
Expand Down Expand Up @@ -219,7 +218,6 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
* Constructor.
*
* @param igniteTables Ignite tables API entry point.
* @param igniteTransactions Ignite transactions API.
* @param processor Sql query processor.
* @param configuration Configuration.
* @param compute Compute.
Expand All @@ -231,7 +229,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
*/
public ClientInboundMessageHandler(
IgniteTablesInternal igniteTables,
IgniteTransactionsImpl igniteTransactions,
TxManager txManager,
QueryProcessor processor,
ClientConnectorView configuration,
IgniteComputeInternal compute,
Expand All @@ -247,7 +245,7 @@ public ClientInboundMessageHandler(
Executor partitionOperationsExecutor
) {
assert igniteTables != null;
assert igniteTransactions != null;
assert txManager != null;
assert processor != null;
assert configuration != null;
assert compute != null;
Expand All @@ -262,7 +260,7 @@ public ClientInboundMessageHandler(
assert partitionOperationsExecutor != null;

this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
this.txManager = txManager;
this.configuration = configuration;
this.compute = compute;
this.clusterService = clusterService;
Expand All @@ -279,7 +277,7 @@ public ClientInboundMessageHandler(
processor,
new JdbcMetadataCatalog(clockService, schemaSyncService, catalogService),
resources,
igniteTransactions
txManager
);

schemaVersions = new SchemaVersionsImpl(schemaSyncService, catalogService, clockService);
Expand Down Expand Up @@ -653,7 +651,9 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return null;

case ClientOp.TABLES_GET:
return ClientTablesGetRequest.process(out, igniteTables);
return ClientTablesGetRequest.process(out, igniteTables).thenRun(() -> {
out.meta(clockService.current());
});

case ClientOp.SCHEMAS_GET:
return ClientSchemasGetRequest.process(in, out, igniteTables, schemaVersions);
Expand All @@ -662,67 +662,81 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientTableGetRequest.process(in, out, igniteTables);

case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleUpsertRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleGetRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_UPSERT_ALL:
return ClientTupleUpsertAllRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleUpsertAllRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET_ALL:
return ClientTupleGetAllRequest.process(in, out, igniteTables, resources);
return ClientTupleGetAllRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET_AND_UPSERT:
return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_INSERT:
return ClientTupleInsertRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleInsertRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_INSERT_ALL:
return ClientTupleInsertAllRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleInsertAllRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_REPLACE:
return ClientTupleReplaceRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleReplaceRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_REPLACE_EXACT:
return ClientTupleReplaceExactRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleReplaceExactRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET_AND_REPLACE:
return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_DELETE:
return ClientTupleDeleteRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleDeleteRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_DELETE_ALL:
return ClientTupleDeleteAllRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleDeleteAllRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_DELETE_EXACT:
return ClientTupleDeleteExactRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleDeleteExactRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_DELETE_ALL_EXACT:
return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET_AND_DELETE:
return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_CONTAINS_KEY:
return ClientTupleContainsKeyRequest.process(in, out, igniteTables, resources);
return ClientTupleContainsKeyRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_CONTAINS_ALL_KEYS:
return ClientTupleContainsAllKeysRequest.process(in, out, igniteTables, resources);
return ClientTupleContainsAllKeysRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.JDBC_CONNECT:
// TODO: IGNITE-24053 JDBC request ought to contain the client observation timestamp.
jdbcQueryEventHandler.getTimestampTracker().update(clockService.current());
out.meta(jdbcQueryEventHandler.getTimestampTracker().get());

return ClientJdbcConnectRequest.execute(in, out, jdbcQueryEventHandler);

case ClientOp.JDBC_EXEC:
return ClientJdbcExecuteRequest.execute(in, out, jdbcQueryEventHandler);
return ClientJdbcExecuteRequest.execute(in, out, jdbcQueryEventHandler).thenRun(() -> {
// TODO: IGNITE-24055 Observation timestamp must be updated only for DDL "CREATE TABLE..."
if (!(out.meta() instanceof HybridTimestamp)) {
out.meta(clockService.current());
}
});

case ClientOp.JDBC_CANCEL:
return ClientJdbcCancelRequest.execute(in, out, jdbcQueryEventHandler);

case ClientOp.JDBC_EXEC_BATCH:
return ClientJdbcExecuteBatchRequest.process(in, out, jdbcQueryEventHandler);
return ClientJdbcExecuteBatchRequest.process(in, out, jdbcQueryEventHandler).thenRun(() -> {
// TODO: IGNITE-24055 Observation timestamp must be updated only for DDL "CREATE TABLE..."
if (!(out.meta() instanceof HybridTimestamp)) {
out.meta(clockService.current());
}
});

case ClientOp.JDBC_SQL_EXEC_PS_BATCH:
return ClientJdbcPreparedStmntBatchRequest.process(in, out, jdbcQueryEventHandler);
Expand All @@ -743,16 +757,18 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientJdbcColumnMetadataRequest.process(in, out, jdbcQueryEventHandler);

case ClientOp.JDBC_SCHEMAS_META:
return ClientJdbcSchemasMetadataRequest.process(in, out, jdbcQueryEventHandler);
return ClientJdbcSchemasMetadataRequest.process(in, out, jdbcQueryEventHandler).thenRun(() -> {
out.meta(clockService.current());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need to update timestamp when requesting metadata?
why most time we updating it conditionally?

if (!(out.meta() instanceof HybridTimestamp))

});

case ClientOp.JDBC_PK_META:
return ClientJdbcPrimaryKeyMetadataRequest.process(in, out, jdbcQueryEventHandler);

case ClientOp.TX_BEGIN:
return ClientTransactionBeginRequest.process(in, out, igniteTransactions, resources, metrics);
return ClientTransactionBeginRequest.process(in, out, txManager, resources, metrics);

case ClientOp.TX_COMMIT:
return ClientTransactionCommitRequest.process(in, resources, metrics);
return ClientTransactionCommitRequest.process(in, out, resources, metrics, clockService);

case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources, metrics);
Expand Down Expand Up @@ -786,13 +802,18 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientClusterGetNodesRequest.process(out, clusterService);

case ClientOp.SQL_EXEC:
return ClientSqlExecuteRequest.process(in, out, queryProcessor, resources, metrics, igniteTransactions);
return ClientSqlExecuteRequest.process(in, out, queryProcessor, resources, metrics).thenRun(() -> {
// TODO: IGNITE-24055 Observation timestamp must be updated only for DDL "CREATE TABLE..."
if (!(out.meta() instanceof HybridTimestamp)) {
out.meta(clockService.current());
}
});

case ClientOp.SQL_CURSOR_NEXT_PAGE:
return ClientSqlCursorNextPageRequest.process(in, out, resources, igniteTransactions);
return ClientSqlCursorNextPageRequest.process(in, out, resources);

case ClientOp.SQL_CURSOR_CLOSE:
return ClientSqlCursorCloseRequest.process(in, out, resources, igniteTransactions);
return ClientSqlCursorCloseRequest.process(in, out, resources);

case ClientOp.PARTITION_ASSIGNMENT_GET:
return ClientTablePartitionPrimaryReplicasGetRequest.process(in, out, primaryReplicaTracker);
Expand All @@ -801,13 +822,23 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientJdbcFinishTxRequest.process(in, out, jdbcQueryEventHandler);

case ClientOp.SQL_EXEC_SCRIPT:
return ClientSqlExecuteScriptRequest.process(in, queryProcessor, igniteTransactions);
return ClientSqlExecuteScriptRequest.process(in, queryProcessor).thenRun(() -> {
// TODO: IGNITE-24055 Observation timestamp must be updated only for DDL "CREATE TABLE..."
if (!(out.meta() instanceof HybridTimestamp)) {
out.meta(clockService.current());
}
});

case ClientOp.SQL_QUERY_META:
return ClientSqlQueryMetadataRequest.process(in, out, queryProcessor, resources);

case ClientOp.SQL_EXEC_BATCH:
return ClientSqlExecuteBatchRequest.process(in, out, queryProcessor, resources, igniteTransactions);
return ClientSqlExecuteBatchRequest.process(in, out, queryProcessor, resources).thenRun(() -> {
// TODO: IGNITE-24055 Observation timestamp must be updated only for DDL "CREATE TABLE..."
if (!(out.meta() instanceof HybridTimestamp)) {
out.meta(clockService.current());
}
});

case ClientOp.STREAMER_BATCH_SEND:
return ClientStreamerBatchSendRequest.process(in, out, igniteTables);
Expand Down Expand Up @@ -993,9 +1024,11 @@ private long observableTimestamp(@Nullable ClientMessagePacker out) {
if (meta instanceof HybridTimestamp) {
return ((HybridTimestamp) meta).longValue();
}
} else {
return clockService.currentLong();
}

return clockService.currentLong();
return HybridTimestamp.MIN_VALUE.longValue();
}

private void sendNotification(long requestId, @Nullable Consumer<ClientMessagePacker> writer, @Nullable Throwable err) {
Expand Down
Loading