Skip to content

Commit

Permalink
[Kernel] Remove Coordinated Commits from public API (#3938)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

We are re-thinking the design of the Coordinated Commits table feature
(currently still in RFC). Thus, we should remove it from the public
Kernel API for Delta 3.3 release.

To summarize the changes of this PR

- I remove `getCommitCoordinatorClientHandler` from the `Engine`
interface
- I move various previously `public` CC interfaces and classes to be
`internal` now
- `SnapshotImpl::getTableCommitCoordinatorClientHandlerOpt` is hardcoded
to return an empty optional
- Delete failing test suites and unapplicable utils

## How was this patch tested?

Existing CI tests.

## Does this PR introduce _any_ user-facing changes?

We remove coordinated commits from the public kernel API.
  • Loading branch information
scottsand-db authored Dec 13, 2024
1 parent 2f5673e commit fc81d12
Show file tree
Hide file tree
Showing 26 changed files with 36 additions and 1,500 deletions.
22 changes: 0 additions & 22 deletions kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.delta.kernel.annotation.Evolving;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Interface encapsulating all clients needed by the Delta Kernel in order to read the Delta table.
Expand Down Expand Up @@ -58,27 +57,6 @@ public interface Engine {
*/
ParquetHandler getParquetHandler();

/**
* Retrieves a {@link CommitCoordinatorClientHandler} for the specified commit coordinator client.
*
* <p>{@link CommitCoordinatorClientHandler} helps Kernel perform commits to a table which is
* owned by a commit coordinator.
*
* @see <a
* href="https://github.com/delta-io/delta/blob/master/protocol_rfcs/managed-commits.md#sample-commit-owner-api">Coordinated
* commit protocol table feature</a>.
* <p>This method creates and returns an implementation of {@link
* CommitCoordinatorClientHandler} based on the provided name and configuration of the
* underlying commit coordinator client.
* @param name The identifier or name of the underlying commit coordinator client
* @param conf The configuration settings for the underlying commit coordinator client
* @return An implementation of {@link CommitCoordinatorClientHandler} configured for the
* specified client
* @since 3.3.0
*/
CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
String name, Map<String, String> conf);

/** Get the engine's {@link MetricsReporter} instances to push reports to. */
default List<MetricsReporter> getMetricsReporters() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.DomainMetadata;
Expand Down Expand Up @@ -180,18 +179,6 @@ public Optional<Long> getLatestTransactionVersion(Engine engine, String applicat
*/
public Optional<TableCommitCoordinatorClientHandler> getTableCommitCoordinatorClientHandlerOpt(
Engine engine) {
return COORDINATED_COMMITS_COORDINATOR_NAME
.fromMetadata(metadata)
.map(
commitCoordinatorStr -> {
CommitCoordinatorClientHandler handler =
engine.getCommitCoordinatorClientHandler(
commitCoordinatorStr,
COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(metadata));
return new TableCommitCoordinatorClientHandler(
handler,
logPath.toString(),
COORDINATED_COMMITS_TABLE_CONF.fromMetadata(metadata));
});
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.utils.FileStatus;

/**
* Representation of a commit file. It contains the version of the commit, the file status of the
* commit, and the timestamp of the commit. This is used when we want to get the commit information
* from the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit} and {@link
* io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits} APIs.
* from the {@link CommitCoordinatorClientHandler#commit} and {@link
* CommitCoordinatorClientHandler#getCommits} APIs.
*
* @since 3.3.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
* limitations under the License.
*/

package io.delta.kernel.engine;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.coordinatedcommits.*;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;

/**
* Exception raised by {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}
* Exception raised by {@link CommitCoordinatorClientHandler#commit}
*
* <pre>
* | retryable | conflict | meaning |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;

/**
* Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}.
* Response container for {@link CommitCoordinatorClientHandler#commit}.
*
* @since 3.3.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import java.util.List;
import java.util.Map;

/**
* Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits(
* String, Map, Long, Long)}. Holds all the commits that have not been backfilled as per the commit
* coordinator.
* Response container for {@link CommitCoordinatorClientHandler#getCommits( String, Map, Long,
* Long)}. Holds all the commits that have not been backfilled as per the commit coordinator.
*
* @since 3.3.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractCommitInfo;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;

/**
* A container class to inform the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler}
* about any changes in Protocol/Metadata
* A container class to inform the {@link CommitCoordinatorClientHandler} about any changes in
* Protocol/Metadata
*
* @since 3.3.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits.actions;
package io.delta.kernel.internal.coordinatedcommits.actions;

import io.delta.kernel.annotation.Evolving;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits.actions;
package io.delta.kernel.internal.coordinatedcommits.actions;

import io.delta.kernel.annotation.Evolving;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits.actions;
package io.delta.kernel.internal.coordinatedcommits.actions;

import io.delta.kernel.annotation.Evolving;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@

import io.delta.kernel.*;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.engine.coordinatedcommits.Commit;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.coordinatedcommits.Commit;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package io.delta.kernel.internal.snapshot;

import io.delta.kernel.data.Row;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.coordinatedcommits.CommitFailedException;
import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler;
import io.delta.kernel.internal.coordinatedcommits.CommitFailedException;
import io.delta.kernel.internal.coordinatedcommits.CommitResponse;
import io.delta.kernel.internal.coordinatedcommits.GetCommitsResponse;
import io.delta.kernel.internal.coordinatedcommits.UpdatedActions;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package io.delta.kernel.internal.util;

import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractCommitInfo;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
import java.util.*;

public class CoordinatedCommitsUtils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import java.{lang => javaLang}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
import io.delta.kernel.engine.CommitCoordinatorClientHandler
import io.delta.kernel.engine.coordinatedcommits.{Commit, CommitResponse, GetCommitsResponse}
import io.delta.kernel.exceptions.InvalidTableException
import io.delta.kernel.expressions.Predicate
import io.delta.kernel.internal.actions.CommitInfo
import io.delta.kernel.internal.checkpoints.{CheckpointInstance, SidecarFile}
import io.delta.kernel.internal.coordinatedcommits.{Commit, CommitCoordinatorClientHandler, CommitResponse, GetCommitsResponse}
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.snapshot.{LogSegment, SnapshotManager, TableCommitCoordinatorClientHandler}
import io.delta.kernel.internal.util.{FileNames, Utils}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.delta.kernel.test
import io.delta.kernel.engine._
import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row}
import io.delta.kernel.expressions.{Column, Expression, ExpressionEvaluator, Predicate, PredicateEvaluator}
import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler
import io.delta.kernel.types.{DataType, StructType}
import io.delta.kernel.utils.{CloseableIterator, DataFileStatus, FileStatus}

Expand Down Expand Up @@ -70,11 +71,6 @@ trait MockEngineUtils {
override def getParquetHandler: ParquetHandler =
Option(parquetHandler).getOrElse(
throw new UnsupportedOperationException("not supported in this test suite"))

override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]):
CommitCoordinatorClientHandler =
Option(commitCoordinatorClientHandler).getOrElse(
throw new UnsupportedOperationException("not supported in this test suite"))
}
}
}
Expand Down
Loading

0 comments on commit fc81d12

Please sign in to comment.