diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
index 17cd1317b09..04a585c73be 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
@@ -19,7 +19,6 @@
import io.delta.kernel.annotation.Evolving;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
/**
* Interface encapsulating all clients needed by the Delta Kernel in order to read the Delta table.
@@ -58,27 +57,6 @@ public interface Engine {
*/
ParquetHandler getParquetHandler();
- /**
- * Retrieves a {@link CommitCoordinatorClientHandler} for the specified commit coordinator client.
- *
- *
{@link CommitCoordinatorClientHandler} helps Kernel perform commits to a table which is
- * owned by a commit coordinator.
- *
- * @see Coordinated
- * commit protocol table feature.
- *
This method creates and returns an implementation of {@link
- * CommitCoordinatorClientHandler} based on the provided name and configuration of the
- * underlying commit coordinator client.
- * @param name The identifier or name of the underlying commit coordinator client
- * @param conf The configuration settings for the underlying commit coordinator client
- * @return An implementation of {@link CommitCoordinatorClientHandler} configured for the
- * specified client
- * @since 3.3.0
- */
- CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
- String name, Map conf);
-
/** Get the engine's {@link MetricsReporter} instances to push reports to. */
default List getMetricsReporters() {
return Collections.emptyList();
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
index 72ee67492fa..e30a04f5e21 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
@@ -20,7 +20,6 @@
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
-import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.DomainMetadata;
@@ -180,18 +179,6 @@ public Optional getLatestTransactionVersion(Engine engine, String applicat
*/
public Optional getTableCommitCoordinatorClientHandlerOpt(
Engine engine) {
- return COORDINATED_COMMITS_COORDINATOR_NAME
- .fromMetadata(metadata)
- .map(
- commitCoordinatorStr -> {
- CommitCoordinatorClientHandler handler =
- engine.getCommitCoordinatorClientHandler(
- commitCoordinatorStr,
- COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(metadata));
- return new TableCommitCoordinatorClientHandler(
- handler,
- logPath.toString(),
- COORDINATED_COMMITS_TABLE_CONF.fromMetadata(metadata));
- });
+ return Optional.empty();
}
}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/Commit.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/Commit.java
similarity index 89%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/Commit.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/Commit.java
index 250bcdcb355..841daeb0d74 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/Commit.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/Commit.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package io.delta.kernel.engine.coordinatedcommits;
+package io.delta.kernel.internal.coordinatedcommits;
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.utils.FileStatus;
@@ -22,8 +22,8 @@
/**
* Representation of a commit file. It contains the version of the commit, the file status of the
* commit, and the timestamp of the commit. This is used when we want to get the commit information
- * from the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit} and {@link
- * io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits} APIs.
+ * from the {@link CommitCoordinatorClientHandler#commit} and {@link
+ * CommitCoordinatorClientHandler#getCommits} APIs.
*
* @since 3.3.0
*/
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitCoordinatorClientHandler.java
similarity index 97%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitCoordinatorClientHandler.java
index 5fdae8a8c66..619c1b76bbe 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitCoordinatorClientHandler.java
@@ -14,13 +14,12 @@
* limitations under the License.
*/
-package io.delta.kernel.engine;
+package io.delta.kernel.internal.coordinatedcommits;
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.Row;
-import io.delta.kernel.engine.coordinatedcommits.*;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.Map;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitFailedException.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitFailedException.java
similarity index 92%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitFailedException.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitFailedException.java
index 5fc33d4c85e..f5f06697f7f 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitFailedException.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitFailedException.java
@@ -14,12 +14,12 @@
* limitations under the License.
*/
-package io.delta.kernel.engine.coordinatedcommits;
+package io.delta.kernel.internal.coordinatedcommits;
import io.delta.kernel.annotation.Evolving;
/**
- * Exception raised by {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}
+ * Exception raised by {@link CommitCoordinatorClientHandler#commit}
*
*
* | retryable | conflict | meaning |
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitResponse.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitResponse.java
similarity index 86%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitResponse.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitResponse.java
index 55ad6599c8f..ed23e99203a 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitResponse.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitResponse.java
@@ -14,12 +14,12 @@
* limitations under the License.
*/
-package io.delta.kernel.engine.coordinatedcommits;
+package io.delta.kernel.internal.coordinatedcommits;
import io.delta.kernel.annotation.Evolving;
/**
- * Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}.
+ * Response container for {@link CommitCoordinatorClientHandler#commit}.
*
* @since 3.3.0
*/
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/GetCommitsResponse.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/GetCommitsResponse.java
similarity index 86%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/GetCommitsResponse.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/GetCommitsResponse.java
index 31c22672314..85b4c298726 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/GetCommitsResponse.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/GetCommitsResponse.java
@@ -14,16 +14,15 @@
* limitations under the License.
*/
-package io.delta.kernel.engine.coordinatedcommits;
+package io.delta.kernel.internal.coordinatedcommits;
import io.delta.kernel.annotation.Evolving;
import java.util.List;
import java.util.Map;
/**
- * Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits(
- * String, Map, Long, Long)}. Holds all the commits that have not been backfilled as per the commit
- * coordinator.
+ * Response container for {@link CommitCoordinatorClientHandler#getCommits( String, Map, Long,
+ * Long)}. Holds all the commits that have not been backfilled as per the commit coordinator.
*
* @since 3.3.0
*/
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/UpdatedActions.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/UpdatedActions.java
similarity index 84%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/UpdatedActions.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/UpdatedActions.java
index a790dd9e556..0f9841403c9 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/UpdatedActions.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/UpdatedActions.java
@@ -14,16 +14,16 @@
* limitations under the License.
*/
-package io.delta.kernel.engine.coordinatedcommits;
+package io.delta.kernel.internal.coordinatedcommits;
import io.delta.kernel.annotation.Evolving;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractCommitInfo;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
/**
- * A container class to inform the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler}
- * about any changes in Protocol/Metadata
+ * A container class to inform the {@link CommitCoordinatorClientHandler} about any changes in
+ * Protocol/Metadata
*
* @since 3.3.0
*/
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractCommitInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractCommitInfo.java
similarity index 94%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractCommitInfo.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractCommitInfo.java
index b4afcee37ee..1d2b1f1b849 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractCommitInfo.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractCommitInfo.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package io.delta.kernel.engine.coordinatedcommits.actions;
+package io.delta.kernel.internal.coordinatedcommits.actions;
import io.delta.kernel.annotation.Evolving;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractMetadata.java
similarity index 96%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractMetadata.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractMetadata.java
index 0bc152115c3..46352e6bbe5 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractMetadata.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractMetadata.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package io.delta.kernel.engine.coordinatedcommits.actions;
+package io.delta.kernel.internal.coordinatedcommits.actions;
import io.delta.kernel.annotation.Evolving;
import java.util.List;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractProtocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractProtocol.java
similarity index 95%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractProtocol.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractProtocol.java
index ecd177a1d6a..9773b007aae 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractProtocol.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractProtocol.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package io.delta.kernel.engine.coordinatedcommits.actions;
+package io.delta.kernel.internal.coordinatedcommits.actions;
import io.delta.kernel.annotation.Evolving;
import java.util.Set;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
index afa43246a96..92784102f41 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
@@ -29,13 +29,13 @@
import io.delta.kernel.*;
import io.delta.kernel.engine.Engine;
-import io.delta.kernel.engine.coordinatedcommits.Commit;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.*;
+import io.delta.kernel.internal.coordinatedcommits.Commit;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java
index 8f97b2e0d2a..c379319522b 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java
@@ -17,11 +17,11 @@
package io.delta.kernel.internal.snapshot;
import io.delta.kernel.data.Row;
-import io.delta.kernel.engine.CommitCoordinatorClientHandler;
-import io.delta.kernel.engine.coordinatedcommits.CommitFailedException;
-import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
-import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
-import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
+import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler;
+import io.delta.kernel.internal.coordinatedcommits.CommitFailedException;
+import io.delta.kernel.internal.coordinatedcommits.CommitResponse;
+import io.delta.kernel.internal.coordinatedcommits.GetCommitsResponse;
+import io.delta.kernel.internal.coordinatedcommits.UpdatedActions;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.Map;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CoordinatedCommitsUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CoordinatedCommitsUtils.java
index ac4de23b8be..d6c73925738 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CoordinatedCommitsUtils.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CoordinatedCommitsUtils.java
@@ -15,12 +15,12 @@
*/
package io.delta.kernel.internal.util;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractCommitInfo;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
import java.util.*;
public class CoordinatedCommitsUtils {
diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala
index aa38cda8409..02b6e2c4e2f 100644
--- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala
+++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala
@@ -20,12 +20,11 @@ import java.{lang => javaLang}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
-import io.delta.kernel.engine.CommitCoordinatorClientHandler
-import io.delta.kernel.engine.coordinatedcommits.{Commit, CommitResponse, GetCommitsResponse}
import io.delta.kernel.exceptions.InvalidTableException
import io.delta.kernel.expressions.Predicate
import io.delta.kernel.internal.actions.CommitInfo
import io.delta.kernel.internal.checkpoints.{CheckpointInstance, SidecarFile}
+import io.delta.kernel.internal.coordinatedcommits.{Commit, CommitCoordinatorClientHandler, CommitResponse, GetCommitsResponse}
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.snapshot.{LogSegment, SnapshotManager, TableCommitCoordinatorClientHandler}
import io.delta.kernel.internal.util.{FileNames, Utils}
diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala
index c7eb9caf7f9..ea095a94adb 100644
--- a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala
+++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala
@@ -18,6 +18,7 @@ package io.delta.kernel.test
import io.delta.kernel.engine._
import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row}
import io.delta.kernel.expressions.{Column, Expression, ExpressionEvaluator, Predicate, PredicateEvaluator}
+import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler
import io.delta.kernel.types.{DataType, StructType}
import io.delta.kernel.utils.{CloseableIterator, DataFileStatus, FileStatus}
@@ -70,11 +71,6 @@ trait MockEngineUtils {
override def getParquetHandler: ParquetHandler =
Option(parquetHandler).getOrElse(
throw new UnsupportedOperationException("not supported in this test suite"))
-
- override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]):
- CommitCoordinatorClientHandler =
- Option(commitCoordinatorClientHandler).getOrElse(
- throw new UnsupportedOperationException("not supported in this test suite"))
}
}
}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java
deleted file mode 100644
index b0d2ac0c7f0..00000000000
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.engine;
-
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.internal.coordinatedcommits.CommitCoordinatorProvider;
-import io.delta.kernel.defaults.internal.coordinatedcommits.StorageKernelAPIAdapter;
-import io.delta.kernel.defaults.internal.json.JsonUtils;
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
-import io.delta.kernel.engine.CommitCoordinatorClientHandler;
-import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
-import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
-import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
-import io.delta.kernel.utils.CloseableIterator;
-import io.delta.storage.LogStore;
-import io.delta.storage.commit.CommitCoordinatorClient;
-import io.delta.storage.commit.CommitFailedException;
-import io.delta.storage.commit.TableDescriptor;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Default implementation of {@link CommitCoordinatorClientHandler} based on Hadoop APIs which uses
- * commit coordinator defined in delta-storage modules. It takes a Hadoop {@link Configuration}
- * object to interact with the commit coordinator client. The following optional configurations can
- * be set to customize the behavior of the client:
- *
- *
- * - {@code io.delta.kernel.logStore..impl} - The class name of the custom {@link
- * LogStore} implementation to use for operations on storage systems with the specified {@code
- * scheme}. For example, to use a custom {@link LogStore} for S3 storage objects:
- *
{@code
- *
- * io.delta.kernel.logStore.s3.impl
- * com.example.S3LogStore
- *
- *
- * }
- * If not set, the default LogStore implementation for the scheme will be used.
- * - {@code delta.enableFastS3AListFrom} - Set to {@code true} to enable fast listing
- * functionality when using a {@link LogStore} created for S3 storage objects.
- *
- {@code io.delta.kernel.commitCoordinatorBuilder..impl} - The class name of the custom
- * {@link io.delta.kernel.defaults.internal.coordinatedcommits.CommitCoordinatorBuilder}
- * implementation to use for building the commit coordinator client.
- *
- */
-public class DefaultCommitCoordinatorClientHandler implements CommitCoordinatorClientHandler {
- private final Configuration hadoopConf;
- private final CommitCoordinatorClient commitCoordinatorClient;
-
- /**
- * Create an instance of the default {@link DefaultCommitCoordinatorClientHandler} implementation.
- *
- * @param hadoopConf Configuration to use. List of options to customize the behavior of the client
- * can be found in the class documentation.
- * @param name The identifier or name of the underlying commit coordinator client
- * @param commitCoordinatorConf The configuration settings for the underlying commit coordinator
- * client which contains the necessary information to create the client such as the endpoint
- * etc.
- */
- public DefaultCommitCoordinatorClientHandler(
- Configuration hadoopConf, String name, Map commitCoordinatorConf) {
- this.hadoopConf = hadoopConf;
- this.commitCoordinatorClient =
- CommitCoordinatorProvider.getCommitCoordinatorClient(
- hadoopConf, name, commitCoordinatorConf);
- }
-
- @Override
- public Map registerTable(
- String logPath,
- long currentVersion,
- AbstractMetadata currentMetadata,
- AbstractProtocol currentProtocol) {
- // TODO: Introduce table identifier concept in Table API in Kernel and plumb the
- // table identifier into `CommitCoordinatorClient` in all APIs.
- return commitCoordinatorClient.registerTable(
- new Path(logPath),
- Optional.empty() /* table identifier */,
- currentVersion,
- StorageKernelAPIAdapter.toStorageAbstractMetadata(currentMetadata),
- StorageKernelAPIAdapter.toStorageAbstractProtocol(currentProtocol));
- }
-
- @Override
- public CommitResponse commit(
- String logPath,
- Map tableConf,
- long commitVersion,
- CloseableIterator actions,
- UpdatedActions updatedActions)
- throws io.delta.kernel.engine.coordinatedcommits.CommitFailedException {
- Path path = new Path(logPath);
- LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
- try {
- return StorageKernelAPIAdapter.toKernelAPICommitResponse(
- commitCoordinatorClient.commit(
- logStore,
- hadoopConf,
- new TableDescriptor(path, Optional.empty() /* table identifier */, tableConf),
- commitVersion,
- new Iterator() {
- @Override
- public boolean hasNext() {
- return actions.hasNext();
- }
-
- @Override
- public String next() {
- return JsonUtils.rowToJson(actions.next());
- }
- },
- StorageKernelAPIAdapter.toStorageUpdatedActions(updatedActions)));
- } catch (CommitFailedException e) {
- throw StorageKernelAPIAdapter.toKernelAPICommitFailedException(e);
- }
- }
-
- @Override
- public GetCommitsResponse getCommits(
- String logPath, Map tableConf, Long startVersion, Long endVersion) {
- TableDescriptor tableDesc = new TableDescriptor(new Path(logPath), Optional.empty(), tableConf);
- return StorageKernelAPIAdapter.toKernelAPIGetCommitsResponse(
- commitCoordinatorClient.getCommits(tableDesc, startVersion, endVersion));
- }
-
- @Override
- public void backfillToVersion(
- String logPath, Map tableConf, long version, Long lastKnownBackfilledVersion)
- throws IOException {
- Path path = new Path(logPath);
- LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
- TableDescriptor tableDesc =
- new TableDescriptor(path, Optional.empty() /* table identifier */, tableConf);
- commitCoordinatorClient.backfillToVersion(
- logStore, hadoopConf, tableDesc, version, lastKnownBackfilledVersion);
- }
-
- @Override
- public boolean semanticEquals(CommitCoordinatorClientHandler other) {
- return other instanceof DefaultCommitCoordinatorClientHandler
- && commitCoordinatorClient.semanticEquals(
- ((DefaultCommitCoordinatorClientHandler) other).commitCoordinatorClient);
- }
-}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
index 610b9d62a55..7b9172c19f1 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
@@ -16,7 +16,6 @@
package io.delta.kernel.defaults.engine;
import io.delta.kernel.engine.*;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
/** Default implementation of {@link Engine} based on Hadoop APIs. */
@@ -47,12 +46,6 @@ public ParquetHandler getParquetHandler() {
return new DefaultParquetHandler(hadoopConf);
}
- @Override
- public CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
- String name, Map conf) {
- return new DefaultCommitCoordinatorClientHandler(hadoopConf, name, conf);
- }
-
/**
* Create an instance of {@link DefaultEngine}.
*
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java
deleted file mode 100644
index 71b67edb597..00000000000
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.coordinatedcommits;
-
-import io.delta.storage.commit.CommitCoordinatorClient;
-import java.util.*;
-import org.apache.hadoop.conf.Configuration;
-
-/** A builder interface for {@link CommitCoordinatorClient}. */
-public abstract class CommitCoordinatorBuilder {
- private Configuration initHadoopConf;
-
- public CommitCoordinatorBuilder(Configuration initHadoopConf) {
- this.initHadoopConf = initHadoopConf;
- }
- /** Name of the commit-coordinator */
- public abstract String getName();
-
- /** Returns a commit-coordinator client based on the given conf */
- public abstract CommitCoordinatorClient build(Map conf);
-}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java
deleted file mode 100644
index 4239b2bb390..00000000000
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.coordinatedcommits;
-
-import static io.delta.kernel.defaults.internal.DefaultEngineErrors.canNotInstantiateCommitCoordinatorBuilder;
-
-import io.delta.kernel.defaults.engine.DefaultCommitCoordinatorClientHandler;
-import io.delta.storage.commit.CommitCoordinatorClient;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory to get the correct {@link CommitCoordinatorClient} for a table which is used by {@link
- * DefaultCommitCoordinatorClientHandler} to get the commit coordinator client using the {@code
- * getCommitCoordinatorClient} method.
- */
-public class CommitCoordinatorProvider {
- private static final Logger logger = LoggerFactory.getLogger(CommitCoordinatorProvider.class);
-
- /**
- * Returns a {@link CommitCoordinatorClient} for the given `hadoopConf`, `name` and
- * `commitCoordinatorConf`. Caller can set `io.delta.kernel.commitCoordinatorBuilder.{@code
- * name}.impl` to specify the {@link CommitCoordinatorBuilder} implementation to use for {@code
- * name}.
- *
- * @param hadoopConf {@link Configuration} to use for creating the {@link
- * CommitCoordinatorBuilder}.
- * @param name Name of the commit-coordinator.
- * @param commitCoordinatorConf Configuration for building the commit coordinator used by the
- * {@link CommitCoordinatorBuilder}.
- * @return {@link CommitCoordinatorClient} instance.
- */
- public static CommitCoordinatorClient getCommitCoordinatorClient(
- Configuration hadoopConf, String name, Map commitCoordinatorConf) {
- // Check if the CommitCoordinatorBuilder implementation is set in the configuration.
- String classNameFromConfig = hadoopConf.get(getCommitCoordinatorNameConfKey(name));
- if (classNameFromConfig != null) {
- CommitCoordinatorBuilder builder =
- createCommitCoordinatorBuilder(classNameFromConfig, hadoopConf, "from config");
- return builder.build(commitCoordinatorConf);
- } else {
- throw new IllegalArgumentException("Unknown commit-coordinator: " + name);
- }
- }
-
- /**
- * Configuration key for setting the CommitCoordinatorBuilder implementation for a name. ex:
- * `io.delta.kernel.commitCoordinatorBuilder.in-memory.impl` ->
- * `io.delta.storage.InMemoryCommitCoordinatorBuilder`
- */
- static String getCommitCoordinatorNameConfKey(String name) {
- return "io.delta.kernel.commitCoordinatorBuilder." + name + ".impl";
- }
-
- /** Utility method to get the CommitCoordinatorBuilder class from the class name. */
- private static Class extends CommitCoordinatorBuilder> getCommitCoordinatorBuilderClass(
- String commitCoordinatorBuilderClassName) throws ClassNotFoundException {
- return Class.forName(commitCoordinatorBuilderClassName)
- .asSubclass(CommitCoordinatorBuilder.class);
- }
-
- private static CommitCoordinatorBuilder createCommitCoordinatorBuilder(
- String className, Configuration hadoopConf, String context) {
- try {
- return getCommitCoordinatorBuilderClass(className)
- .getConstructor(Configuration.class)
- .newInstance(hadoopConf);
- } catch (Exception e) {
- String msgTemplate = "Failed to instantiate CommitCoordinatorBuilder class ({}): {}";
- logger.error(msgTemplate, context, className, e);
- throw canNotInstantiateCommitCoordinatorBuilder(className, context, e);
- }
- }
-}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/StorageKernelAPIAdapter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/StorageKernelAPIAdapter.java
deleted file mode 100644
index 53e379741ab..00000000000
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/StorageKernelAPIAdapter.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.coordinatedcommits;
-
-import io.delta.kernel.engine.coordinatedcommits.*;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
-import io.delta.kernel.utils.FileStatus;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * Adaptor to convert between Kernel's commit coordinator classes and the equivalent classes in
- * delta-storage.
- */
-public class StorageKernelAPIAdapter {
- public static io.delta.storage.commit.UpdatedActions toStorageUpdatedActions(
- UpdatedActions updatedActions) {
- if (updatedActions == null) {
- return null;
- }
- return new io.delta.storage.commit.UpdatedActions(
- toStorageAbstractCommitInfo(updatedActions.getCommitInfo()),
- toStorageAbstractMetadata(updatedActions.getNewMetadata()),
- toStorageAbstractProtocol(updatedActions.getNewProtocol()),
- toStorageAbstractMetadata(updatedActions.getOldMetadata()),
- toStorageAbstractProtocol(updatedActions.getOldProtocol()));
- }
-
- public static CommitResponse toKernelAPICommitResponse(
- io.delta.storage.commit.CommitResponse response) {
- return new CommitResponse(toKernelAPICommit(response.getCommit()));
- }
-
- public static Commit toKernelAPICommit(io.delta.storage.commit.Commit commit) {
- return new Commit(
- commit.getVersion(),
- toKernelAPIFileStatus(commit.getFileStatus()),
- commit.getCommitTimestamp());
- }
-
- public static FileStatus toKernelAPIFileStatus(org.apache.hadoop.fs.FileStatus hadoopFileStatus) {
- return FileStatus.of(
- hadoopFileStatus.getPath().toString(),
- hadoopFileStatus.getLen(),
- hadoopFileStatus.getModificationTime());
- }
-
- public static GetCommitsResponse toKernelAPIGetCommitsResponse(
- io.delta.storage.commit.GetCommitsResponse response) {
- List commits =
- response.getCommits().stream()
- .map(StorageKernelAPIAdapter::toKernelAPICommit)
- .collect(Collectors.toList());
- return new GetCommitsResponse(commits, response.getLatestTableVersion());
- }
-
- public static CommitFailedException toKernelAPICommitFailedException(
- io.delta.storage.commit.CommitFailedException e) {
- return new CommitFailedException(e.getRetryable(), e.getConflict(), e.getMessage());
- }
-
- public static io.delta.storage.commit.actions.AbstractMetadata toStorageAbstractMetadata(
- AbstractMetadata metadata) {
- return new io.delta.storage.commit.actions.AbstractMetadata() {
- @Override
- public String getId() {
- return metadata.getId();
- }
-
- @Override
- public String getName() {
- return metadata.getName();
- }
-
- @Override
- public String getDescription() {
- return metadata.getDescription();
- }
-
- @Override
- public String getProvider() {
- return metadata.getProvider();
- }
-
- @Override
- public Map getFormatOptions() {
- return metadata.getFormatOptions();
- }
-
- @Override
- public String getSchemaString() {
- return metadata.getSchemaString();
- }
-
- @Override
- public List getPartitionColumns() {
- return metadata.getPartitionColumns();
- }
-
- @Override
- public Map getConfiguration() {
- return metadata.getConfiguration();
- }
-
- @Override
- public Long getCreatedTime() {
- return metadata.getCreatedTime().orElse(null);
- }
- };
- }
-
- public static io.delta.storage.commit.actions.AbstractProtocol toStorageAbstractProtocol(
- AbstractProtocol protocol) {
- return new io.delta.storage.commit.actions.AbstractProtocol() {
- @Override
- public int getMinReaderVersion() {
- return protocol.getMinReaderVersion();
- }
-
- @Override
- public int getMinWriterVersion() {
- return protocol.getMinWriterVersion();
- }
-
- @Override
- public Set getReaderFeatures() {
- return protocol.getReaderFeatures();
- }
-
- @Override
- public Set getWriterFeatures() {
- return protocol.getWriterFeatures();
- }
- };
- }
-
- public static io.delta.storage.commit.actions.AbstractCommitInfo toStorageAbstractCommitInfo(
- AbstractCommitInfo commitInfo) {
- return commitInfo::getCommitTimestamp;
- }
-}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
index c3fe8acaca1..d30d5d1f92a 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
@@ -18,11 +18,12 @@ package io.delta.kernel.defaults
import java.io.File
import io.delta.kernel.Table
-import io.delta.kernel.engine.{CommitCoordinatorClientHandler, Engine, ExpressionHandler, FileSystemClient}
+import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient}
import io.delta.kernel.data.ColumnarBatch
import io.delta.kernel.defaults.engine.{DefaultEngine, DefaultJsonHandler, DefaultParquetHandler}
import io.delta.kernel.expressions.Predicate
import io.delta.kernel.internal.checkpoints.Checkpointer
+import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.util.FileNames
import io.delta.kernel.internal.util.Utils.toCloseableIterator
@@ -366,9 +367,6 @@ class MetricsEngine(config: Configuration) extends Engine {
override def getFileSystemClient: FileSystemClient = impl.getFileSystemClient
override def getParquetHandler: MetricsParquetHandler = parquetHandler
-
- override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]):
- CommitCoordinatorClientHandler = impl.getCommitCoordinatorClientHandler(name, conf)
}
/**
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProviderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProviderSuite.scala
deleted file mode 100644
index 9feb2ea04ba..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProviderSuite.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.delta.kernel.defaults.internal.coordinatedcommits
-
-import CommitCoordinatorProvider._
-import io.delta.kernel.defaults.DeltaTableWriteSuiteBase
-import io.delta.kernel.defaults.utils.TestUtils
-import io.delta.kernel.internal.actions.Metadata
-import io.delta.kernel.internal.TableConfig
-import io.delta.storage.commit.{Commit, CommitCoordinatorClient, CommitResponse, GetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
-import io.delta.storage.LogStore
-import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.scalatest.funsuite.AnyFunSuite
-
-import java.{lang, util}
-import java.util.{Collections, Optional}
-import scala.collection.convert.ImplicitConversions.`map AsScala`
-import scala.collection.JavaConverters._
-
-class CommitCoordinatorProviderSuite extends AnyFunSuite with TestUtils {
- test("getCommitCoordinator - builder returns same object") {
- val hadoopConf = new Configuration()
- hadoopConf.set(
- getCommitCoordinatorNameConfKey("cc-x"),
- classOf[Builder1].getName)
- val cc1 = getCommitCoordinatorClient(hadoopConf, "cc-x", Map("url" -> "url1").asJava)
- assert(cc1.isInstanceOf[TestCommitCoordinatorClient1])
- val cc1_again = getCommitCoordinatorClient(hadoopConf, "cc-x", Map("url" -> "url1").asJava)
- assert(cc1 eq cc1_again)
- val cc2 =
- getCommitCoordinatorClient(hadoopConf, "cc-x", Map("url" -> "url2", "a" -> "b").asJava)
- assert(cc2.isInstanceOf[TestCommitCoordinatorClient2])
- // If builder receives a config which doesn't have expected params, then it can throw exception.
- intercept[IllegalArgumentException] {
- getCommitCoordinatorClient(hadoopConf, "cc-x", Map("url" -> "url3").asJava)
- }
- }
-
- test("getCommitCoordinatorClient - builder returns new object each time") {
- val hadoopConf = new Configuration()
- hadoopConf.set(getCommitCoordinatorNameConfKey("cc-name"), classOf[Builder2].getName)
- val cc1 = getCommitCoordinatorClient(hadoopConf, "cc-name", Map("url" -> "url1").asJava)
- assert(cc1.isInstanceOf[TestCommitCoordinatorClient1])
- val cc1_again = getCommitCoordinatorClient(hadoopConf, "cc-name", Map("url" -> "url1").asJava)
- assert(cc1 ne cc1_again)
- }
-
- test("Semantic Equality works as expected on CommitCoordinatorClientHandler") {
-
- withTempDirAndEngine( { (tablePath, engine) =>
- // Different CommitCoordinatorHandler with same keys should be semantically equal.
- val obj1 = engine.getCommitCoordinatorClientHandler("cc-name", Map("key" -> "url1").asJava)
- val obj2 = engine.getCommitCoordinatorClientHandler("cc-name", Map("key" -> "url1").asJava)
- assert(obj1 != obj2)
- assert(obj1.semanticEquals(obj2))
-
- // Different CommitCoordinatorHandler with different keys should be semantically unequal.
- val obj3 = engine.getCommitCoordinatorClientHandler("cc-name", Map("key" -> "url2").asJava)
- assert(obj1 != obj3)
- assert(!obj1.semanticEquals(obj3))
- }, Map(getCommitCoordinatorNameConfKey("cc-name") -> classOf[Builder3].getName))
- }
-
- test("CommitCoordinatorClientHandler works as expected") {
- withTempDirAndEngine( { (tablePath, engine) =>
- val hadoopConf = new Configuration()
- hadoopConf.set(getCommitCoordinatorNameConfKey("cc-name"), classOf[Builder4].getName)
-
- // Different CommitCoordinatorHandler with same keys should be semantically equal.
- val obj1 = engine.getCommitCoordinatorClientHandler("cc-name", Map("key" -> "url1").asJava)
- val obj2 = getCommitCoordinatorClient(hadoopConf, "cc-name", Map("key" -> "url1").asJava)
-
- assert(
- obj1.registerTable("logPath", 1, null, null) ===
- obj2.registerTable(new Path("logPath"), Optional.empty(), 1, null, null))
-
- val tableDesc =
- new TableDescriptor(new Path("logPath"), Optional.empty(), Collections.emptyMap())
- assert(
- obj1.getCommits("logPath", Collections.emptyMap(), 1, 2).getLatestTableVersion ===
- obj2.getCommits(tableDesc, 1, 2).getLatestTableVersion)
-
- assert(
- obj1.commit("logPath", Collections.emptyMap(), 1, null, null).getCommit.getVersion ===
- obj2
- .commit(null, null, tableDesc, 1, null, null)
- .getCommit
- .getVersion)
-
- val ex = intercept[UnsupportedOperationException] {
- obj1.backfillToVersion("logPath", null, 1, null)
- }
-
- assert(
- ex.getMessage.contains(
- "BackfillToVersion not implemented in TestCommitCoordinatorClient for logPath"))
- }, Map(getCommitCoordinatorNameConfKey("cc-name") -> classOf[Builder4].getName))
- }
-
- test("set CommitCoordinator config to a class that doesn't extend CommitCoordinator") {
- val hadoopConf = new Configuration()
- hadoopConf.set(getCommitCoordinatorNameConfKey("fake"), "java.lang.String")
- val e = intercept[IllegalArgumentException](
- getCommitCoordinatorClient(hadoopConf, "fake", Collections.emptyMap())
- )
- assert(e.getMessage.contains(
- "Can not instantiate `CommitCoordinatorBuilder` class (from config): %s"
- .format("java.lang.String")))
- }
-}
-
-protected trait TestCommitCoordinatorClientBase extends CommitCoordinatorClient {
- override def registerTable(
- logPath: Path,
- tableIdentifier: Optional[TableIdentifier],
- currentVersion: Long,
- currentMetadata: AbstractMetadata,
- currentProtocol: AbstractProtocol): util.Map[String, String] = {
- throw new UnsupportedOperationException("Not implemented")
- }
-
- override def commit(
- logStore: LogStore,
- hadoopConf: Configuration,
- tableDesc: TableDescriptor,
- commitVersion: Long,
- actions: util.Iterator[String],
- updatedActions: UpdatedActions): CommitResponse = {
- throw new UnsupportedOperationException("Not implemented")
- }
-
- override def getCommits(
- tableDesc: TableDescriptor,
- startVersion: lang.Long,
- endVersion: lang.Long = null): GetCommitsResponse =
- new GetCommitsResponse(Collections.emptyList(), -1)
-
- override def backfillToVersion(
- logStore: LogStore,
- hadoopConf: Configuration,
- tableDesc: TableDescriptor,
- version: Long,
- lastKnownBackfilledVersion: lang.Long): Unit = {}
-
- override def semanticEquals(other: CommitCoordinatorClient): Boolean = this == other
-}
-
-// Test 1
-// Builder that returns same object
-private[coordinatedcommits] class TestCommitCoordinatorClient1
- extends TestCommitCoordinatorClientBase
-private[coordinatedcommits] class TestCommitCoordinatorClient2
- extends TestCommitCoordinatorClientBase
-
-object TestCommitCoordinatorClientInstances {
- val cc1 = new TestCommitCoordinatorClient1()
- val cc2 = new TestCommitCoordinatorClient2()
-}
-
-class Builder1(hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
- override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
- conf.getOrElse("url", "") match {
- case "url1" => TestCommitCoordinatorClientInstances.cc1
- case "url2" => TestCommitCoordinatorClientInstances.cc2
- case _ => throw new IllegalArgumentException("Invalid url")
- }
- }
- override def getName: String = "cc-x"
-}
-
-// Test 2
-// Builder that returns new object each time
-class Builder2(hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
- override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
- conf.getOrElse("url", "") match {
- case "url1" => new TestCommitCoordinatorClient1()
- case _ => throw new IllegalArgumentException("Invalid url")
- }
- }
- override def getName: String = "cc-name"
-}
-
-// Test 3
-// Commit Coordinator Client with semanticEquals implemented for testing
-class TestCommitCoordinatorClient3(val key: String) extends TestCommitCoordinatorClientBase {
- override def semanticEquals(other: CommitCoordinatorClient): Boolean =
- other.isInstanceOf[TestCommitCoordinatorClient3] &&
- other.asInstanceOf[TestCommitCoordinatorClient3].key == key
-}
-
-// Builder that builds TestCommitCoordinatorClient3
-class Builder3(hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
- override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
- new TestCommitCoordinatorClient3(conf("key"))
- }
- override def getName: String = "cc-name"
-}
-
-// Test 4
-// Commit Coordinator Client with all methods implemented for testing the usage of
-// CommitCoordinatorClientHandler
-class TestCommitCoordinatorClient4 extends TestCommitCoordinatorClientBase {
- val fileStatus = new FileStatus()
- fileStatus.setPath(new Path("logPath"))
- override def registerTable(
- logPath: Path,
- tableIdentifier: Optional[TableIdentifier],
- currentVersion: Long,
- currentMetadata: AbstractMetadata,
- currentProtocol: AbstractProtocol): util.Map[String, String] = {
- Map("tableKey" -> "tableValue").asJava
- }
-
- override def getCommits(
- tableDesc: TableDescriptor,
- startVersion: lang.Long,
- endVersion: lang.Long = null): GetCommitsResponse = {
- new GetCommitsResponse(
- List(new Commit(-1, fileStatus, -1)).asJava, -1)
- }
-
- override def commit(
- logStore: LogStore,
- hadoopConf: Configuration,
- tableDesc: TableDescriptor,
- commitVersion: Long,
- actions: util.Iterator[String],
- updatedActions: UpdatedActions): CommitResponse = {
- new CommitResponse(new Commit(-1, fileStatus, -1))
- }
-
- override def backfillToVersion(
- logStore: LogStore,
- hadoopConf: Configuration,
- tableDesc: TableDescriptor,
- version: Long,
- lastKnownBackfilledVersion: lang.Long): Unit = {
- throw new UnsupportedOperationException(
- "BackfillToVersion not implemented in TestCommitCoordinatorClient" +
- " for %s".format(tableDesc.getLogPath))
- }
-}
-
-// Builder that builds TestCommitCoordinatorClient4
-class Builder4(hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
- lazy val coordinator = new TestCommitCoordinatorClient4()
- override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
- coordinator
- }
- override def getName: String = "cc-name"
-}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala
deleted file mode 100644
index 2709fef5a3d..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.delta.kernel.defaults.internal.coordinatedcommits
-
-import io.delta.kernel.defaults.DeltaTableWriteSuiteBase
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
-import io.delta.kernel.defaults.utils.TestRow
-import io.delta.kernel.internal.TableConfig._
-import io.delta.kernel.Table
-import io.delta.kernel.data.Row
-import io.delta.kernel.defaults.engine.DefaultCommitCoordinatorClientHandler
-import io.delta.kernel.engine.Engine
-import io.delta.kernel.internal.{SnapshotImpl, TableConfig}
-import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol, SingleAction}
-import io.delta.kernel.internal.actions.SingleAction.{createMetadataSingleAction, FULL_SCHEMA}
-import io.delta.kernel.internal.fs.{Path => KernelPath}
-import io.delta.kernel.internal.snapshot.{SnapshotManager, TableCommitCoordinatorClientHandler}
-import io.delta.kernel.internal.util.{CoordinatedCommitsUtils, FileNames}
-import io.delta.kernel.internal.util.Preconditions.checkArgument
-import io.delta.kernel.internal.util.Utils.{closeCloseables, singletonCloseableIterator, toCloseableIterator}
-import io.delta.kernel.utils.{CloseableIterator, FileStatus}
-import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, GetCommitsResponse, InMemoryCommitCoordinator, TableDescriptor, TableIdentifier, UpdatedActions, CoordinatedCommitsUtils => CCU}
-import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
-import io.delta.storage.LogStore
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import java.{lang, util}
-import java.util.{Collections, Optional}
-import scala.collection.convert.ImplicitConversions.`iterator asScala`
-import scala.collection.JavaConverters._
-import scala.math
-
-class CoordinatedCommitsSuite extends DeltaTableWriteSuiteBase
- with CoordinatedCommitsTestUtils {
-
- private val trackingInMemoryBatchSize10Config = Map(
- CommitCoordinatorProvider.getCommitCoordinatorNameConfKey("tracking-in-memory") ->
- classOf[TrackingInMemoryCommitCoordinatorBuilder].getName,
- InMemoryCommitCoordinatorBuilder.BATCH_SIZE_CONF_KEY -> "10")
-
- def setupCoordinatedCommitFilesForTest(
- engine: Engine,
- tablePath: String,
- coordinatorName: String = "tracking-in-memory",
- coordinatorConf: String = "{}",
- tableConfToOverwrite: String = null,
- versionConvertToCC: Long = 0L,
- coordinatedCommitNum: Long = 3L,
- checkpointVersion: Long = -1L,
- deleteVersion: Long = -1L): Unit = {
- assert(checkpointVersion < versionConvertToCC)
- val versionToDelete = math.max(versionConvertToCC + 1, deleteVersion)
-
- val handler =
- engine.getCommitCoordinatorClientHandler(
- coordinatorName, OBJ_MAPPER.readValue(coordinatorConf, classOf[util.Map[String, String]]))
- val logPath = new Path("file:" + tablePath, "_delta_log")
- val tableSpark = Table.forPath(engine, tablePath)
- val totalCommitNum = coordinatedCommitNum + versionConvertToCC
-
- (0L until totalCommitNum).foreach(version => {
- spark.range(
- version * 10, version * 10 + 10).write.format("delta").mode("append").save(tablePath)
- })
- checkAnswer(
- spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)),
- (0L until totalCommitNum * 10L).map(TestRow(_)))
-
- var tableConf: util.Map[String, String] = null
-
- /** Rewrite the FS to CC conversion commit and move coordinated commits to _commits folder */
- (0L until totalCommitNum).foreach{ version =>
- val commitFilePath = getHadoopDeltaFile(logPath, version)
-
- if (version == versionConvertToCC) {
- tableConf = handler.registerTable(
- logPath.toString,
- version - 1L,
- CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(getEmptyMetadata),
- CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(getProtocol(1, 1)))
- val tableConfString = if (tableConfToOverwrite != null) {
- tableConfToOverwrite
- } else {
- OBJ_MAPPER.writeValueAsString(tableConf)
- }
- val rows = addCoordinatedCommitToMetadataRow(
- engine,
- commitFilePath,
- tableSpark.getSnapshotAsOfVersion(engine, version).asInstanceOf[SnapshotImpl],
- Map(
- TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> coordinatorName,
- TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> coordinatorConf,
- TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> tableConfString))
- writeConvertToCCCommit(engine, logPath, rows, version)
- } else if (version > versionConvertToCC) {
- val rows = getRowsFromFile(engine, commitFilePath)
- commit(logPath.toString, tableConf, version, version, rows, handler)
- if (version >= versionToDelete) {
- logPath.getFileSystem(hadoopConf).delete(commitFilePath)
- }
- } else if (version == checkpointVersion) {
- tableSpark.checkpoint(engine, version)
- }
- }
- }
-
- def testWithCoordinatorCommits(
- testName: String,
- hadoopConf: Map[String, String] = Map.empty)(f: (String, Engine) => Unit): Unit = {
- test(testName) {
- InMemoryCommitCoordinatorBuilder.clearInMemoryInstances()
- withTempDirAndEngine(f, hadoopConf)
- }
- }
-
- testWithCoordinatorCommits("cold snapshot initialization", trackingInMemoryBatchSize10Config) {
- (tablePath, engine) =>
- setupCoordinatedCommitFilesForTest(engine, tablePath)
-
- val table = Table.forPath(engine, tablePath)
- for (version <- 0L to 1L) {
- val snapshot = table.getSnapshotAsOfVersion(engine, version)
- val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
- checkAnswer(result, (0L to version * 10L + 9L).map(TestRow(_)))
- }
-
- TrackingCommitCoordinatorClient.numGetCommitsCalled.set(0)
- val snapshot2 = table.getLatestSnapshot(engine)
- val result2 = readSnapshot(snapshot2, snapshot2.getSchema(engine), null, null, engine)
- checkAnswer(result2, (0L to 29L).map(TestRow(_)))
- assert(TrackingCommitCoordinatorClient.numGetCommitsCalled.get === 1)
- }
-
- testWithCoordinatorCommits(
- "snapshot read should use coordinated commit related properties properly",
- Map(CommitCoordinatorProvider.getCommitCoordinatorNameConfKey("test-coordinator") ->
- classOf[TestCommitCoordinatorBuilder].getName)) { (tablePath, engine) =>
- setupCoordinatedCommitFilesForTest(
- engine,
- tablePath,
- coordinatorName = "test-coordinator",
- coordinatorConf = OBJ_MAPPER.writeValueAsString(
- TestCommitCoordinator.EXP_COORDINATOR_CONF))
-
- val table = Table.forPath(engine, tablePath)
- val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
- val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
- checkAnswer(result, (0L to 29L).map(TestRow(_)))
- assert(snapshot.getTableCommitCoordinatorClientHandlerOpt(engine).isPresent)
- assert(
- snapshot
- .getTableCommitCoordinatorClientHandlerOpt(engine)
- .get()
- .semanticEquals(
- engine.getCommitCoordinatorClientHandler(
- "test-coordinator", TestCommitCoordinator.EXP_COORDINATOR_CONF)))
- }
-
- testWithCoordinatorCommits(
- "snapshot read fails if we try to put bad value for COORDINATED_COMMITS_TABLE_CONF",
- trackingInMemoryBatchSize10Config) { (tablePath, engine) =>
- setupCoordinatedCommitFilesForTest(
- engine,
- tablePath,
- tableConfToOverwrite = """{"key1": "string_value", "key2Int": "2""")
-
- val table = Table.forPath(engine, tablePath)
- intercept[RuntimeException] {
- table.getLatestSnapshot(engine)
- }
- }
-
- testWithCoordinatorCommits(
- "snapshot read with checkpoint before table converted to coordinated commit table",
- trackingInMemoryBatchSize10Config) { (tablePath, engine) =>
- setupCoordinatedCommitFilesForTest(
- engine,
- tablePath,
- versionConvertToCC = 2L,
- coordinatedCommitNum = 2L,
- checkpointVersion = 1L)
-
- val table = Table.forPath(engine, tablePath)
- for (version <- 0L to 2L) {
- val snapshot = table.getSnapshotAsOfVersion(engine, version)
- val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
- checkAnswer(result, (0L to version * 10L + 9L).map(TestRow(_)))
- }
-
- TrackingCommitCoordinatorClient.numGetCommitsCalled.set(0)
- val snapshot3 = table.getLatestSnapshot(engine)
- val result3 = readSnapshot(snapshot3, snapshot3.getSchema(engine), null, null, engine)
- checkAnswer(result3, (0L to 39L).map(TestRow(_)))
- assert(TrackingCommitCoordinatorClient.numGetCommitsCalled.get === 1)
- }
-
- testWithCoordinatorCommits(
- "snapshot read with overlap between filesystem based commits and coordinated commits",
- trackingInMemoryBatchSize10Config) { (tablePath, engine) =>
- setupCoordinatedCommitFilesForTest(
- engine,
- tablePath,
- versionConvertToCC = 2L,
- coordinatedCommitNum = 4L,
- deleteVersion = 4L)
-
- val table = Table.forPath(engine, tablePath)
- for (version <- 0L to 4L) {
- val snapshot = table.getSnapshotAsOfVersion(engine, version)
- val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
- checkAnswer(result, (0L to version * 10L + 9L).map(TestRow(_)))
- }
-
- TrackingCommitCoordinatorClient.numGetCommitsCalled.set(0)
- val snapshot5 = table.getLatestSnapshot(engine)
- val result5 = readSnapshot(snapshot5, snapshot5.getSchema(engine), null, null, engine)
- checkAnswer(result5, (0L to 59L).map(TestRow(_)))
- assert(TrackingCommitCoordinatorClient.numGetCommitsCalled.get === 1)
- }
-
- testWithCoordinatorCommits(
- "getSnapshotAt with coordinated commits enabled", trackingInMemoryBatchSize10Config) {
- (tablePath, engine) =>
- setupCoordinatedCommitFilesForTest(
- engine, tablePath, versionConvertToCC = 2L)
-
- val table = Table.forPath(engine, tablePath)
- for (version <- 0L to 4L) {
- val snapshot = table.getSnapshotAsOfVersion(engine, version)
- val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
- checkAnswer(result, (0L to version * 10L + 9L).map(TestRow(_)))
- }
- }
-
- testWithCoordinatorCommits(
- "versionToLoad higher than possible", trackingInMemoryBatchSize10Config) {
- (tablePath, engine) =>
- setupCoordinatedCommitFilesForTest(
- engine, tablePath, versionConvertToCC = 2L)
- val table = Table.forPath(engine, tablePath)
- val e = intercept[RuntimeException] {
- table.getSnapshotAsOfVersion(engine, 5L)
- }
- assert(e.getMessage.contains(
- "Cannot load table version 5 as it does not exist. The latest available version is 4"))
- }
-
- def getRowsFromFile(engine: Engine, delta: Path): CloseableIterator[Row] = {
- val file = FileStatus.of(delta.toString, 0, 0)
- val columnarBatches =
- engine.getJsonHandler.readJsonFiles(
- singletonCloseableIterator(file),
- SingleAction.FULL_SCHEMA,
- Optional.empty())
-
-
- var allRowsIterators = List.empty[Row]
-
- while (columnarBatches.hasNext) {
- val batch = columnarBatches.next()
- val rows = batch.getRows
- while (rows.hasNext) {
- val row = rows.next()
- allRowsIterators = allRowsIterators :+ row
- }
- }
-
- toCloseableIterator(allRowsIterators.iterator.asJava)
- }
-
- def addCoordinatedCommitToMetadataRow(
- engine: Engine,
- delta: Path,
- snapshot: SnapshotImpl,
- configurations: Map[String, String]): CloseableIterator[Row] = {
- var rows = getRowsFromFile(engine, delta)
- val metadata = snapshot.getMetadata.withNewConfiguration(configurations.asJava)
- var hasMetadataRow = false
- rows = rows.map(row => {
- val metadataOrd = row.getSchema.indexOf("metaData")
- if (row.isNullAt(metadataOrd)) {
- row
- } else {
- hasMetadataRow = true
- createMetadataSingleAction(metadata.toRow)
- }
- })
- if (!hasMetadataRow) {
- toCloseableIterator((rows.toIterator ++ singletonCloseableIterator(
- createMetadataSingleAction(metadata.toRow)).toIterator).asJava)
- } else {
- rows
- }
- }
-}
-
-object TestCommitCoordinator {
- val EXP_TABLE_CONF: util.Map[String, String] = Map(
- "tableKey1" -> "string_value",
- "tableKey2Int" -> "2",
- "tableKey3ComplexStr" -> "\"hello\""
- ).asJava
-
- val EXP_COORDINATOR_CONF: util.Map[String, String] = Map(
- "coordinatorKey1" -> "string_value",
- "coordinatorKey2Int" -> "2",
- "coordinatorKey3ComplexStr" -> "\"hello\"").asJava
-
- val COORDINATOR = new TestCommitCoordinatorClient()
-}
-
-/**
- * A [[CommitCoordinatorClient]] that tests can use to check the coordinator configuration and
- * table configuration.
- */
-class TestCommitCoordinatorClient extends InMemoryCommitCoordinator(10) {
- override def registerTable(
- logPath: Path,
- tableIdentifier: Optional[TableIdentifier],
- currentVersion: Long,
- currentMetadata: AbstractMetadata,
- currentProtocol: AbstractProtocol): util.Map[String, String] = {
- super.registerTable(logPath, tableIdentifier, currentVersion, currentMetadata, currentProtocol)
- TestCommitCoordinator.EXP_TABLE_CONF
- }
- override def getCommits(
- tableDesc: TableDescriptor,
- startVersion: lang.Long,
- endVersion: lang.Long = null): GetCommitsResponse = {
- checkArgument(tableDesc.getTableConf == TestCommitCoordinator.EXP_TABLE_CONF)
- super.getCommits(tableDesc, startVersion, endVersion)
- }
- override def commit(
- logStore: LogStore,
- hadoopConf: Configuration,
- tableDesc: TableDescriptor,
- commitVersion: Long,
- actions: util.Iterator[String],
- updatedActions: UpdatedActions): CommitResponse = {
- checkArgument(tableDesc.getTableConf == TestCommitCoordinator.EXP_TABLE_CONF)
- super.commit(logStore, hadoopConf, tableDesc, commitVersion, actions, updatedActions)
- }
-}
-
-class TestCommitCoordinatorBuilder(
- hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
- override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
- checkArgument(conf == TestCommitCoordinator.EXP_COORDINATOR_CONF)
- TestCommitCoordinator.COORDINATOR
- }
- override def getName: String = "test-coordinator"
-}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala
deleted file mode 100644
index 61379564770..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.coordinatedcommits
-
-import io.delta.kernel.data.Row
-
-import java.{lang, util}
-import io.delta.storage.commit.{CommitCoordinatorClient, InMemoryCommitCoordinator, Commit => StorageCommit, CommitResponse => StorageCommitResponse, GetCommitsResponse => StorageGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions => StorageUpdatedActions}
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
-import io.delta.kernel.engine.{CommitCoordinatorClientHandler, Engine}
-import io.delta.kernel.internal.actions.{CommitInfo, Format, Metadata, Protocol}
-import io.delta.kernel.internal.TableConfig
-import io.delta.kernel.internal.util.{CoordinatedCommitsUtils, FileNames, VectorUtils}
-import io.delta.kernel.internal.util.VectorUtils.{stringArrayValue, stringVector}
-import io.delta.kernel.utils.CloseableIterator
-import io.delta.kernel.engine.coordinatedcommits.{Commit, CommitResponse, GetCommitsResponse, UpdatedActions}
-import io.delta.kernel.types.{LongType, StringType, StructType}
-import io.delta.storage.LogStore
-import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import java.util.{Collections, Optional}
-import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConverters._
-
-trait CoordinatedCommitsTestUtils {
-
- val hadoopConf = new Configuration()
- def getEmptyMetadata: Metadata = {
- new Metadata(
- util.UUID.randomUUID().toString,
- Optional.empty(),
- Optional.empty(),
- new Format(),
- "",
- null,
- stringArrayValue(Collections.emptyList()),
- Optional.empty(),
- VectorUtils.stringStringMapValue(Collections.emptyMap())
- )
- }
-
- def getProtocol(minReaderVersion: Int, minWriterVersion: Int): Protocol = {
- new Protocol(
- minReaderVersion, minWriterVersion, Collections.emptyList(), Collections.emptyList())
- }
-
- def getCommitInfo(newTimestamp: Long): CommitInfo = {
- new CommitInfo(
- Optional.of(newTimestamp),
- -1,
- null,
- null,
- Collections.emptyMap(),
- true,
- null,
- Collections.emptyMap()
- )
- }
-
- def commit(
- logPath: String,
- tableConf: util.Map[String, String],
- version: Long,
- timestamp: Long,
- commit: CloseableIterator[Row],
- commitCoordinatorClientHandler: CommitCoordinatorClientHandler): Commit = {
- val updatedCommitInfo = getCommitInfo(timestamp)
- val updatedActions = if (version == 0) {
- getUpdatedActionsForZerothCommit(updatedCommitInfo)
- } else {
- getUpdatedActionsForNonZerothCommit(updatedCommitInfo)
- }
- commitCoordinatorClientHandler.commit(
- logPath,
- tableConf,
- version,
- commit,
- updatedActions).getCommit
- }
-
- def getHadoopDeltaFile(logPath: Path, version: Long): Path = {
- new Path(FileNames.deltaFile(new io.delta.kernel.internal.fs.Path(logPath.toString), version))
- }
-
- def writeConvertToCCCommit(
- engine: Engine, logPath: Path, commit: CloseableIterator[Row], version: Long): Unit = {
- createLogPath(engine, logPath)
- engine.getJsonHandler.writeJsonFileAtomically(
- getHadoopDeltaFile(logPath, version).toString,
- commit,
- true)
- }
-
- def createLogPath(engine: Engine, logPath: Path): Unit = {
- // New table, create a delta log directory
- if (!engine.getFileSystemClient.mkdirs(logPath.toString)) {
- throw new RuntimeException("Failed to create delta log directory: " + logPath)
- }
- }
-
- def getUpdatedActionsForZerothCommit(
- commitInfo: CommitInfo,
- oldMetadata: Metadata = getEmptyMetadata): UpdatedActions = {
- val newMetadataConfiguration =
- Map(TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "in-memory",
- TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{}")
- val newMetadata = oldMetadata.withNewConfiguration(newMetadataConfiguration.asJava)
- new UpdatedActions(
- CoordinatedCommitsUtils.convertCommitInfoToAbstractCommitInfo(commitInfo),
- CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(newMetadata),
- CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(getProtocol(3, 7)),
- CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(oldMetadata),
- CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(getProtocol(3, 7)))
- }
-
- def getUpdatedActionsForNonZerothCommit(commitInfo: CommitInfo): UpdatedActions = {
- val updatedActions = getUpdatedActionsForZerothCommit(commitInfo)
- new UpdatedActions(
- updatedActions.getCommitInfo,
- updatedActions.getNewMetadata,
- updatedActions.getNewProtocol,
- updatedActions.getNewMetadata, // oldMetadata is replaced with newMetadata
- updatedActions.getOldProtocol)
- }
-
- def getVersionTimestampSchema: StructType = {
- new StructType()
- .add("version", LongType.LONG)
- .add("timestamp", LongType.LONG)
- }
-
- def getCommitRows(engine: Engine, version: Long, timestamp: Long): CloseableIterator[Row] = {
- val input = Seq(
- s"""{
- | "version":$version,
- | "timestamp":$timestamp
- |}
- |""".stripMargin.linesIterator.mkString)
-
- engine.getJsonHandler.parseJson(
- stringVector(input.asJava), getVersionTimestampSchema, Optional.empty()).getRows
- }
-}
-
-case class TrackingInMemoryCommitCoordinatorBuilder(hadoopConf: Configuration)
- extends CommitCoordinatorBuilder(hadoopConf) {
- override def getName: String = "tracking-in-memory"
- override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
- new TrackingCommitCoordinatorClient(
- new InMemoryCommitCoordinatorBuilder(hadoopConf).build(conf)
- .asInstanceOf[InMemoryCommitCoordinator])
- }
-}
-
-object TrackingCommitCoordinatorClient {
- val numCommitsCalled = new AtomicInteger(0)
- val numGetCommitsCalled = new AtomicInteger(0)
- val numBackfillToVersionCalled = new AtomicInteger(0)
- val numRegisterTableCalled = new AtomicInteger(0)
-
- private val insideOperation = new ThreadLocal[Boolean] {
- override def initialValue(): Boolean = false
- }
-}
-
-class TrackingCommitCoordinatorClient(delegatingCommitCoordinatorClient: InMemoryCommitCoordinator)
- extends CommitCoordinatorClient {
-
- def recordOperation[T](op: String)(f: => T): T = {
- val oldInsideOperation = TrackingCommitCoordinatorClient.insideOperation.get()
- try {
- if (!TrackingCommitCoordinatorClient.insideOperation.get()) {
- op match {
- case "commit" => TrackingCommitCoordinatorClient.numCommitsCalled.incrementAndGet()
- case "getCommits" => TrackingCommitCoordinatorClient.numGetCommitsCalled.incrementAndGet()
- case "backfillToVersion" =>
- TrackingCommitCoordinatorClient.numBackfillToVersionCalled.incrementAndGet()
- case "registerTable" =>
- TrackingCommitCoordinatorClient.numRegisterTableCalled.incrementAndGet()
- case _ => ()
- }
- }
- TrackingCommitCoordinatorClient.insideOperation.set(true)
- f
- } finally {
- TrackingCommitCoordinatorClient.insideOperation.set(oldInsideOperation)
- }
- }
-
- override def commit(
- logStore: LogStore,
- hadoopConf: Configuration,
- tableDesc: TableDescriptor,
- commitVersion: Long,
- actions: util.Iterator[String],
- updatedActions: StorageUpdatedActions): StorageCommitResponse = recordOperation("commit") {
- delegatingCommitCoordinatorClient.commit(
- logStore,
- hadoopConf,
- tableDesc,
- commitVersion,
- actions,
- updatedActions)
- }
-
- override def getCommits(
- tableDesc: TableDescriptor,
- startVersion: lang.Long,
- endVersion: lang.Long = null): StorageGetCommitsResponse = recordOperation("getCommits") {
- delegatingCommitCoordinatorClient.getCommits(tableDesc, startVersion, endVersion)
- }
-
- override def backfillToVersion(
- logStore: LogStore,
- hadoopConf: Configuration,
- tableDesc: TableDescriptor,
- version: Long,
- lastKnownBackfilledVersion: lang.Long): Unit = recordOperation("backfillToVersion") {
- delegatingCommitCoordinatorClient.backfillToVersion(
- logStore, hadoopConf, tableDesc, version, lastKnownBackfilledVersion)
- }
-
- override def semanticEquals(other: CommitCoordinatorClient): Boolean = this == other
-
- def reset(): Unit = {
- TrackingCommitCoordinatorClient.numCommitsCalled.set(0)
- TrackingCommitCoordinatorClient.numGetCommitsCalled.set(0)
- TrackingCommitCoordinatorClient.numBackfillToVersionCalled.set(0)
- }
-
- override def registerTable(
- logPath: Path,
- tableIdentifier: Optional[TableIdentifier],
- currentVersion: Long,
- currentMetadata: AbstractMetadata,
- currentProtocol: AbstractProtocol):
- util.Map[String, String] = recordOperation("registerTable") {
- delegatingCommitCoordinatorClient.registerTable(
- logPath, tableIdentifier, currentVersion, currentMetadata, currentProtocol)
- }
-}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.scala
deleted file mode 100644
index 118029f6107..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.delta.kernel.defaults.internal.coordinatedcommits
-
-import java.util
-
-import io.delta.storage.commit.{CommitCoordinatorClient, InMemoryCommitCoordinator}
-import org.apache.hadoop.conf.Configuration
-
-/**
- * The [[InMemoryCommitCoordinatorBuilder]] class is responsible for creating singleton instances of
- * [[InMemoryCommitCoordinator]] with the specified batchSize.
- *
- * For testing purposes, a test can clear the instances of [[InMemoryCommitCoordinator]] by calling
- * [[InMemoryCommitCoordinatorBuilder.clearInMemoryInstances()]], configure the
- * [[InMemoryCommitCoordinatorBuilder]] and batchSize in hadoopConf passed to the engine. In this
- * way, the [[InMemoryCommitCoordinator]] instances can be used by Kernel read and write across
- * the test.
- */
-class InMemoryCommitCoordinatorBuilder(hadoopConf: Configuration)
- extends CommitCoordinatorBuilder(hadoopConf) {
- private val batchSize =
- hadoopConf.getLong(InMemoryCommitCoordinatorBuilder.BATCH_SIZE_CONF_KEY, 1)
-
- /** Name of the commit-coordinator */
- override def getName: String = "in-memory"
-
- /** Returns a commit-coordinator based on the given conf */
- override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
- if (InMemoryCommitCoordinatorBuilder.batchSizeMap.containsKey(batchSize)) {
- InMemoryCommitCoordinatorBuilder.batchSizeMap.get(batchSize)
- } else {
- val coordinator = new PredictableUuidInMemoryCommitCoordinatorClient(batchSize)
- InMemoryCommitCoordinatorBuilder.batchSizeMap.put(batchSize, coordinator)
- coordinator
- }
- }
-}
-
-/**
- * The [[InMemoryCommitCoordinatorBuilder]] companion object is responsible for storing the
- * singleton instances of [[InMemoryCommitCoordinator]] based on the batchSize. This is useful for
- * checking the state of the instances in UTs.
- */
-object InMemoryCommitCoordinatorBuilder {
- val BATCH_SIZE_CONF_KEY = "delta.kernel.default.coordinatedCommits.inMemoryCoordinator.batchSize"
- val batchSizeMap: util.Map[Long, InMemoryCommitCoordinator] =
- new util.HashMap[Long, InMemoryCommitCoordinator]()
-
- // Visible only for UTs
- private[defaults] def clearInMemoryInstances(): Unit = {
- batchSizeMap.clear()
- }
-}
-
-class PredictableUuidInMemoryCommitCoordinatorClient(batchSize: Long)
- extends InMemoryCommitCoordinator(batchSize) {
-
- var nextUuidSuffix = 1L
- override def generateUUID(): String = {
- nextUuidSuffix += 1
- s"uuid-${nextUuidSuffix - 1}"
- }
-}