diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java index 732b487cd15..30ee041077a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.internal.util.AsyncWrapper; +import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.sql.ResultSetMetadata; import org.jetbrains.annotations.Nullable; @@ -148,61 +149,63 @@ public AsyncCursor execute( List projectionExpr = lookupNode.projects(); List keyExpressions = lookupNode.keyExpressions(); - RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(), requiredColumns); - - Supplier keySupplier = ctx.expressionFactory() - .rowSource(keyExpressions); - Predicate filter = filterExpr == null ? null : ctx.expressionFactory() - .predicate(filterExpr, rowType); - Function projection = projectionExpr == null ? null : ctx.expressionFactory() - .project(projectionExpr, rowType); - - RowHandler rowHandler = ctx.rowHandler(); - RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType)); - RowFactory rowFactory = rowHandler.factory(rowSchema); - - RelDataType resultType = lookupNode.getRowType(); - BiFunction internalTypeConverter = TypeUtils.resultTypeConverter(ctx, resultType); - - ScannableTable scannableTable = execTable.scannableTable(); - Function> postProcess = row -> { - if (row == null) { - return Collections.emptyIterator(); - } - - if (filter != null && !filter.test(row)) { - return Collections.emptyIterator(); - } - - if (projection != null) { - row = projection.apply(row); - } - - return List.of( - new InternalSqlRowImpl<>(row, rowHandler, internalTypeConverter) - ).iterator(); - }; - - CompletableFuture lookupResult = scannableTable.primaryKeyLookup( - ctx, tx, rowFactory, keySupplier.get(), requiredColumns.toBitSet() - ); - - CompletableFuture> result; - if (projection == null && filter == null) { - // no arbitrary computations, should be safe to proceed execution on - // thread that completes the future - result = lookupResult.thenApply(postProcess); - } else { - Executor executor = task -> ctx.execute(task::run, error -> { - // this executor is used to process future chain, so any unhandled exception - // should be wrapped with CompletionException and returned as a result, implying - // no error handler should be called. - // But just in case there is error in future processing pipeline let's log error - LOG.error("Unexpected error", error); - }); - - result = lookupResult.thenApplyAsync(postProcess, executor); - } + CompletableFuture> result = CompletableFutures.nullCompletedFuture() + .thenCompose(ignore -> { + RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(), requiredColumns); + + Supplier keySupplier = ctx.expressionFactory() + .rowSource(keyExpressions); + Predicate filter = filterExpr == null ? null : ctx.expressionFactory() + .predicate(filterExpr, rowType); + Function projection = projectionExpr == null ? null : ctx.expressionFactory() + .project(projectionExpr, rowType); + + RowHandler rowHandler = ctx.rowHandler(); + RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType)); + RowFactory rowFactory = rowHandler.factory(rowSchema); + + RelDataType resultType = lookupNode.getRowType(); + BiFunction internalTypeConverter = TypeUtils.resultTypeConverter(ctx, resultType); + + ScannableTable scannableTable = execTable.scannableTable(); + Function> postProcess = row -> { + if (row == null) { + return Collections.emptyIterator(); + } + + if (filter != null && !filter.test(row)) { + return Collections.emptyIterator(); + } + + if (projection != null) { + row = projection.apply(row); + } + + return List.of( + new InternalSqlRowImpl<>(row, rowHandler, internalTypeConverter) + ).iterator(); + }; + + CompletableFuture lookupResult = scannableTable.primaryKeyLookup( + ctx, tx, rowFactory, keySupplier.get(), requiredColumns.toBitSet() + ); + + if (projection == null && filter == null) { + // no arbitrary computations, should be safe to proceed execution on + // thread that completes the future + return lookupResult.thenApply(postProcess); + } else { + Executor executor = task -> ctx.execute(task::run, error -> { + // this executor is used to process future chain, so any unhandled exception + // should be wrapped with CompletionException and returned as a result, implying + // no error handler should be called. + // But just in case there is error in future processing pipeline let's log error + LOG.error("Unexpected error", error); + }); + + return lookupResult.thenApplyAsync(postProcess, executor); + } + }); if (firstPageReadyCallback != null) { result.whenComplete((res, err) -> firstPageReadyCallback.onPrefetchComplete(err)); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java index 9793a17b845..af48e591938 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.internal.util.AsyncWrapper; +import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.sql.ResultSetMetadata; import org.jetbrains.annotations.Nullable; @@ -135,9 +136,12 @@ public AsyncCursor execute( UpdatableTable updatableTable = execTable.updatableTable(); - CompletableFuture> result = updatableTable.insert( - tx, ctx, rowSupplier.get() - ).thenApply(none -> List.of(new InternalSqlRowSingleLong(1L)).iterator()); + CompletableFuture> result = CompletableFutures.nullCompletedFuture() + .thenCompose(ignore -> { + return updatableTable.insert( + tx, ctx, rowSupplier.get() + ).thenApply(none -> List.of(new InternalSqlRowSingleLong(1L)).iterator()); + }); if (firstPageReadyCallback != null) { result.whenComplete((res, err) -> firstPageReadyCallback.onPrefetchComplete(err));