getWriterFeatures();
+}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java
new file mode 100644
index 00000000000..189f655a91b
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.kernel.engine;
+
+import java.io.IOException;
+import java.util.Map;
+
+import io.delta.kernel.annotation.Evolving;
+import io.delta.kernel.commit.Commit;
+import io.delta.kernel.commit.CommitFailedException;
+import io.delta.kernel.commit.CommitResponse;
+import io.delta.kernel.commit.GetCommitsResponse;
+import io.delta.kernel.commit.UpdatedActions;
+import io.delta.kernel.commit.actions.AbstractMetadata;
+import io.delta.kernel.commit.actions.AbstractProtocol;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.utils.CloseableIterator;
+
+/**
+ * Provides coordinated commits related functionalities to Delta Kernel.
+ *
+ * @since 3.0.0
+ */
+@Evolving
+public interface CommitCoordinatorClientHandler {
+
+ /**
+ * API to register the table represented by the given `logPath` at the provided
+ * currentTableVersion with the commit coordinator this commit coordinator client represents.
+ *
+ * This API is called when the table is being converted from a file system table to a
+ * coordinated-commit table.
+ *
+ * When a new coordinated-commit table is being created, the currentTableVersion will be -1 and
+ * the upgrade commit needs to be a file system commit which will write the backfilled file
+ * directly.
+ *
+ * @param logPath The path to the delta log of the table that should be converted
+ * @param currentVersion The currentTableVersion is the version of the table just before
+ * conversion. currentTableVersion + 1 represents the commit that
+ * will do the conversion. This must be backfilled atomically.
+ * currentTableVersion + 2 represents the first commit after conversion.
+ * This will go through the CommitCoordinatorClient and the client is
+ * free to choose when it wants to backfill this commit.
+ * @param currentMetadata The metadata of the table at currentTableVersion
+ * @param currentProtocol The protocol of the table at currentTableVersion
+ * @return A map of key-value pairs which is issued by the commit coordinator to identify the
+ * table. This should be stored in the table's metadata. This information needs to be
+ * passed to the {@link #commit}, {@link #getCommits}, and {@link #backfillToVersion}
+ * APIs to identify the table.
+ */
+ Map registerTable(
+ String logPath,
+ long currentVersion,
+ AbstractMetadata currentMetadata,
+ AbstractProtocol currentProtocol);
+
+ /**
+ * API to commit the given set of actions to the table represented by logPath at the
+ * given commitVersion.
+ *
+ * @param logPath The path to the delta log of the table that should be committed to.
+ * @param tableConf The table configuration that was returned by the commit coordinator
+ * client during registration.
+ * @param commitVersion The version of the commit that is being committed.
+ * @param actions The actions that need to be committed.
+ * @param updatedActions The commit info and any metadata or protocol changes that are made
+ * as part of this commit.
+ * @return CommitResponse which contains the file status of the committed commit file. If the
+ * commit is already backfilled, then the file status could be omitted from the response
+ * and the client could retrieve the information by itself.
+ */
+ CommitResponse commit(
+ String logPath,
+ Map tableConf,
+ long commitVersion,
+ CloseableIterator actions,
+ UpdatedActions updatedActions) throws IOException, CommitFailedException;
+
+ /**
+ * API to get the unbackfilled commits for the table represented by the given logPath.
+ * Commits older than startVersion or newer than endVersion (if given) are ignored. The
+ * returned commits are contiguous and in ascending version order.
+ *
+ * Note that the first version returned by this API may not be equal to startVersion. This
+ * happens when some versions starting from startVersion have already been backfilled and so
+ * the commit coordinator may have stopped tracking them.
+ *
+ * The returned latestTableVersion is the maximum commit version ratified by the commit
+ * coordinator. Note that returning latestTableVersion as -1 is acceptable only if the commit
+ * coordinator never ratified any version, i.e. it never accepted any unbackfilled commit.
+ *
+ * @param tablePath The path to the delta log of the table for which the unbackfilled
+ * commits should be retrieved.
+ * @param tableConf The table configuration that was returned by the commit coordinator
+ * during registration.
+ * @param startVersion The minimum version of the commit that should be returned. Can be null.
+ * @param endVersion The maximum version of the commit that should be returned. Can be null.
+ * @return GetCommitsResponse which has a list of {@link Commit}s and the latestTableVersion
+ * which is tracked by {@link CommitCoordinatorClientHandler}.
+ */
+ GetCommitsResponse getCommits(
+ String tablePath,
+ Map tableConf,
+ Long startVersion,
+ Long endVersion);
+
+ /**
+ * API to ask the commit coordinator client to backfill all commits up to {@code version}
+ * and notify the commit coordinator.
+ *
+ * If this API returns successfully, that means the backfill must have been completed, although
+ * the commit coordinator may not be aware of it yet.
+ *
+ * @param logPath The path to the delta log of the table that should be backfilled.
+ * @param tableConf The table configuration that was returned by the commit coordinator
+ * during registration.
+ * @param version The version till which the commit coordinator client should backfill.
+ * @param lastKnownBackfilledVersion The last known version that was backfilled before this API
+ * was called. If it is None or invalid, then the commit
+ * coordinator client should backfill from the beginning of
+ * the table. Can be null.
+ */
+ void backfillToVersion(
+ String logPath,
+ Map tableConf,
+ long version,
+ Long lastKnownBackfilledVersion) throws IOException;
+
+ /**
+ * Determines whether this CommitCoordinatorClient is semantically equal to another
+ * CommitCoordinatorClient.
+ *
+ * Semantic equality is determined by each CommitCoordinatorClient implementation based on
+ * whether the two instances can be used interchangeably when invoking any of the
+ * CommitCoordinatorClient APIs, such as {@link #commit}, {@link #getCommits}, etc. For example,
+ * both instances might be pointing to the same underlying endpoint.
+ */
+ Boolean semanticEquals(CommitCoordinatorClientHandler other);
+}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
index 3a7801dd797..c027c5c490a 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
@@ -16,6 +16,8 @@
package io.delta.kernel.engine;
+import java.util.Map;
+
import io.delta.kernel.annotation.Evolving;
/**
@@ -55,4 +57,13 @@ public interface Engine {
* @return An implementation of {@link ParquetHandler}.
*/
ParquetHandler getParquetHandler();
+
+ /**
+ * Get the connector provided {@link CommitCoordinatorClientHandler}.
+ *
+ * @param name Name of the underlying commit coordinator client.
+ * @return An implementation of {@link CommitCoordinatorClientHandler}.
+ */
+ CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
+ String name, Map conf);
}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
index 118f1dc20da..037b1dffc74 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
@@ -20,6 +20,7 @@
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
+import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;
@@ -30,7 +31,7 @@
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
-import static io.delta.kernel.internal.TableConfig.TOMBSTONE_RETENTION;
+import static io.delta.kernel.internal.TableConfig.*;
/**
* Implementation of {@link Snapshot}.
@@ -159,4 +160,12 @@ public long getTimestamp(Engine engine) {
return logSegment.lastCommitTimestamp;
}
}
+
+ public Optional getCommitCoordinatorClientHandlerOpt(
+ Engine engine) {
+ return COORDINATED_COMMITS_COORDINATOR_NAME.fromMetadata(metadata).map(
+ commitCoordinatorStr -> engine.getCommitCoordinatorClientHandler(
+ commitCoordinatorStr,
+ COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(metadata)));
+ }
}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
index b7c1586fc7a..36a32446121 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
@@ -19,6 +19,10 @@
import java.util.function.Function;
import java.util.function.Predicate;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import io.delta.kernel.exceptions.InvalidConfigurationValueException;
import io.delta.kernel.exceptions.UnknownConfigurationException;
import io.delta.kernel.internal.actions.Metadata;
@@ -29,6 +33,8 @@
* from the table metadata.
*/
public class TableConfig {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
/**
* The shortest duration we have to keep logically deleted data files around before deleting
* them physically.
@@ -105,6 +111,45 @@ public class TableConfig {
"needs to be a long."
);
+
+ /*
+ * This table property is used to track the commit-coordinator name for this table. If this
+ * property is not set, the table will be considered as file system table and commits will be
+ * done via atomically publishing the commit file.
+ */
+ public static final TableConfig> COORDINATED_COMMITS_COORDINATOR_NAME =
+ new TableConfig<>(
+ "delta.coordinatedCommits.commitCoordinator-preview",
+ null, /* default values */
+ Optional::ofNullable,
+ value -> true,
+ "The commit-coordinator name for this table. This is used to determine\n" +
+ "which implementation of commit-coordinator to use when committing\n" +
+ "to this table. If this property is not set, the table will be\n" +
+ "considered as file system table and commits will be done via\n" +
+ "atomically publishing the commit file.\n"
+ );
+
+ public static final TableConfig