Skip to content

Commit

Permalink
IGNITE-23305 Get rid of client HybridTimestampTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
vldpyatkov committed Dec 18, 2024
1 parent e51581e commit d60b660
Show file tree
Hide file tree
Showing 57 changed files with 481 additions and 319 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.TxManager;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInfo;
import org.mockito.Mockito;
Expand Down Expand Up @@ -125,7 +125,7 @@ ClientHandlerModule start(TestInfo testInfo) {
var module = new ClientHandlerModule(
mock(QueryProcessor.class),
mock(IgniteTablesInternal.class),
mock(IgniteTransactionsImpl.class),
mock(TxManager.class),
mock(IgniteComputeInternal.class),
clusterService,
bootstrapFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.apache.ignite.internal.security.authentication.AuthenticationManager;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
Expand All @@ -82,8 +82,8 @@ public class ClientHandlerModule implements IgniteComponent {
/** Ignite tables API. */
private final IgniteTablesInternal igniteTables;

/** Ignite transactions API. */
private final IgniteTransactionsImpl igniteTransactions;
/** Transaction manager. */
private final TxManager txManager;

/** Cluster ID supplier. */
private final Supplier<ClusterInfo> clusterInfoSupplier;
Expand Down Expand Up @@ -137,7 +137,7 @@ public class ClientHandlerModule implements IgniteComponent {
*
* @param queryProcessor Sql query processor.
* @param igniteTables Ignite.
* @param igniteTransactions Transactions.
* @param txManager Transaction manager.
* @param igniteCompute Compute.
* @param clusterService Cluster.
* @param bootstrapFactory Bootstrap factory.
Expand All @@ -152,7 +152,7 @@ public class ClientHandlerModule implements IgniteComponent {
public ClientHandlerModule(
QueryProcessor queryProcessor,
IgniteTablesInternal igniteTables,
IgniteTransactionsImpl igniteTransactions,
TxManager txManager,
IgniteComputeInternal igniteCompute,
ClusterService clusterService,
NettyBootstrapFactory bootstrapFactory,
Expand All @@ -170,6 +170,7 @@ public ClientHandlerModule(
) {
assert igniteTables != null;
assert queryProcessor != null;
assert txManager != null;
assert igniteCompute != null;
assert clusterService != null;
assert bootstrapFactory != null;
Expand All @@ -187,7 +188,7 @@ public ClientHandlerModule(

this.queryProcessor = queryProcessor;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
this.txManager = txManager;
this.igniteCompute = igniteCompute;
this.clusterService = clusterService;
this.bootstrapFactory = bootstrapFactory;
Expand Down Expand Up @@ -379,7 +380,7 @@ protected void initChannel(Channel ch) {
private ClientInboundMessageHandler createInboundMessageHandler(ClientConnectorView configuration, long connectionId) {
return new ClientInboundMessageHandler(
igniteTables,
igniteTransactions,
txManager,
queryProcessor,
configuration,
igniteCompute,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TraceableException;
Expand Down Expand Up @@ -160,8 +160,8 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
/** Ignite tables API. */
private final IgniteTablesInternal igniteTables;

/** Ignite transactions API. */
private final IgniteTransactionsImpl igniteTransactions;
/** Transaction manager. */
private final TxManager txManager;

/** JDBC Handler. */
private final JdbcQueryEventHandler jdbcQueryEventHandler;
Expand Down Expand Up @@ -219,7 +219,6 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
* Constructor.
*
* @param igniteTables Ignite tables API entry point.
* @param igniteTransactions Ignite transactions API.
* @param processor Sql query processor.
* @param configuration Configuration.
* @param compute Compute.
Expand All @@ -231,7 +230,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
*/
public ClientInboundMessageHandler(
IgniteTablesInternal igniteTables,
IgniteTransactionsImpl igniteTransactions,
TxManager txManager,
QueryProcessor processor,
ClientConnectorView configuration,
IgniteComputeInternal compute,
Expand All @@ -247,7 +246,7 @@ public ClientInboundMessageHandler(
Executor partitionOperationsExecutor
) {
assert igniteTables != null;
assert igniteTransactions != null;
assert txManager != null;
assert processor != null;
assert configuration != null;
assert compute != null;
Expand All @@ -262,7 +261,7 @@ public ClientInboundMessageHandler(
assert partitionOperationsExecutor != null;

this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
this.txManager = txManager;
this.configuration = configuration;
this.compute = compute;
this.clusterService = clusterService;
Expand All @@ -279,7 +278,7 @@ public ClientInboundMessageHandler(
processor,
new JdbcMetadataCatalog(clockService, schemaSyncService, catalogService),
resources,
igniteTransactions
txManager
);

schemaVersions = new SchemaVersionsImpl(schemaSyncService, catalogService, clockService);
Expand Down Expand Up @@ -662,70 +661,70 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientTableGetRequest.process(in, out, igniteTables);

case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleUpsertRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleGetRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_UPSERT_ALL:
return ClientTupleUpsertAllRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleUpsertAllRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET_ALL:
return ClientTupleGetAllRequest.process(in, out, igniteTables, resources);
return ClientTupleGetAllRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET_AND_UPSERT:
return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_INSERT:
return ClientTupleInsertRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleInsertRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_INSERT_ALL:
return ClientTupleInsertAllRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleInsertAllRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_REPLACE:
return ClientTupleReplaceRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleReplaceRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_REPLACE_EXACT:
return ClientTupleReplaceExactRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleReplaceExactRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET_AND_REPLACE:
return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_DELETE:
return ClientTupleDeleteRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleDeleteRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_DELETE_ALL:
return ClientTupleDeleteAllRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleDeleteAllRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_DELETE_EXACT:
return ClientTupleDeleteExactRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleDeleteExactRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_DELETE_ALL_EXACT:
return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_GET_AND_DELETE:
return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables, resources, igniteTransactions);
return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_CONTAINS_KEY:
return ClientTupleContainsKeyRequest.process(in, out, igniteTables, resources);
return ClientTupleContainsKeyRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.TUPLE_CONTAINS_ALL_KEYS:
return ClientTupleContainsAllKeysRequest.process(in, out, igniteTables, resources);
return ClientTupleContainsAllKeysRequest.process(in, out, igniteTables, resources, txManager);

case ClientOp.JDBC_CONNECT:
return ClientJdbcConnectRequest.execute(in, out, jdbcQueryEventHandler);

case ClientOp.JDBC_EXEC:
return ClientJdbcExecuteRequest.execute(in, out, jdbcQueryEventHandler);
return ClientJdbcExecuteRequest.execute(in, out, jdbcQueryEventHandler, clockService);

case ClientOp.JDBC_CANCEL:
return ClientJdbcCancelRequest.execute(in, out, jdbcQueryEventHandler);

case ClientOp.JDBC_EXEC_BATCH:
return ClientJdbcExecuteBatchRequest.process(in, out, jdbcQueryEventHandler);
return ClientJdbcExecuteBatchRequest.process(in, out, jdbcQueryEventHandler, clockService);

case ClientOp.JDBC_SQL_EXEC_PS_BATCH:
return ClientJdbcPreparedStmntBatchRequest.process(in, out, jdbcQueryEventHandler);
return ClientJdbcPreparedStmntBatchRequest.process(in, out, jdbcQueryEventHandler, clockService);

case ClientOp.JDBC_NEXT:
return ClientJdbcFetchRequest.process(in, out, jdbcQueryCursorHandler);
Expand All @@ -749,10 +748,10 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientJdbcPrimaryKeyMetadataRequest.process(in, out, jdbcQueryEventHandler);

case ClientOp.TX_BEGIN:
return ClientTransactionBeginRequest.process(in, out, igniteTransactions, resources, metrics);
return ClientTransactionBeginRequest.process(in, out, txManager, resources, metrics);

case ClientOp.TX_COMMIT:
return ClientTransactionCommitRequest.process(in, resources, metrics);
return ClientTransactionCommitRequest.process(in, out, resources, metrics, clockService);

case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources, metrics);
Expand Down Expand Up @@ -786,28 +785,28 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientClusterGetNodesRequest.process(out, clusterService);

case ClientOp.SQL_EXEC:
return ClientSqlExecuteRequest.process(in, out, queryProcessor, resources, metrics, igniteTransactions);
return ClientSqlExecuteRequest.process(in, out, queryProcessor, resources, metrics);

case ClientOp.SQL_CURSOR_NEXT_PAGE:
return ClientSqlCursorNextPageRequest.process(in, out, resources, igniteTransactions);
return ClientSqlCursorNextPageRequest.process(in, out, resources);

case ClientOp.SQL_CURSOR_CLOSE:
return ClientSqlCursorCloseRequest.process(in, out, resources, igniteTransactions);
return ClientSqlCursorCloseRequest.process(in, out, resources);

case ClientOp.PARTITION_ASSIGNMENT_GET:
return ClientTablePartitionPrimaryReplicasGetRequest.process(in, out, primaryReplicaTracker);

case ClientOp.JDBC_TX_FINISH:
return ClientJdbcFinishTxRequest.process(in, out, jdbcQueryEventHandler);
return ClientJdbcFinishTxRequest.process(in, out, jdbcQueryEventHandler, clockService);

case ClientOp.SQL_EXEC_SCRIPT:
return ClientSqlExecuteScriptRequest.process(in, queryProcessor, igniteTransactions);
return ClientSqlExecuteScriptRequest.process(in, queryProcessor);

case ClientOp.SQL_QUERY_META:
return ClientSqlQueryMetadataRequest.process(in, out, queryProcessor, resources);

case ClientOp.SQL_EXEC_BATCH:
return ClientSqlExecuteBatchRequest.process(in, out, queryProcessor, resources, igniteTransactions);
return ClientSqlExecuteBatchRequest.process(in, out, queryProcessor, resources);

case ClientOp.STREAMER_BATCH_SEND:
return ClientStreamerBatchSendRequest.process(in, out, igniteTables);
Expand Down Expand Up @@ -995,7 +994,7 @@ private long observableTimestamp(@Nullable ClientMessagePacker out) {
}
}

return clockService.currentLong();
return HybridTimestamp.MIN_VALUE.longValue();
}

private void sendNotification(long requestId, @Nullable Consumer<ClientMessagePacker> writer, @Nullable Throwable err) {
Expand Down
Loading

0 comments on commit d60b660

Please sign in to comment.