diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java index 17cd1317b09..04a585c73be 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java @@ -19,7 +19,6 @@ import io.delta.kernel.annotation.Evolving; import java.util.Collections; import java.util.List; -import java.util.Map; /** * Interface encapsulating all clients needed by the Delta Kernel in order to read the Delta table. @@ -58,27 +57,6 @@ public interface Engine { */ ParquetHandler getParquetHandler(); - /** - * Retrieves a {@link CommitCoordinatorClientHandler} for the specified commit coordinator client. - * - *

{@link CommitCoordinatorClientHandler} helps Kernel perform commits to a table which is - * owned by a commit coordinator. - * - * @see Coordinated - * commit protocol table feature. - *

This method creates and returns an implementation of {@link - * CommitCoordinatorClientHandler} based on the provided name and configuration of the - * underlying commit coordinator client. - * @param name The identifier or name of the underlying commit coordinator client - * @param conf The configuration settings for the underlying commit coordinator client - * @return An implementation of {@link CommitCoordinatorClientHandler} configured for the - * specified client - * @since 3.3.0 - */ - CommitCoordinatorClientHandler getCommitCoordinatorClientHandler( - String name, Map conf); - /** Get the engine's {@link MetricsReporter} instances to push reports to. */ default List getMetricsReporters() { return Collections.emptyList(); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 72ee67492fa..e30a04f5e21 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -20,7 +20,6 @@ import io.delta.kernel.ScanBuilder; import io.delta.kernel.Snapshot; -import io.delta.kernel.engine.CommitCoordinatorClientHandler; import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.actions.CommitInfo; import io.delta.kernel.internal.actions.DomainMetadata; @@ -180,18 +179,6 @@ public Optional getLatestTransactionVersion(Engine engine, String applicat */ public Optional getTableCommitCoordinatorClientHandlerOpt( Engine engine) { - return COORDINATED_COMMITS_COORDINATOR_NAME - .fromMetadata(metadata) - .map( - commitCoordinatorStr -> { - CommitCoordinatorClientHandler handler = - engine.getCommitCoordinatorClientHandler( - commitCoordinatorStr, - COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(metadata)); - return new TableCommitCoordinatorClientHandler( - handler, - logPath.toString(), - COORDINATED_COMMITS_TABLE_CONF.fromMetadata(metadata)); - }); + return Optional.empty(); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/Commit.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/Commit.java similarity index 89% rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/Commit.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/Commit.java index 250bcdcb355..841daeb0d74 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/Commit.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/Commit.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.delta.kernel.engine.coordinatedcommits; +package io.delta.kernel.internal.coordinatedcommits; import io.delta.kernel.annotation.Evolving; import io.delta.kernel.utils.FileStatus; @@ -22,8 +22,8 @@ /** * Representation of a commit file. It contains the version of the commit, the file status of the * commit, and the timestamp of the commit. This is used when we want to get the commit information - * from the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit} and {@link - * io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits} APIs. + * from the {@link CommitCoordinatorClientHandler#commit} and {@link + * CommitCoordinatorClientHandler#getCommits} APIs. * * @since 3.3.0 */ diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitCoordinatorClientHandler.java similarity index 97% rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitCoordinatorClientHandler.java index 5fdae8a8c66..619c1b76bbe 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitCoordinatorClientHandler.java @@ -14,13 +14,12 @@ * limitations under the License. */ -package io.delta.kernel.engine; +package io.delta.kernel.internal.coordinatedcommits; import io.delta.kernel.annotation.Evolving; import io.delta.kernel.data.Row; -import io.delta.kernel.engine.coordinatedcommits.*; -import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata; -import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol; +import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata; +import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol; import io.delta.kernel.utils.CloseableIterator; import java.io.IOException; import java.util.Map; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitFailedException.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitFailedException.java similarity index 92% rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitFailedException.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitFailedException.java index 5fc33d4c85e..f5f06697f7f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitFailedException.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitFailedException.java @@ -14,12 +14,12 @@ * limitations under the License. */ -package io.delta.kernel.engine.coordinatedcommits; +package io.delta.kernel.internal.coordinatedcommits; import io.delta.kernel.annotation.Evolving; /** - * Exception raised by {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit} + * Exception raised by {@link CommitCoordinatorClientHandler#commit} * *

  *  | retryable | conflict  | meaning                                                         |
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitResponse.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitResponse.java
similarity index 86%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitResponse.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitResponse.java
index 55ad6599c8f..ed23e99203a 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/CommitResponse.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/CommitResponse.java
@@ -14,12 +14,12 @@
  * limitations under the License.
  */
 
-package io.delta.kernel.engine.coordinatedcommits;
+package io.delta.kernel.internal.coordinatedcommits;
 
 import io.delta.kernel.annotation.Evolving;
 
 /**
- * Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}.
+ * Response container for {@link CommitCoordinatorClientHandler#commit}.
  *
  * @since 3.3.0
  */
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/GetCommitsResponse.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/GetCommitsResponse.java
similarity index 86%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/GetCommitsResponse.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/GetCommitsResponse.java
index 31c22672314..85b4c298726 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/GetCommitsResponse.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/GetCommitsResponse.java
@@ -14,16 +14,15 @@
  * limitations under the License.
  */
 
-package io.delta.kernel.engine.coordinatedcommits;
+package io.delta.kernel.internal.coordinatedcommits;
 
 import io.delta.kernel.annotation.Evolving;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits(
- * String, Map, Long, Long)}. Holds all the commits that have not been backfilled as per the commit
- * coordinator.
+ * Response container for {@link CommitCoordinatorClientHandler#getCommits( String, Map, Long,
+ * Long)}. Holds all the commits that have not been backfilled as per the commit coordinator.
  *
  * @since 3.3.0
  */
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/UpdatedActions.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/UpdatedActions.java
similarity index 84%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/UpdatedActions.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/UpdatedActions.java
index a790dd9e556..0f9841403c9 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/UpdatedActions.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/UpdatedActions.java
@@ -14,16 +14,16 @@
  * limitations under the License.
  */
 
-package io.delta.kernel.engine.coordinatedcommits;
+package io.delta.kernel.internal.coordinatedcommits;
 
 import io.delta.kernel.annotation.Evolving;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractCommitInfo;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
 
 /**
- * A container class to inform the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler}
- * about any changes in Protocol/Metadata
+ * A container class to inform the {@link CommitCoordinatorClientHandler} about any changes in
+ * Protocol/Metadata
  *
  * @since 3.3.0
  */
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractCommitInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractCommitInfo.java
similarity index 94%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractCommitInfo.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractCommitInfo.java
index b4afcee37ee..1d2b1f1b849 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractCommitInfo.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractCommitInfo.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package io.delta.kernel.engine.coordinatedcommits.actions;
+package io.delta.kernel.internal.coordinatedcommits.actions;
 
 import io.delta.kernel.annotation.Evolving;
 
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractMetadata.java
similarity index 96%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractMetadata.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractMetadata.java
index 0bc152115c3..46352e6bbe5 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractMetadata.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractMetadata.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package io.delta.kernel.engine.coordinatedcommits.actions;
+package io.delta.kernel.internal.coordinatedcommits.actions;
 
 import io.delta.kernel.annotation.Evolving;
 import java.util.List;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractProtocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractProtocol.java
similarity index 95%
rename from kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractProtocol.java
rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractProtocol.java
index ecd177a1d6a..9773b007aae 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/coordinatedcommits/actions/AbstractProtocol.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/coordinatedcommits/actions/AbstractProtocol.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package io.delta.kernel.engine.coordinatedcommits.actions;
+package io.delta.kernel.internal.coordinatedcommits.actions;
 
 import io.delta.kernel.annotation.Evolving;
 import java.util.Set;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
index afa43246a96..92784102f41 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
@@ -29,13 +29,13 @@
 
 import io.delta.kernel.*;
 import io.delta.kernel.engine.Engine;
-import io.delta.kernel.engine.coordinatedcommits.Commit;
 import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
 import io.delta.kernel.exceptions.InvalidTableException;
 import io.delta.kernel.exceptions.TableNotFoundException;
 import io.delta.kernel.internal.*;
 import io.delta.kernel.internal.actions.Metadata;
 import io.delta.kernel.internal.checkpoints.*;
+import io.delta.kernel.internal.coordinatedcommits.Commit;
 import io.delta.kernel.internal.fs.Path;
 import io.delta.kernel.internal.lang.ListUtils;
 import io.delta.kernel.internal.replay.CreateCheckpointIterator;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java
index 8f97b2e0d2a..c379319522b 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java
@@ -17,11 +17,11 @@
 package io.delta.kernel.internal.snapshot;
 
 import io.delta.kernel.data.Row;
-import io.delta.kernel.engine.CommitCoordinatorClientHandler;
-import io.delta.kernel.engine.coordinatedcommits.CommitFailedException;
-import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
-import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
-import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
+import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler;
+import io.delta.kernel.internal.coordinatedcommits.CommitFailedException;
+import io.delta.kernel.internal.coordinatedcommits.CommitResponse;
+import io.delta.kernel.internal.coordinatedcommits.GetCommitsResponse;
+import io.delta.kernel.internal.coordinatedcommits.UpdatedActions;
 import io.delta.kernel.utils.CloseableIterator;
 import java.io.IOException;
 import java.util.Map;
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CoordinatedCommitsUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CoordinatedCommitsUtils.java
index ac4de23b8be..d6c73925738 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CoordinatedCommitsUtils.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CoordinatedCommitsUtils.java
@@ -15,12 +15,12 @@
  */
 package io.delta.kernel.internal.util;
 
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
 import io.delta.kernel.internal.actions.CommitInfo;
 import io.delta.kernel.internal.actions.Metadata;
 import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractCommitInfo;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
+import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
 import java.util.*;
 
 public class CoordinatedCommitsUtils {
diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala
index aa38cda8409..02b6e2c4e2f 100644
--- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala
+++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala
@@ -20,12 +20,11 @@ import java.{lang => javaLang}
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
-import io.delta.kernel.engine.CommitCoordinatorClientHandler
-import io.delta.kernel.engine.coordinatedcommits.{Commit, CommitResponse, GetCommitsResponse}
 import io.delta.kernel.exceptions.InvalidTableException
 import io.delta.kernel.expressions.Predicate
 import io.delta.kernel.internal.actions.CommitInfo
 import io.delta.kernel.internal.checkpoints.{CheckpointInstance, SidecarFile}
+import io.delta.kernel.internal.coordinatedcommits.{Commit, CommitCoordinatorClientHandler, CommitResponse, GetCommitsResponse}
 import io.delta.kernel.internal.fs.Path
 import io.delta.kernel.internal.snapshot.{LogSegment, SnapshotManager, TableCommitCoordinatorClientHandler}
 import io.delta.kernel.internal.util.{FileNames, Utils}
diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala
index c7eb9caf7f9..ea095a94adb 100644
--- a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala
+++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala
@@ -18,6 +18,7 @@ package io.delta.kernel.test
 import io.delta.kernel.engine._
 import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row}
 import io.delta.kernel.expressions.{Column, Expression, ExpressionEvaluator, Predicate, PredicateEvaluator}
+import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler
 import io.delta.kernel.types.{DataType, StructType}
 import io.delta.kernel.utils.{CloseableIterator, DataFileStatus, FileStatus}
 
@@ -70,11 +71,6 @@ trait MockEngineUtils {
       override def getParquetHandler: ParquetHandler =
         Option(parquetHandler).getOrElse(
           throw new UnsupportedOperationException("not supported in this test suite"))
-
-      override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]):
-      CommitCoordinatorClientHandler =
-        Option(commitCoordinatorClientHandler).getOrElse(
-          throw new UnsupportedOperationException("not supported in this test suite"))
     }
   }
 }
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java
deleted file mode 100644
index b0d2ac0c7f0..00000000000
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.engine;
-
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.internal.coordinatedcommits.CommitCoordinatorProvider;
-import io.delta.kernel.defaults.internal.coordinatedcommits.StorageKernelAPIAdapter;
-import io.delta.kernel.defaults.internal.json.JsonUtils;
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
-import io.delta.kernel.engine.CommitCoordinatorClientHandler;
-import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
-import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
-import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
-import io.delta.kernel.utils.CloseableIterator;
-import io.delta.storage.LogStore;
-import io.delta.storage.commit.CommitCoordinatorClient;
-import io.delta.storage.commit.CommitFailedException;
-import io.delta.storage.commit.TableDescriptor;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Default implementation of {@link CommitCoordinatorClientHandler} based on Hadoop APIs which uses
- * commit coordinator defined in delta-storage modules. It takes a Hadoop {@link Configuration}
- * object to interact with the commit coordinator client. The following optional configurations can
- * be set to customize the behavior of the client:
- *
- * 
- */
-public class DefaultCommitCoordinatorClientHandler implements CommitCoordinatorClientHandler {
-  private final Configuration hadoopConf;
-  private final CommitCoordinatorClient commitCoordinatorClient;
-
-  /**
-   * Create an instance of the default {@link DefaultCommitCoordinatorClientHandler} implementation.
-   *
-   * @param hadoopConf Configuration to use. List of options to customize the behavior of the client
-   *     can be found in the class documentation.
-   * @param name The identifier or name of the underlying commit coordinator client
-   * @param commitCoordinatorConf The configuration settings for the underlying commit coordinator
-   *     client which contains the necessary information to create the client such as the endpoint
-   *     etc.
-   */
-  public DefaultCommitCoordinatorClientHandler(
-      Configuration hadoopConf, String name, Map commitCoordinatorConf) {
-    this.hadoopConf = hadoopConf;
-    this.commitCoordinatorClient =
-        CommitCoordinatorProvider.getCommitCoordinatorClient(
-            hadoopConf, name, commitCoordinatorConf);
-  }
-
-  @Override
-  public Map registerTable(
-      String logPath,
-      long currentVersion,
-      AbstractMetadata currentMetadata,
-      AbstractProtocol currentProtocol) {
-    // TODO: Introduce table identifier concept in Table API in Kernel and plumb the
-    //  table identifier into `CommitCoordinatorClient` in all APIs.
-    return commitCoordinatorClient.registerTable(
-        new Path(logPath),
-        Optional.empty() /* table identifier */,
-        currentVersion,
-        StorageKernelAPIAdapter.toStorageAbstractMetadata(currentMetadata),
-        StorageKernelAPIAdapter.toStorageAbstractProtocol(currentProtocol));
-  }
-
-  @Override
-  public CommitResponse commit(
-      String logPath,
-      Map tableConf,
-      long commitVersion,
-      CloseableIterator actions,
-      UpdatedActions updatedActions)
-      throws io.delta.kernel.engine.coordinatedcommits.CommitFailedException {
-    Path path = new Path(logPath);
-    LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
-    try {
-      return StorageKernelAPIAdapter.toKernelAPICommitResponse(
-          commitCoordinatorClient.commit(
-              logStore,
-              hadoopConf,
-              new TableDescriptor(path, Optional.empty() /* table identifier */, tableConf),
-              commitVersion,
-              new Iterator() {
-                @Override
-                public boolean hasNext() {
-                  return actions.hasNext();
-                }
-
-                @Override
-                public String next() {
-                  return JsonUtils.rowToJson(actions.next());
-                }
-              },
-              StorageKernelAPIAdapter.toStorageUpdatedActions(updatedActions)));
-    } catch (CommitFailedException e) {
-      throw StorageKernelAPIAdapter.toKernelAPICommitFailedException(e);
-    }
-  }
-
-  @Override
-  public GetCommitsResponse getCommits(
-      String logPath, Map tableConf, Long startVersion, Long endVersion) {
-    TableDescriptor tableDesc = new TableDescriptor(new Path(logPath), Optional.empty(), tableConf);
-    return StorageKernelAPIAdapter.toKernelAPIGetCommitsResponse(
-        commitCoordinatorClient.getCommits(tableDesc, startVersion, endVersion));
-  }
-
-  @Override
-  public void backfillToVersion(
-      String logPath, Map tableConf, long version, Long lastKnownBackfilledVersion)
-      throws IOException {
-    Path path = new Path(logPath);
-    LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
-    TableDescriptor tableDesc =
-        new TableDescriptor(path, Optional.empty() /* table identifier */, tableConf);
-    commitCoordinatorClient.backfillToVersion(
-        logStore, hadoopConf, tableDesc, version, lastKnownBackfilledVersion);
-  }
-
-  @Override
-  public boolean semanticEquals(CommitCoordinatorClientHandler other) {
-    return other instanceof DefaultCommitCoordinatorClientHandler
-        && commitCoordinatorClient.semanticEquals(
-            ((DefaultCommitCoordinatorClientHandler) other).commitCoordinatorClient);
-  }
-}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
index 610b9d62a55..7b9172c19f1 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
@@ -16,7 +16,6 @@
 package io.delta.kernel.defaults.engine;
 
 import io.delta.kernel.engine.*;
-import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 
 /** Default implementation of {@link Engine} based on Hadoop APIs. */
@@ -47,12 +46,6 @@ public ParquetHandler getParquetHandler() {
     return new DefaultParquetHandler(hadoopConf);
   }
 
-  @Override
-  public CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
-      String name, Map conf) {
-    return new DefaultCommitCoordinatorClientHandler(hadoopConf, name, conf);
-  }
-
   /**
    * Create an instance of {@link DefaultEngine}.
    *
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java
deleted file mode 100644
index 71b67edb597..00000000000
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.coordinatedcommits;
-
-import io.delta.storage.commit.CommitCoordinatorClient;
-import java.util.*;
-import org.apache.hadoop.conf.Configuration;
-
-/** A builder interface for {@link CommitCoordinatorClient}. */
-public abstract class CommitCoordinatorBuilder {
-  private Configuration initHadoopConf;
-
-  public CommitCoordinatorBuilder(Configuration initHadoopConf) {
-    this.initHadoopConf = initHadoopConf;
-  }
-  /** Name of the commit-coordinator */
-  public abstract String getName();
-
-  /** Returns a commit-coordinator client based on the given conf */
-  public abstract CommitCoordinatorClient build(Map conf);
-}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java
deleted file mode 100644
index 4239b2bb390..00000000000
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.coordinatedcommits;
-
-import static io.delta.kernel.defaults.internal.DefaultEngineErrors.canNotInstantiateCommitCoordinatorBuilder;
-
-import io.delta.kernel.defaults.engine.DefaultCommitCoordinatorClientHandler;
-import io.delta.storage.commit.CommitCoordinatorClient;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory to get the correct {@link CommitCoordinatorClient} for a table which is used by {@link
- * DefaultCommitCoordinatorClientHandler} to get the commit coordinator client using the {@code
- * getCommitCoordinatorClient} method.
- */
-public class CommitCoordinatorProvider {
-  private static final Logger logger = LoggerFactory.getLogger(CommitCoordinatorProvider.class);
-
-  /**
-   * Returns a {@link CommitCoordinatorClient} for the given `hadoopConf`, `name` and
-   * `commitCoordinatorConf`. Caller can set `io.delta.kernel.commitCoordinatorBuilder.{@code
-   * name}.impl` to specify the {@link CommitCoordinatorBuilder} implementation to use for {@code
-   * name}.
-   *
-   * @param hadoopConf {@link Configuration} to use for creating the {@link
-   *     CommitCoordinatorBuilder}.
-   * @param name Name of the commit-coordinator.
-   * @param commitCoordinatorConf Configuration for building the commit coordinator used by the
-   *     {@link CommitCoordinatorBuilder}.
-   * @return {@link CommitCoordinatorClient} instance.
-   */
-  public static CommitCoordinatorClient getCommitCoordinatorClient(
-      Configuration hadoopConf, String name, Map commitCoordinatorConf) {
-    // Check if the CommitCoordinatorBuilder implementation is set in the configuration.
-    String classNameFromConfig = hadoopConf.get(getCommitCoordinatorNameConfKey(name));
-    if (classNameFromConfig != null) {
-      CommitCoordinatorBuilder builder =
-          createCommitCoordinatorBuilder(classNameFromConfig, hadoopConf, "from config");
-      return builder.build(commitCoordinatorConf);
-    } else {
-      throw new IllegalArgumentException("Unknown commit-coordinator: " + name);
-    }
-  }
-
-  /**
-   * Configuration key for setting the CommitCoordinatorBuilder implementation for a name. ex:
-   * `io.delta.kernel.commitCoordinatorBuilder.in-memory.impl` ->
-   * `io.delta.storage.InMemoryCommitCoordinatorBuilder`
-   */
-  static String getCommitCoordinatorNameConfKey(String name) {
-    return "io.delta.kernel.commitCoordinatorBuilder." + name + ".impl";
-  }
-
-  /** Utility method to get the CommitCoordinatorBuilder class from the class name. */
-  private static Class getCommitCoordinatorBuilderClass(
-      String commitCoordinatorBuilderClassName) throws ClassNotFoundException {
-    return Class.forName(commitCoordinatorBuilderClassName)
-        .asSubclass(CommitCoordinatorBuilder.class);
-  }
-
-  private static CommitCoordinatorBuilder createCommitCoordinatorBuilder(
-      String className, Configuration hadoopConf, String context) {
-    try {
-      return getCommitCoordinatorBuilderClass(className)
-          .getConstructor(Configuration.class)
-          .newInstance(hadoopConf);
-    } catch (Exception e) {
-      String msgTemplate = "Failed to instantiate CommitCoordinatorBuilder class ({}): {}";
-      logger.error(msgTemplate, context, className, e);
-      throw canNotInstantiateCommitCoordinatorBuilder(className, context, e);
-    }
-  }
-}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/StorageKernelAPIAdapter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/StorageKernelAPIAdapter.java
deleted file mode 100644
index 53e379741ab..00000000000
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/StorageKernelAPIAdapter.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.coordinatedcommits;
-
-import io.delta.kernel.engine.coordinatedcommits.*;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
-import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
-import io.delta.kernel.utils.FileStatus;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * Adaptor to convert between Kernel's commit coordinator classes and the equivalent classes in
- * delta-storage.
- */
-public class StorageKernelAPIAdapter {
-  public static io.delta.storage.commit.UpdatedActions toStorageUpdatedActions(
-      UpdatedActions updatedActions) {
-    if (updatedActions == null) {
-      return null;
-    }
-    return new io.delta.storage.commit.UpdatedActions(
-        toStorageAbstractCommitInfo(updatedActions.getCommitInfo()),
-        toStorageAbstractMetadata(updatedActions.getNewMetadata()),
-        toStorageAbstractProtocol(updatedActions.getNewProtocol()),
-        toStorageAbstractMetadata(updatedActions.getOldMetadata()),
-        toStorageAbstractProtocol(updatedActions.getOldProtocol()));
-  }
-
-  public static CommitResponse toKernelAPICommitResponse(
-      io.delta.storage.commit.CommitResponse response) {
-    return new CommitResponse(toKernelAPICommit(response.getCommit()));
-  }
-
-  public static Commit toKernelAPICommit(io.delta.storage.commit.Commit commit) {
-    return new Commit(
-        commit.getVersion(),
-        toKernelAPIFileStatus(commit.getFileStatus()),
-        commit.getCommitTimestamp());
-  }
-
-  public static FileStatus toKernelAPIFileStatus(org.apache.hadoop.fs.FileStatus hadoopFileStatus) {
-    return FileStatus.of(
-        hadoopFileStatus.getPath().toString(),
-        hadoopFileStatus.getLen(),
-        hadoopFileStatus.getModificationTime());
-  }
-
-  public static GetCommitsResponse toKernelAPIGetCommitsResponse(
-      io.delta.storage.commit.GetCommitsResponse response) {
-    List commits =
-        response.getCommits().stream()
-            .map(StorageKernelAPIAdapter::toKernelAPICommit)
-            .collect(Collectors.toList());
-    return new GetCommitsResponse(commits, response.getLatestTableVersion());
-  }
-
-  public static CommitFailedException toKernelAPICommitFailedException(
-      io.delta.storage.commit.CommitFailedException e) {
-    return new CommitFailedException(e.getRetryable(), e.getConflict(), e.getMessage());
-  }
-
-  public static io.delta.storage.commit.actions.AbstractMetadata toStorageAbstractMetadata(
-      AbstractMetadata metadata) {
-    return new io.delta.storage.commit.actions.AbstractMetadata() {
-      @Override
-      public String getId() {
-        return metadata.getId();
-      }
-
-      @Override
-      public String getName() {
-        return metadata.getName();
-      }
-
-      @Override
-      public String getDescription() {
-        return metadata.getDescription();
-      }
-
-      @Override
-      public String getProvider() {
-        return metadata.getProvider();
-      }
-
-      @Override
-      public Map getFormatOptions() {
-        return metadata.getFormatOptions();
-      }
-
-      @Override
-      public String getSchemaString() {
-        return metadata.getSchemaString();
-      }
-
-      @Override
-      public List getPartitionColumns() {
-        return metadata.getPartitionColumns();
-      }
-
-      @Override
-      public Map getConfiguration() {
-        return metadata.getConfiguration();
-      }
-
-      @Override
-      public Long getCreatedTime() {
-        return metadata.getCreatedTime().orElse(null);
-      }
-    };
-  }
-
-  public static io.delta.storage.commit.actions.AbstractProtocol toStorageAbstractProtocol(
-      AbstractProtocol protocol) {
-    return new io.delta.storage.commit.actions.AbstractProtocol() {
-      @Override
-      public int getMinReaderVersion() {
-        return protocol.getMinReaderVersion();
-      }
-
-      @Override
-      public int getMinWriterVersion() {
-        return protocol.getMinWriterVersion();
-      }
-
-      @Override
-      public Set getReaderFeatures() {
-        return protocol.getReaderFeatures();
-      }
-
-      @Override
-      public Set getWriterFeatures() {
-        return protocol.getWriterFeatures();
-      }
-    };
-  }
-
-  public static io.delta.storage.commit.actions.AbstractCommitInfo toStorageAbstractCommitInfo(
-      AbstractCommitInfo commitInfo) {
-    return commitInfo::getCommitTimestamp;
-  }
-}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
index c3fe8acaca1..d30d5d1f92a 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
@@ -18,11 +18,12 @@ package io.delta.kernel.defaults
 
 import java.io.File
 import io.delta.kernel.Table
-import io.delta.kernel.engine.{CommitCoordinatorClientHandler, Engine, ExpressionHandler, FileSystemClient}
+import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient}
 import io.delta.kernel.data.ColumnarBatch
 import io.delta.kernel.defaults.engine.{DefaultEngine, DefaultJsonHandler, DefaultParquetHandler}
 import io.delta.kernel.expressions.Predicate
 import io.delta.kernel.internal.checkpoints.Checkpointer
+import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler
 import io.delta.kernel.internal.fs.Path
 import io.delta.kernel.internal.util.FileNames
 import io.delta.kernel.internal.util.Utils.toCloseableIterator
@@ -366,9 +367,6 @@ class MetricsEngine(config: Configuration) extends Engine {
   override def getFileSystemClient: FileSystemClient = impl.getFileSystemClient
 
   override def getParquetHandler: MetricsParquetHandler = parquetHandler
-
-  override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]):
-  CommitCoordinatorClientHandler = impl.getCommitCoordinatorClientHandler(name, conf)
 }
 
 /**
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProviderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProviderSuite.scala
deleted file mode 100644
index 9feb2ea04ba..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProviderSuite.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.delta.kernel.defaults.internal.coordinatedcommits
-
-import CommitCoordinatorProvider._
-import io.delta.kernel.defaults.DeltaTableWriteSuiteBase
-import io.delta.kernel.defaults.utils.TestUtils
-import io.delta.kernel.internal.actions.Metadata
-import io.delta.kernel.internal.TableConfig
-import io.delta.storage.commit.{Commit, CommitCoordinatorClient, CommitResponse, GetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
-import io.delta.storage.LogStore
-import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.scalatest.funsuite.AnyFunSuite
-
-import java.{lang, util}
-import java.util.{Collections, Optional}
-import scala.collection.convert.ImplicitConversions.`map AsScala`
-import scala.collection.JavaConverters._
-
-class CommitCoordinatorProviderSuite extends AnyFunSuite with TestUtils {
-  test("getCommitCoordinator - builder returns same object") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set(
-      getCommitCoordinatorNameConfKey("cc-x"),
-      classOf[Builder1].getName)
-    val cc1 = getCommitCoordinatorClient(hadoopConf, "cc-x", Map("url" -> "url1").asJava)
-    assert(cc1.isInstanceOf[TestCommitCoordinatorClient1])
-    val cc1_again = getCommitCoordinatorClient(hadoopConf, "cc-x", Map("url" -> "url1").asJava)
-    assert(cc1 eq cc1_again)
-    val cc2 =
-      getCommitCoordinatorClient(hadoopConf, "cc-x", Map("url" -> "url2", "a" -> "b").asJava)
-    assert(cc2.isInstanceOf[TestCommitCoordinatorClient2])
-    // If builder receives a config which doesn't have expected params, then it can throw exception.
-    intercept[IllegalArgumentException] {
-      getCommitCoordinatorClient(hadoopConf, "cc-x", Map("url" -> "url3").asJava)
-    }
-  }
-
-  test("getCommitCoordinatorClient - builder returns new object each time") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set(getCommitCoordinatorNameConfKey("cc-name"), classOf[Builder2].getName)
-    val cc1 = getCommitCoordinatorClient(hadoopConf, "cc-name", Map("url" -> "url1").asJava)
-    assert(cc1.isInstanceOf[TestCommitCoordinatorClient1])
-    val cc1_again = getCommitCoordinatorClient(hadoopConf, "cc-name", Map("url" -> "url1").asJava)
-    assert(cc1 ne cc1_again)
-  }
-
-  test("Semantic Equality works as expected on CommitCoordinatorClientHandler") {
-
-    withTempDirAndEngine( { (tablePath, engine) =>
-      // Different CommitCoordinatorHandler with same keys should be semantically equal.
-      val obj1 = engine.getCommitCoordinatorClientHandler("cc-name", Map("key" -> "url1").asJava)
-      val obj2 = engine.getCommitCoordinatorClientHandler("cc-name", Map("key" -> "url1").asJava)
-      assert(obj1 != obj2)
-      assert(obj1.semanticEquals(obj2))
-
-      // Different CommitCoordinatorHandler with different keys should be semantically unequal.
-      val obj3 = engine.getCommitCoordinatorClientHandler("cc-name", Map("key" -> "url2").asJava)
-      assert(obj1 != obj3)
-      assert(!obj1.semanticEquals(obj3))
-    }, Map(getCommitCoordinatorNameConfKey("cc-name") -> classOf[Builder3].getName))
-  }
-
-  test("CommitCoordinatorClientHandler works as expected") {
-    withTempDirAndEngine( { (tablePath, engine) =>
-      val hadoopConf = new Configuration()
-      hadoopConf.set(getCommitCoordinatorNameConfKey("cc-name"), classOf[Builder4].getName)
-
-      // Different CommitCoordinatorHandler with same keys should be semantically equal.
-      val obj1 = engine.getCommitCoordinatorClientHandler("cc-name", Map("key" -> "url1").asJava)
-      val obj2 = getCommitCoordinatorClient(hadoopConf, "cc-name", Map("key" -> "url1").asJava)
-
-      assert(
-        obj1.registerTable("logPath", 1, null, null) ===
-          obj2.registerTable(new Path("logPath"), Optional.empty(), 1, null, null))
-
-      val tableDesc =
-        new TableDescriptor(new Path("logPath"), Optional.empty(), Collections.emptyMap())
-      assert(
-        obj1.getCommits("logPath", Collections.emptyMap(), 1, 2).getLatestTableVersion ===
-          obj2.getCommits(tableDesc, 1, 2).getLatestTableVersion)
-
-      assert(
-        obj1.commit("logPath", Collections.emptyMap(), 1, null, null).getCommit.getVersion ===
-          obj2
-            .commit(null, null, tableDesc, 1, null, null)
-            .getCommit
-            .getVersion)
-
-      val ex = intercept[UnsupportedOperationException] {
-        obj1.backfillToVersion("logPath", null, 1, null)
-      }
-
-      assert(
-        ex.getMessage.contains(
-          "BackfillToVersion not implemented in TestCommitCoordinatorClient for logPath"))
-    }, Map(getCommitCoordinatorNameConfKey("cc-name") -> classOf[Builder4].getName))
-  }
-
-  test("set CommitCoordinator config to a class that doesn't extend CommitCoordinator") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set(getCommitCoordinatorNameConfKey("fake"), "java.lang.String")
-    val e = intercept[IllegalArgumentException](
-      getCommitCoordinatorClient(hadoopConf, "fake", Collections.emptyMap())
-    )
-    assert(e.getMessage.contains(
-      "Can not instantiate `CommitCoordinatorBuilder` class (from config): %s"
-        .format("java.lang.String")))
-  }
-}
-
-protected trait TestCommitCoordinatorClientBase extends CommitCoordinatorClient {
-  override def registerTable(
-    logPath: Path,
-    tableIdentifier: Optional[TableIdentifier],
-    currentVersion: Long,
-    currentMetadata: AbstractMetadata,
-    currentProtocol: AbstractProtocol): util.Map[String, String] = {
-    throw new UnsupportedOperationException("Not implemented")
-  }
-
-  override def commit(
-    logStore: LogStore,
-    hadoopConf: Configuration,
-    tableDesc: TableDescriptor,
-    commitVersion: Long,
-    actions: util.Iterator[String],
-    updatedActions: UpdatedActions): CommitResponse = {
-    throw new UnsupportedOperationException("Not implemented")
-  }
-
-  override def getCommits(
-    tableDesc: TableDescriptor,
-    startVersion: lang.Long,
-    endVersion: lang.Long = null): GetCommitsResponse =
-    new GetCommitsResponse(Collections.emptyList(), -1)
-
-  override def backfillToVersion(
-    logStore: LogStore,
-    hadoopConf: Configuration,
-    tableDesc: TableDescriptor,
-    version: Long,
-    lastKnownBackfilledVersion: lang.Long): Unit = {}
-
-  override def semanticEquals(other: CommitCoordinatorClient): Boolean = this == other
-}
-
-// Test 1
-// Builder that  returns same object
-private[coordinatedcommits] class TestCommitCoordinatorClient1
-  extends TestCommitCoordinatorClientBase
-private[coordinatedcommits] class TestCommitCoordinatorClient2
-  extends TestCommitCoordinatorClientBase
-
-object TestCommitCoordinatorClientInstances {
-  val cc1 = new TestCommitCoordinatorClient1()
-  val cc2 = new TestCommitCoordinatorClient2()
-}
-
-class Builder1(hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
-  override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
-    conf.getOrElse("url", "") match {
-      case "url1" => TestCommitCoordinatorClientInstances.cc1
-      case "url2" => TestCommitCoordinatorClientInstances.cc2
-      case _ => throw new IllegalArgumentException("Invalid url")
-    }
-  }
-  override def getName: String = "cc-x"
-}
-
-// Test 2
-// Builder that returns new object each time
-class Builder2(hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
-  override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
-    conf.getOrElse("url", "") match {
-      case "url1" => new TestCommitCoordinatorClient1()
-      case _ => throw new IllegalArgumentException("Invalid url")
-    }
-  }
-  override def getName: String = "cc-name"
-}
-
-// Test 3
-// Commit Coordinator Client with semanticEquals implemented for testing
-class TestCommitCoordinatorClient3(val key: String) extends TestCommitCoordinatorClientBase {
-  override def semanticEquals(other: CommitCoordinatorClient): Boolean =
-    other.isInstanceOf[TestCommitCoordinatorClient3] &&
-      other.asInstanceOf[TestCommitCoordinatorClient3].key == key
-}
-
-// Builder that builds TestCommitCoordinatorClient3
-class Builder3(hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
-  override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
-    new TestCommitCoordinatorClient3(conf("key"))
-  }
-  override def getName: String = "cc-name"
-}
-
-// Test 4
-// Commit Coordinator Client with all methods implemented for testing the usage of
-// CommitCoordinatorClientHandler
-class TestCommitCoordinatorClient4 extends TestCommitCoordinatorClientBase {
-  val fileStatus = new FileStatus()
-  fileStatus.setPath(new Path("logPath"))
-  override def registerTable(
-    logPath: Path,
-    tableIdentifier: Optional[TableIdentifier],
-    currentVersion: Long,
-    currentMetadata: AbstractMetadata,
-    currentProtocol: AbstractProtocol): util.Map[String, String] = {
-    Map("tableKey" -> "tableValue").asJava
-  }
-
-  override def getCommits(
-    tableDesc: TableDescriptor,
-    startVersion: lang.Long,
-    endVersion: lang.Long = null): GetCommitsResponse = {
-    new GetCommitsResponse(
-      List(new Commit(-1, fileStatus, -1)).asJava, -1)
-  }
-
-  override def commit(
-    logStore: LogStore,
-    hadoopConf: Configuration,
-    tableDesc: TableDescriptor,
-    commitVersion: Long,
-    actions: util.Iterator[String],
-    updatedActions: UpdatedActions): CommitResponse = {
-    new CommitResponse(new Commit(-1, fileStatus, -1))
-  }
-
-  override def backfillToVersion(
-    logStore: LogStore,
-    hadoopConf: Configuration,
-    tableDesc: TableDescriptor,
-    version: Long,
-    lastKnownBackfilledVersion: lang.Long): Unit = {
-    throw new UnsupportedOperationException(
-      "BackfillToVersion not implemented in TestCommitCoordinatorClient" +
-        " for %s".format(tableDesc.getLogPath))
-  }
-}
-
-// Builder that builds TestCommitCoordinatorClient4
-class Builder4(hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
-  lazy val coordinator = new TestCommitCoordinatorClient4()
-  override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
-    coordinator
-  }
-  override def getName: String = "cc-name"
-}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala
deleted file mode 100644
index 2709fef5a3d..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.delta.kernel.defaults.internal.coordinatedcommits
-
-import io.delta.kernel.defaults.DeltaTableWriteSuiteBase
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
-import io.delta.kernel.defaults.utils.TestRow
-import io.delta.kernel.internal.TableConfig._
-import io.delta.kernel.Table
-import io.delta.kernel.data.Row
-import io.delta.kernel.defaults.engine.DefaultCommitCoordinatorClientHandler
-import io.delta.kernel.engine.Engine
-import io.delta.kernel.internal.{SnapshotImpl, TableConfig}
-import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol, SingleAction}
-import io.delta.kernel.internal.actions.SingleAction.{createMetadataSingleAction, FULL_SCHEMA}
-import io.delta.kernel.internal.fs.{Path => KernelPath}
-import io.delta.kernel.internal.snapshot.{SnapshotManager, TableCommitCoordinatorClientHandler}
-import io.delta.kernel.internal.util.{CoordinatedCommitsUtils, FileNames}
-import io.delta.kernel.internal.util.Preconditions.checkArgument
-import io.delta.kernel.internal.util.Utils.{closeCloseables, singletonCloseableIterator, toCloseableIterator}
-import io.delta.kernel.utils.{CloseableIterator, FileStatus}
-import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, GetCommitsResponse, InMemoryCommitCoordinator, TableDescriptor, TableIdentifier, UpdatedActions, CoordinatedCommitsUtils => CCU}
-import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
-import io.delta.storage.LogStore
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import java.{lang, util}
-import java.util.{Collections, Optional}
-import scala.collection.convert.ImplicitConversions.`iterator asScala`
-import scala.collection.JavaConverters._
-import scala.math
-
-class CoordinatedCommitsSuite extends DeltaTableWriteSuiteBase
-  with CoordinatedCommitsTestUtils {
-
-  private val trackingInMemoryBatchSize10Config = Map(
-    CommitCoordinatorProvider.getCommitCoordinatorNameConfKey("tracking-in-memory") ->
-      classOf[TrackingInMemoryCommitCoordinatorBuilder].getName,
-    InMemoryCommitCoordinatorBuilder.BATCH_SIZE_CONF_KEY -> "10")
-
-  def setupCoordinatedCommitFilesForTest(
-    engine: Engine,
-    tablePath: String,
-    coordinatorName: String = "tracking-in-memory",
-    coordinatorConf: String = "{}",
-    tableConfToOverwrite: String = null,
-    versionConvertToCC: Long = 0L,
-    coordinatedCommitNum: Long = 3L,
-    checkpointVersion: Long = -1L,
-    deleteVersion: Long = -1L): Unit = {
-    assert(checkpointVersion < versionConvertToCC)
-    val versionToDelete = math.max(versionConvertToCC + 1, deleteVersion)
-
-    val handler =
-      engine.getCommitCoordinatorClientHandler(
-        coordinatorName, OBJ_MAPPER.readValue(coordinatorConf, classOf[util.Map[String, String]]))
-    val logPath = new Path("file:" + tablePath, "_delta_log")
-    val tableSpark = Table.forPath(engine, tablePath)
-    val totalCommitNum = coordinatedCommitNum + versionConvertToCC
-
-    (0L until totalCommitNum).foreach(version => {
-      spark.range(
-        version * 10, version * 10 + 10).write.format("delta").mode("append").save(tablePath)
-    })
-    checkAnswer(
-      spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)),
-      (0L until totalCommitNum * 10L).map(TestRow(_)))
-
-    var tableConf: util.Map[String, String] = null
-
-    /** Rewrite the FS to CC conversion commit and move coordinated commits to _commits folder */
-    (0L until totalCommitNum).foreach{ version =>
-      val commitFilePath = getHadoopDeltaFile(logPath, version)
-
-      if (version == versionConvertToCC) {
-        tableConf = handler.registerTable(
-          logPath.toString,
-          version - 1L,
-          CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(getEmptyMetadata),
-          CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(getProtocol(1, 1)))
-        val tableConfString = if (tableConfToOverwrite != null) {
-          tableConfToOverwrite
-        } else {
-          OBJ_MAPPER.writeValueAsString(tableConf)
-        }
-        val rows = addCoordinatedCommitToMetadataRow(
-          engine,
-          commitFilePath,
-          tableSpark.getSnapshotAsOfVersion(engine, version).asInstanceOf[SnapshotImpl],
-          Map(
-            TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> coordinatorName,
-            TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> coordinatorConf,
-            TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> tableConfString))
-        writeConvertToCCCommit(engine, logPath, rows, version)
-      } else if (version > versionConvertToCC) {
-        val rows = getRowsFromFile(engine, commitFilePath)
-        commit(logPath.toString, tableConf, version, version, rows, handler)
-        if (version >= versionToDelete) {
-          logPath.getFileSystem(hadoopConf).delete(commitFilePath)
-        }
-      } else if (version == checkpointVersion) {
-        tableSpark.checkpoint(engine, version)
-      }
-    }
-  }
-
-  def testWithCoordinatorCommits(
-    testName: String,
-    hadoopConf: Map[String, String] = Map.empty)(f: (String, Engine) => Unit): Unit = {
-    test(testName) {
-      InMemoryCommitCoordinatorBuilder.clearInMemoryInstances()
-      withTempDirAndEngine(f, hadoopConf)
-    }
-  }
-
-  testWithCoordinatorCommits("cold snapshot initialization", trackingInMemoryBatchSize10Config) {
-    (tablePath, engine) =>
-      setupCoordinatedCommitFilesForTest(engine, tablePath)
-
-      val table = Table.forPath(engine, tablePath)
-      for (version <- 0L to 1L) {
-        val snapshot = table.getSnapshotAsOfVersion(engine, version)
-        val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
-        checkAnswer(result, (0L to version * 10L + 9L).map(TestRow(_)))
-      }
-
-      TrackingCommitCoordinatorClient.numGetCommitsCalled.set(0)
-      val snapshot2 = table.getLatestSnapshot(engine)
-      val result2 = readSnapshot(snapshot2, snapshot2.getSchema(engine), null, null, engine)
-      checkAnswer(result2, (0L to 29L).map(TestRow(_)))
-      assert(TrackingCommitCoordinatorClient.numGetCommitsCalled.get === 1)
-  }
-
-  testWithCoordinatorCommits(
-    "snapshot read should use coordinated commit related properties properly",
-    Map(CommitCoordinatorProvider.getCommitCoordinatorNameConfKey("test-coordinator") ->
-      classOf[TestCommitCoordinatorBuilder].getName)) { (tablePath, engine) =>
-        setupCoordinatedCommitFilesForTest(
-          engine,
-          tablePath,
-          coordinatorName = "test-coordinator",
-          coordinatorConf = OBJ_MAPPER.writeValueAsString(
-            TestCommitCoordinator.EXP_COORDINATOR_CONF))
-
-        val table = Table.forPath(engine, tablePath)
-        val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
-        val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
-        checkAnswer(result, (0L to 29L).map(TestRow(_)))
-        assert(snapshot.getTableCommitCoordinatorClientHandlerOpt(engine).isPresent)
-        assert(
-          snapshot
-            .getTableCommitCoordinatorClientHandlerOpt(engine)
-            .get()
-            .semanticEquals(
-              engine.getCommitCoordinatorClientHandler(
-                "test-coordinator", TestCommitCoordinator.EXP_COORDINATOR_CONF)))
-  }
-
-  testWithCoordinatorCommits(
-    "snapshot read fails if we try to put bad value for COORDINATED_COMMITS_TABLE_CONF",
-    trackingInMemoryBatchSize10Config) { (tablePath, engine) =>
-      setupCoordinatedCommitFilesForTest(
-        engine,
-        tablePath,
-        tableConfToOverwrite = """{"key1": "string_value", "key2Int": "2""")
-
-      val table = Table.forPath(engine, tablePath)
-      intercept[RuntimeException] {
-        table.getLatestSnapshot(engine)
-      }
-  }
-
-  testWithCoordinatorCommits(
-    "snapshot read with checkpoint before table converted to coordinated commit table",
-    trackingInMemoryBatchSize10Config) { (tablePath, engine) =>
-      setupCoordinatedCommitFilesForTest(
-        engine,
-        tablePath,
-        versionConvertToCC = 2L,
-        coordinatedCommitNum = 2L,
-        checkpointVersion = 1L)
-
-      val table = Table.forPath(engine, tablePath)
-      for (version <- 0L to 2L) {
-        val snapshot = table.getSnapshotAsOfVersion(engine, version)
-        val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
-        checkAnswer(result, (0L to version * 10L + 9L).map(TestRow(_)))
-      }
-
-      TrackingCommitCoordinatorClient.numGetCommitsCalled.set(0)
-      val snapshot3 = table.getLatestSnapshot(engine)
-      val result3 = readSnapshot(snapshot3, snapshot3.getSchema(engine), null, null, engine)
-      checkAnswer(result3, (0L to 39L).map(TestRow(_)))
-      assert(TrackingCommitCoordinatorClient.numGetCommitsCalled.get === 1)
-  }
-
-  testWithCoordinatorCommits(
-    "snapshot read with overlap between filesystem based commits and coordinated commits",
-    trackingInMemoryBatchSize10Config) { (tablePath, engine) =>
-      setupCoordinatedCommitFilesForTest(
-        engine,
-        tablePath,
-        versionConvertToCC = 2L,
-        coordinatedCommitNum = 4L,
-        deleteVersion = 4L)
-
-      val table = Table.forPath(engine, tablePath)
-      for (version <- 0L to 4L) {
-        val snapshot = table.getSnapshotAsOfVersion(engine, version)
-        val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
-        checkAnswer(result, (0L to version * 10L + 9L).map(TestRow(_)))
-      }
-
-      TrackingCommitCoordinatorClient.numGetCommitsCalled.set(0)
-      val snapshot5 = table.getLatestSnapshot(engine)
-      val result5 = readSnapshot(snapshot5, snapshot5.getSchema(engine), null, null, engine)
-      checkAnswer(result5, (0L to 59L).map(TestRow(_)))
-      assert(TrackingCommitCoordinatorClient.numGetCommitsCalled.get === 1)
-  }
-
-  testWithCoordinatorCommits(
-    "getSnapshotAt with coordinated commits enabled", trackingInMemoryBatchSize10Config) {
-      (tablePath, engine) =>
-        setupCoordinatedCommitFilesForTest(
-          engine, tablePath, versionConvertToCC = 2L)
-
-        val table = Table.forPath(engine, tablePath)
-        for (version <- 0L to 4L) {
-          val snapshot = table.getSnapshotAsOfVersion(engine, version)
-          val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
-          checkAnswer(result, (0L to version * 10L + 9L).map(TestRow(_)))
-        }
-  }
-
-  testWithCoordinatorCommits(
-    "versionToLoad higher than possible", trackingInMemoryBatchSize10Config) {
-      (tablePath, engine) =>
-        setupCoordinatedCommitFilesForTest(
-          engine, tablePath, versionConvertToCC = 2L)
-        val table = Table.forPath(engine, tablePath)
-        val e = intercept[RuntimeException] {
-          table.getSnapshotAsOfVersion(engine, 5L)
-        }
-        assert(e.getMessage.contains(
-          "Cannot load table version 5 as it does not exist. The latest available version is 4"))
-  }
-
-  def getRowsFromFile(engine: Engine, delta: Path): CloseableIterator[Row] = {
-    val file = FileStatus.of(delta.toString, 0, 0)
-    val columnarBatches =
-      engine.getJsonHandler.readJsonFiles(
-        singletonCloseableIterator(file),
-        SingleAction.FULL_SCHEMA,
-        Optional.empty())
-
-
-    var allRowsIterators = List.empty[Row]
-
-    while (columnarBatches.hasNext) {
-      val batch = columnarBatches.next()
-      val rows = batch.getRows
-      while (rows.hasNext) {
-        val row = rows.next()
-        allRowsIterators = allRowsIterators :+ row
-      }
-    }
-
-   toCloseableIterator(allRowsIterators.iterator.asJava)
-  }
-
-  def addCoordinatedCommitToMetadataRow(
-    engine: Engine,
-    delta: Path,
-    snapshot: SnapshotImpl,
-    configurations: Map[String, String]): CloseableIterator[Row] = {
-    var rows = getRowsFromFile(engine, delta)
-    val metadata = snapshot.getMetadata.withNewConfiguration(configurations.asJava)
-    var hasMetadataRow = false
-    rows = rows.map(row => {
-      val metadataOrd = row.getSchema.indexOf("metaData")
-      if (row.isNullAt(metadataOrd)) {
-        row
-      } else {
-        hasMetadataRow = true
-        createMetadataSingleAction(metadata.toRow)
-      }
-    })
-    if (!hasMetadataRow) {
-      toCloseableIterator((rows.toIterator ++ singletonCloseableIterator(
-        createMetadataSingleAction(metadata.toRow)).toIterator).asJava)
-    } else {
-      rows
-    }
-  }
-}
-
-object TestCommitCoordinator {
-  val EXP_TABLE_CONF: util.Map[String, String] = Map(
-    "tableKey1" -> "string_value",
-    "tableKey2Int" -> "2",
-    "tableKey3ComplexStr" -> "\"hello\""
-  ).asJava
-
-  val EXP_COORDINATOR_CONF: util.Map[String, String] = Map(
-    "coordinatorKey1" -> "string_value",
-    "coordinatorKey2Int" -> "2",
-    "coordinatorKey3ComplexStr" -> "\"hello\"").asJava
-
-  val COORDINATOR = new TestCommitCoordinatorClient()
-}
-
-/**
- * A [[CommitCoordinatorClient]] that tests can use to check the coordinator configuration and
- * table configuration.
- */
-class TestCommitCoordinatorClient extends InMemoryCommitCoordinator(10) {
-  override def registerTable(
-    logPath: Path,
-    tableIdentifier: Optional[TableIdentifier],
-    currentVersion: Long,
-    currentMetadata: AbstractMetadata,
-    currentProtocol: AbstractProtocol): util.Map[String, String] = {
-    super.registerTable(logPath, tableIdentifier, currentVersion, currentMetadata, currentProtocol)
-    TestCommitCoordinator.EXP_TABLE_CONF
-  }
-  override def getCommits(
-    tableDesc: TableDescriptor,
-    startVersion: lang.Long,
-    endVersion: lang.Long = null): GetCommitsResponse = {
-    checkArgument(tableDesc.getTableConf == TestCommitCoordinator.EXP_TABLE_CONF)
-    super.getCommits(tableDesc, startVersion, endVersion)
-  }
-  override def commit(
-    logStore: LogStore,
-    hadoopConf: Configuration,
-    tableDesc: TableDescriptor,
-    commitVersion: Long,
-    actions: util.Iterator[String],
-    updatedActions: UpdatedActions): CommitResponse = {
-    checkArgument(tableDesc.getTableConf == TestCommitCoordinator.EXP_TABLE_CONF)
-    super.commit(logStore, hadoopConf, tableDesc, commitVersion, actions, updatedActions)
-  }
-}
-
-class TestCommitCoordinatorBuilder(
-  hadoopConf: Configuration) extends CommitCoordinatorBuilder(hadoopConf) {
-  override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
-    checkArgument(conf == TestCommitCoordinator.EXP_COORDINATOR_CONF)
-    TestCommitCoordinator.COORDINATOR
-  }
-  override def getName: String = "test-coordinator"
-}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala
deleted file mode 100644
index 61379564770..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.coordinatedcommits
-
-import io.delta.kernel.data.Row
-
-import java.{lang, util}
-import io.delta.storage.commit.{CommitCoordinatorClient, InMemoryCommitCoordinator, Commit => StorageCommit, CommitResponse => StorageCommitResponse, GetCommitsResponse => StorageGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions => StorageUpdatedActions}
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
-import io.delta.kernel.engine.{CommitCoordinatorClientHandler, Engine}
-import io.delta.kernel.internal.actions.{CommitInfo, Format, Metadata, Protocol}
-import io.delta.kernel.internal.TableConfig
-import io.delta.kernel.internal.util.{CoordinatedCommitsUtils, FileNames, VectorUtils}
-import io.delta.kernel.internal.util.VectorUtils.{stringArrayValue, stringVector}
-import io.delta.kernel.utils.CloseableIterator
-import io.delta.kernel.engine.coordinatedcommits.{Commit, CommitResponse, GetCommitsResponse, UpdatedActions}
-import io.delta.kernel.types.{LongType, StringType, StructType}
-import io.delta.storage.LogStore
-import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import java.util.{Collections, Optional}
-import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConverters._
-
-trait CoordinatedCommitsTestUtils {
-
-  val hadoopConf = new Configuration()
-  def getEmptyMetadata: Metadata = {
-    new Metadata(
-      util.UUID.randomUUID().toString,
-      Optional.empty(),
-      Optional.empty(),
-      new Format(),
-      "",
-      null,
-      stringArrayValue(Collections.emptyList()),
-      Optional.empty(),
-      VectorUtils.stringStringMapValue(Collections.emptyMap())
-    )
-  }
-
-  def getProtocol(minReaderVersion: Int, minWriterVersion: Int): Protocol = {
-    new Protocol(
-      minReaderVersion, minWriterVersion, Collections.emptyList(), Collections.emptyList())
-  }
-
-  def getCommitInfo(newTimestamp: Long): CommitInfo = {
-    new CommitInfo(
-      Optional.of(newTimestamp),
-      -1,
-      null,
-      null,
-      Collections.emptyMap(),
-      true,
-      null,
-      Collections.emptyMap()
-    )
-  }
-
-  def commit(
-    logPath: String,
-    tableConf: util.Map[String, String],
-    version: Long,
-    timestamp: Long,
-    commit: CloseableIterator[Row],
-    commitCoordinatorClientHandler: CommitCoordinatorClientHandler): Commit = {
-    val updatedCommitInfo = getCommitInfo(timestamp)
-    val updatedActions = if (version == 0) {
-      getUpdatedActionsForZerothCommit(updatedCommitInfo)
-    } else {
-      getUpdatedActionsForNonZerothCommit(updatedCommitInfo)
-    }
-    commitCoordinatorClientHandler.commit(
-      logPath,
-      tableConf,
-      version,
-      commit,
-      updatedActions).getCommit
-  }
-
-  def getHadoopDeltaFile(logPath: Path, version: Long): Path = {
-    new Path(FileNames.deltaFile(new io.delta.kernel.internal.fs.Path(logPath.toString), version))
-  }
-
-  def writeConvertToCCCommit(
-    engine: Engine, logPath: Path, commit: CloseableIterator[Row], version: Long): Unit = {
-    createLogPath(engine, logPath)
-    engine.getJsonHandler.writeJsonFileAtomically(
-      getHadoopDeltaFile(logPath, version).toString,
-      commit,
-      true)
-  }
-
-  def createLogPath(engine: Engine, logPath: Path): Unit = {
-    // New table, create a delta log directory
-    if (!engine.getFileSystemClient.mkdirs(logPath.toString)) {
-      throw new RuntimeException("Failed to create delta log directory: " + logPath)
-    }
-  }
-
-  def getUpdatedActionsForZerothCommit(
-    commitInfo: CommitInfo,
-    oldMetadata: Metadata = getEmptyMetadata): UpdatedActions = {
-    val newMetadataConfiguration =
-      Map(TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "in-memory",
-        TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{}")
-    val newMetadata = oldMetadata.withNewConfiguration(newMetadataConfiguration.asJava)
-    new UpdatedActions(
-      CoordinatedCommitsUtils.convertCommitInfoToAbstractCommitInfo(commitInfo),
-      CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(newMetadata),
-      CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(getProtocol(3, 7)),
-      CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(oldMetadata),
-      CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(getProtocol(3, 7)))
-  }
-
-  def getUpdatedActionsForNonZerothCommit(commitInfo: CommitInfo): UpdatedActions = {
-    val updatedActions = getUpdatedActionsForZerothCommit(commitInfo)
-    new UpdatedActions(
-      updatedActions.getCommitInfo,
-      updatedActions.getNewMetadata,
-      updatedActions.getNewProtocol,
-      updatedActions.getNewMetadata, // oldMetadata is replaced with newMetadata
-      updatedActions.getOldProtocol)
-  }
-
-  def getVersionTimestampSchema: StructType = {
-    new StructType()
-      .add("version", LongType.LONG)
-      .add("timestamp", LongType.LONG)
-  }
-
-  def getCommitRows(engine: Engine, version: Long, timestamp: Long): CloseableIterator[Row] = {
-    val input = Seq(
-      s"""{
-        | "version":$version,
-        | "timestamp":$timestamp
-        |}
-        |""".stripMargin.linesIterator.mkString)
-
-    engine.getJsonHandler.parseJson(
-      stringVector(input.asJava), getVersionTimestampSchema, Optional.empty()).getRows
-  }
-}
-
-case class TrackingInMemoryCommitCoordinatorBuilder(hadoopConf: Configuration)
-  extends CommitCoordinatorBuilder(hadoopConf) {
-  override def getName: String = "tracking-in-memory"
-  override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
-    new TrackingCommitCoordinatorClient(
-      new InMemoryCommitCoordinatorBuilder(hadoopConf).build(conf)
-        .asInstanceOf[InMemoryCommitCoordinator])
-  }
-}
-
-object TrackingCommitCoordinatorClient {
-  val numCommitsCalled = new AtomicInteger(0)
-  val numGetCommitsCalled = new AtomicInteger(0)
-  val numBackfillToVersionCalled = new AtomicInteger(0)
-  val numRegisterTableCalled = new AtomicInteger(0)
-
-  private val insideOperation = new ThreadLocal[Boolean] {
-    override def initialValue(): Boolean = false
-  }
-}
-
-class TrackingCommitCoordinatorClient(delegatingCommitCoordinatorClient: InMemoryCommitCoordinator)
-  extends CommitCoordinatorClient {
-
-  def recordOperation[T](op: String)(f: => T): T = {
-    val oldInsideOperation = TrackingCommitCoordinatorClient.insideOperation.get()
-    try {
-      if (!TrackingCommitCoordinatorClient.insideOperation.get()) {
-        op match {
-          case "commit" => TrackingCommitCoordinatorClient.numCommitsCalled.incrementAndGet()
-          case "getCommits" => TrackingCommitCoordinatorClient.numGetCommitsCalled.incrementAndGet()
-          case "backfillToVersion" =>
-            TrackingCommitCoordinatorClient.numBackfillToVersionCalled.incrementAndGet()
-          case "registerTable" =>
-            TrackingCommitCoordinatorClient.numRegisterTableCalled.incrementAndGet()
-          case _ => ()
-        }
-      }
-      TrackingCommitCoordinatorClient.insideOperation.set(true)
-      f
-    } finally {
-      TrackingCommitCoordinatorClient.insideOperation.set(oldInsideOperation)
-    }
-  }
-
-  override def commit(
-    logStore: LogStore,
-    hadoopConf: Configuration,
-    tableDesc: TableDescriptor,
-    commitVersion: Long,
-    actions: util.Iterator[String],
-    updatedActions: StorageUpdatedActions): StorageCommitResponse = recordOperation("commit") {
-    delegatingCommitCoordinatorClient.commit(
-      logStore,
-      hadoopConf,
-      tableDesc,
-      commitVersion,
-      actions,
-      updatedActions)
-  }
-
-  override def getCommits(
-    tableDesc: TableDescriptor,
-    startVersion: lang.Long,
-    endVersion: lang.Long = null): StorageGetCommitsResponse = recordOperation("getCommits") {
-    delegatingCommitCoordinatorClient.getCommits(tableDesc, startVersion, endVersion)
-  }
-
-  override def backfillToVersion(
-    logStore: LogStore,
-    hadoopConf: Configuration,
-    tableDesc: TableDescriptor,
-    version: Long,
-    lastKnownBackfilledVersion: lang.Long): Unit = recordOperation("backfillToVersion") {
-    delegatingCommitCoordinatorClient.backfillToVersion(
-      logStore, hadoopConf, tableDesc, version, lastKnownBackfilledVersion)
-  }
-
-  override def semanticEquals(other: CommitCoordinatorClient): Boolean = this == other
-
-  def reset(): Unit = {
-    TrackingCommitCoordinatorClient.numCommitsCalled.set(0)
-    TrackingCommitCoordinatorClient.numGetCommitsCalled.set(0)
-    TrackingCommitCoordinatorClient.numBackfillToVersionCalled.set(0)
-  }
-
-  override def registerTable(
-    logPath: Path,
-    tableIdentifier: Optional[TableIdentifier],
-    currentVersion: Long,
-    currentMetadata: AbstractMetadata,
-    currentProtocol: AbstractProtocol):
-  util.Map[String, String] = recordOperation("registerTable") {
-    delegatingCommitCoordinatorClient.registerTable(
-      logPath, tableIdentifier, currentVersion, currentMetadata, currentProtocol)
-  }
-}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.scala
deleted file mode 100644
index 118029f6107..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (2024) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.delta.kernel.defaults.internal.coordinatedcommits
-
-import java.util
-
-import io.delta.storage.commit.{CommitCoordinatorClient, InMemoryCommitCoordinator}
-import org.apache.hadoop.conf.Configuration
-
-/**
- * The [[InMemoryCommitCoordinatorBuilder]] class is responsible for creating singleton instances of
- * [[InMemoryCommitCoordinator]] with the specified batchSize.
- *
- * For testing purposes, a test can clear the instances of [[InMemoryCommitCoordinator]] by calling
- * [[InMemoryCommitCoordinatorBuilder.clearInMemoryInstances()]], configure the
- * [[InMemoryCommitCoordinatorBuilder]] and batchSize in hadoopConf passed to the engine. In this
- * way, the [[InMemoryCommitCoordinator]] instances can be used by Kernel read and write across
- * the test.
- */
-class InMemoryCommitCoordinatorBuilder(hadoopConf: Configuration)
-  extends CommitCoordinatorBuilder(hadoopConf) {
-  private val batchSize =
-    hadoopConf.getLong(InMemoryCommitCoordinatorBuilder.BATCH_SIZE_CONF_KEY, 1)
-
-  /** Name of the commit-coordinator */
-  override def getName: String = "in-memory"
-
-  /** Returns a commit-coordinator based on the given conf */
-  override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
-    if (InMemoryCommitCoordinatorBuilder.batchSizeMap.containsKey(batchSize)) {
-      InMemoryCommitCoordinatorBuilder.batchSizeMap.get(batchSize)
-    } else {
-      val coordinator = new PredictableUuidInMemoryCommitCoordinatorClient(batchSize)
-      InMemoryCommitCoordinatorBuilder.batchSizeMap.put(batchSize, coordinator)
-      coordinator
-    }
-  }
-}
-
-/**
- * The [[InMemoryCommitCoordinatorBuilder]] companion object is responsible for storing the
- * singleton instances of [[InMemoryCommitCoordinator]] based on the batchSize. This is useful for
- * checking the state of the instances in UTs.
- */
-object InMemoryCommitCoordinatorBuilder {
-  val BATCH_SIZE_CONF_KEY = "delta.kernel.default.coordinatedCommits.inMemoryCoordinator.batchSize"
-  val batchSizeMap: util.Map[Long, InMemoryCommitCoordinator] =
-    new util.HashMap[Long, InMemoryCommitCoordinator]()
-
-  // Visible only for UTs
-  private[defaults] def clearInMemoryInstances(): Unit = {
-    batchSizeMap.clear()
-  }
-}
-
-class PredictableUuidInMemoryCommitCoordinatorClient(batchSize: Long)
-  extends InMemoryCommitCoordinator(batchSize) {
-
-  var nextUuidSuffix = 1L
-  override def generateUUID(): String = {
-    nextUuidSuffix += 1
-    s"uuid-${nextUuidSuffix - 1}"
-  }
-}