Skip to content

Commit

Permalink
IGNITE-23604 Fixed rollback of table driven tx.
Browse files Browse the repository at this point in the history
  • Loading branch information
xtern committed Dec 20, 2024
1 parent 3cc258a commit 194c1de
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.AsyncWrapper;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.sql.ResultSetMetadata;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -148,61 +149,63 @@ public <RowT> AsyncCursor<InternalSqlRow> execute(
List<RexNode> projectionExpr = lookupNode.projects();
List<RexNode> keyExpressions = lookupNode.keyExpressions();

RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(), requiredColumns);

Supplier<RowT> keySupplier = ctx.expressionFactory()
.rowSource(keyExpressions);
Predicate<RowT> filter = filterExpr == null ? null : ctx.expressionFactory()
.predicate(filterExpr, rowType);
Function<RowT, RowT> projection = projectionExpr == null ? null : ctx.expressionFactory()
.project(projectionExpr, rowType);

RowHandler<RowT> rowHandler = ctx.rowHandler();
RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
RowFactory<RowT> rowFactory = rowHandler.factory(rowSchema);

RelDataType resultType = lookupNode.getRowType();
BiFunction<Integer, Object, Object> internalTypeConverter = TypeUtils.resultTypeConverter(ctx, resultType);

ScannableTable scannableTable = execTable.scannableTable();
Function<RowT, Iterator<InternalSqlRow>> postProcess = row -> {
if (row == null) {
return Collections.emptyIterator();
}

if (filter != null && !filter.test(row)) {
return Collections.emptyIterator();
}

if (projection != null) {
row = projection.apply(row);
}

return List.<InternalSqlRow>of(
new InternalSqlRowImpl<>(row, rowHandler, internalTypeConverter)
).iterator();
};

CompletableFuture<RowT> lookupResult = scannableTable.primaryKeyLookup(
ctx, tx, rowFactory, keySupplier.get(), requiredColumns.toBitSet()
);

CompletableFuture<Iterator<InternalSqlRow>> result;
if (projection == null && filter == null) {
// no arbitrary computations, should be safe to proceed execution on
// thread that completes the future
result = lookupResult.thenApply(postProcess);
} else {
Executor executor = task -> ctx.execute(task::run, error -> {
// this executor is used to process future chain, so any unhandled exception
// should be wrapped with CompletionException and returned as a result, implying
// no error handler should be called.
// But just in case there is error in future processing pipeline let's log error
LOG.error("Unexpected error", error);
});

result = lookupResult.thenApplyAsync(postProcess, executor);
}
CompletableFuture<Iterator<InternalSqlRow>> result = CompletableFutures.nullCompletedFuture()
.thenCompose(ignore -> {
RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(), requiredColumns);

Supplier<RowT> keySupplier = ctx.expressionFactory()
.rowSource(keyExpressions);
Predicate<RowT> filter = filterExpr == null ? null : ctx.expressionFactory()
.predicate(filterExpr, rowType);
Function<RowT, RowT> projection = projectionExpr == null ? null : ctx.expressionFactory()
.project(projectionExpr, rowType);

RowHandler<RowT> rowHandler = ctx.rowHandler();
RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
RowFactory<RowT> rowFactory = rowHandler.factory(rowSchema);

RelDataType resultType = lookupNode.getRowType();
BiFunction<Integer, Object, Object> internalTypeConverter = TypeUtils.resultTypeConverter(ctx, resultType);

ScannableTable scannableTable = execTable.scannableTable();
Function<RowT, Iterator<InternalSqlRow>> postProcess = row -> {
if (row == null) {
return Collections.emptyIterator();
}

if (filter != null && !filter.test(row)) {
return Collections.emptyIterator();
}

if (projection != null) {
row = projection.apply(row);
}

return List.<InternalSqlRow>of(
new InternalSqlRowImpl<>(row, rowHandler, internalTypeConverter)
).iterator();
};

CompletableFuture<RowT> lookupResult = scannableTable.primaryKeyLookup(
ctx, tx, rowFactory, keySupplier.get(), requiredColumns.toBitSet()
);

if (projection == null && filter == null) {
// no arbitrary computations, should be safe to proceed execution on
// thread that completes the future
return lookupResult.thenApply(postProcess);
} else {
Executor executor = task -> ctx.execute(task::run, error -> {
// this executor is used to process future chain, so any unhandled exception
// should be wrapped with CompletionException and returned as a result, implying
// no error handler should be called.
// But just in case there is error in future processing pipeline let's log error
LOG.error("Unexpected error", error);
});

return lookupResult.thenApplyAsync(postProcess, executor);
}
});

if (firstPageReadyCallback != null) {
result.whenComplete((res, err) -> firstPageReadyCallback.onPrefetchComplete(err));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.AsyncWrapper;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.sql.ResultSetMetadata;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -135,9 +136,12 @@ public <RowT> AsyncCursor<InternalSqlRow> execute(

UpdatableTable updatableTable = execTable.updatableTable();

CompletableFuture<Iterator<InternalSqlRow>> result = updatableTable.insert(
tx, ctx, rowSupplier.get()
).thenApply(none -> List.<InternalSqlRow>of(new InternalSqlRowSingleLong(1L)).iterator());
CompletableFuture<Iterator<InternalSqlRow>> result = CompletableFutures.nullCompletedFuture()
.thenCompose(ignore -> {
return updatableTable.insert(
tx, ctx, rowSupplier.get()
).thenApply(none -> List.<InternalSqlRow>of(new InternalSqlRowSingleLong(1L)).iterator());
});

if (firstPageReadyCallback != null) {
result.whenComplete((res, err) -> firstPageReadyCallback.onPrefetchComplete(err));
Expand Down

0 comments on commit 194c1de

Please sign in to comment.