From e3e72751f57347975843b8afa772d02cc988ab48 Mon Sep 17 00:00:00 2001 From: Qianru Lao Date: Tue, 25 Jun 2024 11:06:10 -0700 Subject: [PATCH] [Kernel] Add an end2end prototype of Coordinated Commit read support in kernel --- build.sbt | 2 +- .../java/io/delta/kernel/commit/Commit.java | 57 ++++ .../kernel/commit/CommitFailedException.java | 55 +++ .../delta/kernel/commit/CommitResponse.java | 33 ++ .../kernel/commit/GetCommitsResponse.java | 44 +++ .../delta/kernel/commit/UpdatedActions.java | 70 ++++ .../commit/actions/AbstractCommitInfo.java | 31 ++ .../commit/actions/AbstractMetadata.java | 68 ++++ .../commit/actions/AbstractProtocol.java | 46 +++ .../CommitCoordinatorClientHandler.java | 154 +++++++++ .../java/io/delta/kernel/engine/Engine.java | 11 + .../delta/kernel/internal/SnapshotImpl.java | 11 +- .../io/delta/kernel/internal/TableConfig.java | 86 +++++ .../kernel/internal/actions/CommitInfo.java | 23 ++ .../kernel/internal/actions/Metadata.java | 22 +- .../kernel/internal/actions/Protocol.java | 46 +++ .../internal/snapshot/SnapshotManager.java | 140 +++++++- .../delta/kernel/internal/util/FileNames.java | 29 +- .../internal/SnapshotManagerSuite.scala | 24 +- .../delta/kernel/test/MockEngineUtils.scala | 8 +- ...DefaultCommitCoordinatorClientHandler.java | 292 ++++++++++++++++ .../kernel/defaults/engine/DefaultEngine.java | 9 + ...tchBackfillingCommitCoordinatorClient.java | 216 ++++++++++++ .../CommitCoordinatorBuilder.java | 29 ++ .../CommitCoordinatorProvider.java | 62 ++++ .../CoordinatedCommitsUtils.java | 182 ++++++++++ .../InMemoryCommitCoordinator.java | 252 ++++++++++++++ .../InMemoryCommitCoordinatorBuilder.java | 42 +++ .../LogReplayEngineMetricsSuite.scala | 6 +- .../CommitCoordinatorClientSuite.scala | 321 ++++++++++++++++++ .../CoordinatedCommitsSuite.scala | 296 ++++++++++++++++ .../CoordinatedCommitsTestUtils.scala | 212 ++++++++++++ .../InMemoryCommitCoordinatorSuite.scala | 178 ++++++++++ .../commit/CommitCoordinatorClient.java | 5 +- 34 files changed, 3031 insertions(+), 31 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/commit/Commit.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitFailedException.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitResponse.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/commit/GetCommitsResponse.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/commit/UpdatedActions.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractCommitInfo.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractMetadata.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractProtocol.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.java create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java create mode 100644 
kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsUtils.java create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinator.java create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.java create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorClientSuite.scala create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorSuite.scala diff --git a/build.sbt b/build.sbt index 3928ec9c3c5..98a18d9be0f 100644 --- a/build.sbt +++ b/build.sbt @@ -453,8 +453,8 @@ lazy val kernelApi = (project in file("kernel/kernel-api")) libraryDependencies ++= Seq( "org.roaringbitmap" % "RoaringBitmap" % "0.9.25", "org.slf4j" % "slf4j-api" % "1.7.36", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", - "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "junit" % "junit" % "4.13.2" % "test", "com.novocode" % "junit-interface" % "0.11" % "test", diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/commit/Commit.java b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/Commit.java new file mode 100644 index 00000000000..daecc9ee726 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/Commit.java @@ -0,0 +1,57 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.kernel.commit; + +import io.delta.kernel.utils.FileStatus; + +/** + * Representation of a commit file + */ +public class Commit { + + private long version; + + private FileStatus fileStatus; + + private long commitTimestamp; + + public Commit(long version, FileStatus fileStatus, long commitTimestamp) { + this.version = version; + this.fileStatus = fileStatus; + this.commitTimestamp = commitTimestamp; + } + + public long getVersion() { + return version; + } + + public FileStatus getFileStatus() { + return fileStatus; + } + + public long getCommitTimestamp() { + return commitTimestamp; + } + + public Commit withFileStatus(FileStatus fileStatus) { + return new Commit(version, fileStatus, commitTimestamp); + } + + public Commit withCommitTimestamp(long commitTimestamp) { + return new Commit(version, fileStatus, commitTimestamp); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitFailedException.java b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitFailedException.java new file mode 100644 index 00000000000..d86d006dac8 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitFailedException.java @@ -0,0 +1,55 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.commit; + +/** + * Exception raised by {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit} + * + *
+ * <pre>
+ *  | retryable | conflict  | meaning                                                         |
+ *  |   no      |   no      | something bad happened (e.g. auth failure)                      |
+ *  |   no      |   yes     | permanent transaction conflict (e.g. multi-table commit failed) |
+ *  |   yes     |   no      | transient error (e.g. network hiccup)                           |
+ *  |   yes     |   yes     | physical conflict (allowed to rebase and retry)                 |
+ *  </pre>
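+ *
+ * For example, a caller's commit loop might react to the two flags as follows (an
+ * illustrative sketch; {@code handler}, {@code actions}, and the retry/rebase helpers
+ * are assumptions, not part of this API):
+ * <pre>{@code
+ * try {
+ *     handler.commit(logPath, tableConf, commitVersion, actions, updatedActions);
+ * } catch (CommitFailedException e) {
+ *     if (e.getRetryable() && e.getConflict()) {
+ *         rebaseAndRetry();       // physical conflict: rebase on the latest commits
+ *     } else if (e.getRetryable()) {
+ *         retryWithBackoff();     // transient error such as a network hiccup
+ *     } else {
+ *         throw e;                // permanent failure: surface to the caller
+ *     }
+ * }
+ * }</pre>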
+ */ +public class CommitFailedException extends Exception { + + private boolean retryable; + + private boolean conflict; + + private String message; + + public CommitFailedException(boolean retryable, boolean conflict, String message) { + this.retryable = retryable; + this.conflict = conflict; + this.message = message; + } + + public boolean getRetryable() { + return retryable; + } + + public boolean getConflict() { + return conflict; + } + + public String getMessage() { + return message; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitResponse.java b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitResponse.java new file mode 100644 index 00000000000..774e19795f4 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitResponse.java @@ -0,0 +1,33 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.commit; + +/** + * Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}. + */ +public class CommitResponse { + + private Commit commit; + + public CommitResponse(Commit commit) { + this.commit = commit; + } + + public Commit getCommit() { + return commit; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/commit/GetCommitsResponse.java b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/GetCommitsResponse.java new file mode 100644 index 00000000000..e7e3b1ed957 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/GetCommitsResponse.java @@ -0,0 +1,44 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.commit; + +import java.util.List; +import java.util.Map; + +/** + * Response container for + * {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits( + * String, Map, Long, Long)}. 
+ */ +public class GetCommitsResponse { + private List commits; + + private long latestTableVersion; + + public GetCommitsResponse(List commits, long latestTableVersion) { + this.commits = commits; + this.latestTableVersion = latestTableVersion; + } + + public List getCommits() { + return commits; + } + + public long getLatestTableVersion() { + return latestTableVersion; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/commit/UpdatedActions.java b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/UpdatedActions.java new file mode 100644 index 00000000000..6f5df66749e --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/UpdatedActions.java @@ -0,0 +1,70 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.commit; + +import io.delta.kernel.commit.actions.AbstractCommitInfo; +import io.delta.kernel.commit.actions.AbstractMetadata; +import io.delta.kernel.commit.actions.AbstractProtocol; + +/** + * A container class to inform the CommitCoordinatorClientHandler about any changes in + * Protocol/Metadata + */ +public class UpdatedActions { + private AbstractCommitInfo commitInfo; + + private AbstractMetadata newMetadata; + + private AbstractProtocol newProtocol; + + private AbstractMetadata oldMetadata; + + private AbstractProtocol oldProtocol; + + public UpdatedActions( + AbstractCommitInfo commitInfo, + AbstractMetadata newMetadata, + AbstractProtocol newProtocol, + AbstractMetadata oldMetadata, + AbstractProtocol oldProtocol) { + this.commitInfo = commitInfo; + this.newMetadata = newMetadata; + this.newProtocol = newProtocol; + this.oldMetadata = oldMetadata; + this.oldProtocol = oldProtocol; + } + + public AbstractCommitInfo getCommitInfo() { + return commitInfo; + } + + public AbstractMetadata getNewMetadata() { + return newMetadata; + } + + public AbstractProtocol getNewProtocol() { + return newProtocol; + } + + public AbstractMetadata getOldMetadata() { + return oldMetadata; + } + + public AbstractProtocol getOldProtocol() { + return oldProtocol; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractCommitInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractCommitInfo.java new file mode 100644 index 00000000000..3db6a87e177 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractCommitInfo.java @@ -0,0 +1,31 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.commit.actions; + +/** + * Interface for objects that represents the base information for a commit. + * Commits need to provide an in-commit timestamp. This timestamp is used + * to specify the exact time the commit happened and determines the target + * version for time-based time travel queries. + */ +public interface AbstractCommitInfo { + + /** + * Get the timestamp of the commit as millis after the epoch. + */ + long getCommitTimestamp(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractMetadata.java new file mode 100644 index 00000000000..4465f92a3ad --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractMetadata.java @@ -0,0 +1,68 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.commit.actions; + +import java.util.List; +import java.util.Map; + +/** + * Interface for metadata actions in Delta. The metadata defines the metadata + * of the table. + */ +public interface AbstractMetadata { + + /** + * A unique table identifier. + */ + String getId(); + + /** + * User-specified table identifier. + */ + String getName(); + + /** + * User-specified table description. + */ + String getDescription(); + + /** The table provider format. */ + String getProvider(); + + /** The format options */ + Map getFormatOptions(); + + /** + * The table schema in string representation. + */ + String getSchemaString(); + + /** + * List of partition columns. + */ + List getPartitionColumns(); + + /** + * The table properties defined on the table. + */ + Map getConfiguration(); + + /** + * Timestamp for the creation of this metadata. + */ + Long getCreatedTime(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractProtocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractProtocol.java new file mode 100644 index 00000000000..0ad4d384f8d --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractProtocol.java @@ -0,0 +1,46 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.kernel.commit.actions; + +import java.util.Set; + +/** + * Interface for protocol actions in Delta. The protocol defines the requirements + * that readers and writers of the table need to meet. + */ +public interface AbstractProtocol { + + /** + * The minimum reader version required to read the table. + */ + int getMinReaderVersion(); + + /** + * The minimum writer version required to read the table. + */ + int getMinWriterVersion(); + + /** + * The reader features that need to be supported to read the table. + */ + Set getReaderFeatures(); + + /** + * The writer features that need to be supported to write the table. + */ + Set getWriterFeatures(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java new file mode 100644 index 00000000000..189f655a91b --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java @@ -0,0 +1,154 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.engine; + +import java.io.IOException; +import java.util.Map; + +import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.commit.Commit; +import io.delta.kernel.commit.CommitFailedException; +import io.delta.kernel.commit.CommitResponse; +import io.delta.kernel.commit.GetCommitsResponse; +import io.delta.kernel.commit.UpdatedActions; +import io.delta.kernel.commit.actions.AbstractMetadata; +import io.delta.kernel.commit.actions.AbstractProtocol; +import io.delta.kernel.data.Row; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Provides coordinated commits related functionalities to Delta Kernel. + * + * @since 3.0.0 + */ +@Evolving +public interface CommitCoordinatorClientHandler { + + /** + * API to register the table represented by the given `logPath` at the provided + * currentTableVersion with the commit coordinator this commit coordinator client represents. + *
+ * <p>
+ * This API is called when the table is being converted from a file system table to a + * coordinated-commit table. + *
+ * <p>
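+ * A sketch of the conversion flow follows (illustrative only; the variable names are
+ * assumptions, not part of this API):
+ * <pre>{@code
+ * // version N is the last file system commit; N + 1 is the conversion commit and
+ * // must be backfilled atomically; N + 2 is the first coordinated commit
+ * Map<String, String> tableConf = handler.registerTable(
+ *     logPath, currentTableVersion, currentMetadata, currentProtocol);
+ * // persist tableConf in the table metadata; later commit / getCommits /
+ * // backfillToVersion calls use it to identify the table
+ * }</pre>
+ * <p>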
+ * When a new coordinated-commit table is being created, the currentTableVersion will be -1 and + * the upgrade commit needs to be a file system commit which will write the backfilled file + * directly. + * + * @param logPath The path to the delta log of the table that should be converted + * @param currentVersion The currentTableVersion is the version of the table just before + * conversion. currentTableVersion + 1 represents the commit that + * will do the conversion. This must be backfilled atomically. + * currentTableVersion + 2 represents the first commit after conversion. + * This will go through the CommitCoordinatorClient and the client is + * free to choose when it wants to backfill this commit. + * @param currentMetadata The metadata of the table at currentTableVersion + * @param currentProtocol The protocol of the table at currentTableVersion + * @return A map of key-value pairs which is issued by the commit coordinator to identify the + * table. This should be stored in the table's metadata. This information needs to be + * passed to the {@link #commit}, {@link #getCommits}, and {@link #backfillToVersion} + * APIs to identify the table. + */ + Map registerTable( + String logPath, + long currentVersion, + AbstractMetadata currentMetadata, + AbstractProtocol currentProtocol); + + /** + * API to commit the given set of actions to the table represented by logPath at the + * given commitVersion. + * + * @param logPath The path to the delta log of the table that should be committed to. + * @param tableConf The table configuration that was returned by the commit coordinator + * client during registration. + * @param commitVersion The version of the commit that is being committed. + * @param actions The actions that need to be committed. + * @param updatedActions The commit info and any metadata or protocol changes that are made + * as part of this commit. + * @return CommitResponse which contains the file status of the committed commit file. If the + * commit is already backfilled, then the file status could be omitted from the response + * and the client could retrieve the information by itself. + */ + CommitResponse commit( + String logPath, + Map tableConf, + long commitVersion, + CloseableIterator actions, + UpdatedActions updatedActions) throws IOException, CommitFailedException; + + /** + * API to get the unbackfilled commits for the table represented by the given logPath. + * Commits older than startVersion or newer than endVersion (if given) are ignored. The + * returned commits are contiguous and in ascending version order. + * + * Note that the first version returned by this API may not be equal to startVersion. This + * happens when some versions starting from startVersion have already been backfilled and so + * the commit coordinator may have stopped tracking them. + * + * The returned latestTableVersion is the maximum commit version ratified by the commit + * coordinator. Note that returning latestTableVersion as -1 is acceptable only if the commit + * coordinator never ratified any version, i.e. it never accepted any unbackfilled commit. + * + * @param tablePath The path to the delta log of the table for which the unbackfilled + * commits should be retrieved. + * @param tableConf The table configuration that was returned by the commit coordinator + * during registration. + * @param startVersion The minimum version of the commit that should be returned. Can be null. + * @param endVersion The maximum version of the commit that should be returned. Can be null. 
+ * @return GetCommitsResponse which has a list of {@link Commit}s and the latestTableVersion + * which is tracked by {@link CommitCoordinatorClientHandler}. + */ + GetCommitsResponse getCommits( + String tablePath, + Map tableConf, + Long startVersion, + Long endVersion); + + /** + * API to ask the commit coordinator client to backfill all commits up to {@code version} + * and notify the commit coordinator. + * + * If this API returns successfully, that means the backfill must have been completed, although + * the commit coordinator may not be aware of it yet. + * + * @param logPath The path to the delta log of the table that should be backfilled. + * @param tableConf The table configuration that was returned by the commit coordinator + * during registration. + * @param version The version till which the commit coordinator client should backfill. + * @param lastKnownBackfilledVersion The last known version that was backfilled before this API + * was called. If it is None or invalid, then the commit + * coordinator client should backfill from the beginning of + * the table. Can be null. + */ + void backfillToVersion( + String logPath, + Map tableConf, + long version, + Long lastKnownBackfilledVersion) throws IOException; + + /** + * Determines whether this CommitCoordinatorClient is semantically equal to another + * CommitCoordinatorClient. + * + * Semantic equality is determined by each CommitCoordinatorClient implementation based on + * whether the two instances can be used interchangeably when invoking any of the + * CommitCoordinatorClient APIs, such as {@link #commit}, {@link #getCommits}, etc. For example, + * both instances might be pointing to the same underlying endpoint. + */ + Boolean semanticEquals(CommitCoordinatorClientHandler other); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java index 3a7801dd797..c027c5c490a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java @@ -16,6 +16,8 @@ package io.delta.kernel.engine; +import java.util.Map; + import io.delta.kernel.annotation.Evolving; /** @@ -55,4 +57,13 @@ public interface Engine { * @return An implementation of {@link ParquetHandler}. */ ParquetHandler getParquetHandler(); + + /** + * Get the connector provided {@link CommitCoordinatorClientHandler}. + * + * @param name Name of the underlying commit coordinator client. + * @return An implementation of {@link CommitCoordinatorClientHandler}. 
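+ * <p>For example (a sketch; the coordinator name {@code "in-memory"} and the empty
+ * configuration map are placeholder values, not part of this contract):
+ * <pre>{@code
+ * CommitCoordinatorClientHandler handler = engine.getCommitCoordinatorClientHandler(
+ *     "in-memory", Collections.emptyMap());
+ * }</pre>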
+ */ + CommitCoordinatorClientHandler getCommitCoordinatorClientHandler( + String name, Map conf); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 118f1dc20da..037b1dffc74 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -20,6 +20,7 @@ import io.delta.kernel.ScanBuilder; import io.delta.kernel.Snapshot; +import io.delta.kernel.engine.CommitCoordinatorClientHandler; import io.delta.kernel.engine.Engine; import io.delta.kernel.types.StructType; @@ -30,7 +31,7 @@ import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; -import static io.delta.kernel.internal.TableConfig.TOMBSTONE_RETENTION; +import static io.delta.kernel.internal.TableConfig.*; /** * Implementation of {@link Snapshot}. @@ -159,4 +160,12 @@ public long getTimestamp(Engine engine) { return logSegment.lastCommitTimestamp; } } + + public Optional getCommitCoordinatorClientHandlerOpt( + Engine engine) { + return COORDINATED_COMMITS_COORDINATOR_NAME.fromMetadata(metadata).map( + commitCoordinatorStr -> engine.getCommitCoordinatorClientHandler( + commitCoordinatorStr, + COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(metadata))); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index b7c1586fc7a..117799da979 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -19,6 +19,10 @@ import java.util.function.Function; import java.util.function.Predicate; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + import io.delta.kernel.exceptions.InvalidConfigurationValueException; import io.delta.kernel.exceptions.UnknownConfigurationException; import io.delta.kernel.internal.actions.Metadata; @@ -29,6 +33,8 @@ * from the table metadata. */ public class TableConfig { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + /** * The shortest duration we have to keep logically deleted data files around before deleting * them physically. @@ -105,6 +111,65 @@ public class TableConfig { "needs to be a long." ); + + /* + * This table property is used to track the commit-coordinator name for this table. If this + * property is not set, the table will be considered as file system table and commits will be + * done via atomically publishing the commit file. + */ + public static final TableConfig> COORDINATED_COMMITS_COORDINATOR_NAME = + new TableConfig<>( + "delta.coordinatedCommits.commitCoordinator-preview", + null, /* default values */ + Optional::ofNullable, + value -> true, + "The commit-coordinator name for this table. This is used to determine\n" + + "which implementation of commit-coordinator to use when committing\n" + + "to this table. 
If this property is not set, the table will be\n" + + "considered as file system table and commits will be done via\n" + + "atomically publishing the commit file.\n" + ); + + public static final TableConfig> COORDINATED_COMMITS_COORDINATOR_CONF = + new TableConfig<>( + "delta.coordinatedCommits.commitCoordinatorConf-preview", + null, /* default values */ + v -> { + if (v == null) { + return Collections.emptyMap(); + } + try { + return OBJECT_MAPPER.readValue( + v, new TypeReference>() {}); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }, + value -> true, + "A string-to-string map of configuration properties for the" + + " coordinated commits-coordinator." + ); + + public static final TableConfig> COORDINATED_COMMITS_TABLE_CONF = + new TableConfig<>( + "delta.coordinatedCommits.tableConf-preview", + null, /* default values */ + v -> { + if (v == null) { + return Collections.emptyMap(); + } + try { + return OBJECT_MAPPER.readValue( + v, new TypeReference>() {}); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }, + value -> true, + "A string-to-string map of configuration properties for" + + " describing the table to commit-coordinator." + ); + /** * All the valid properties that can be set on the table. */ @@ -115,6 +180,9 @@ public class TableConfig { addConfig(this, IN_COMMIT_TIMESTAMPS_ENABLED); addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION); addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP); + addConfig(this, COORDINATED_COMMITS_COORDINATOR_NAME); + addConfig(this, COORDINATED_COMMITS_COORDINATOR_CONF); + addConfig(this, COORDINATED_COMMITS_TABLE_CONF); }} ); @@ -158,6 +226,24 @@ public String getKey() { return key; } + /** + * Returns the default value of the table property. + * + * @return the default value of the table property + */ + public String getDefaultValue() { + return defaultValue; + } + + /** + * Returns the fromString function of the table property. + * + * @return the fromString function of the table property + */ + public Function getFromString() { + return fromString; + } + /** * Validates that the given properties have the delta prefix in the key name, and they are in * the set of valid properties. 
The caller should get the validated configurations and store the diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java index c229682b117..839e61f90d0 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java @@ -105,6 +105,17 @@ public static CommitInfo fromColumnVector(ColumnVector vector, int rowId) { private final String txnId; private Optional inCommitTimestamp; + public static CommitInfo empty() { + return new CommitInfo( + Optional.empty(), + -1, + null, + null, + Collections.emptyMap(), + true, + null); + } + public CommitInfo( Optional inCommitTimestamp, long timestamp, @@ -122,6 +133,18 @@ public CommitInfo( this.txnId = txnId; } + public CommitInfo withTimestamp(long newTimestamp) { + return new CommitInfo( + Optional.of(newTimestamp), + newTimestamp, + this.engineInfo, + this.operation, + this.operationParameters, + this.isBlindAppend, + this.txnId + ); + } + public Optional getInCommitTimestamp() { return inCommitTimestamp; } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java index cb5cf95f0a3..483e65ab6b4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java @@ -19,15 +19,19 @@ import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; -import io.delta.kernel.data.*; +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.data.Row; import io.delta.kernel.engine.Engine; import io.delta.kernel.types.*; - import io.delta.kernel.internal.data.GenericRow; import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.util.VectorUtils; import static io.delta.kernel.internal.util.InternalUtils.requireNonNull; import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue; + public class Metadata { @@ -86,6 +90,20 @@ public static Metadata fromColumnVector( // Logical data schema excluding partition columns private final Lazy dataSchema; + public static Metadata empty() { + return new Metadata( + java.util.UUID.randomUUID().toString(), + Optional.empty(), + Optional.empty(), + new Format(), + "", + null, + stringArrayValue(Collections.emptyList()), + Optional.empty(), + VectorUtils.stringStringMapValue(Collections.emptyMap()) + ); + } + public Metadata( String id, Optional name, diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java index 564c9bd6471..d47dc98ab7f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java @@ -25,6 +25,7 @@ import io.delta.kernel.internal.TableFeatures; import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.internal.util.VectorUtils; import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue; @@ -56,6 +57,29 @@ 
public static Protocol fromColumnVector(ColumnVector vector, int rowId) { private final int minWriterVersion; private final List readerFeatures; private final List writerFeatures; + private final Lazy> readerAndWriterFeatureNames; + + public static Protocol empty() { + return new Protocol(3, 7); + } + + private static boolean supportsReaderFeatures(int minReaderVersion) { + return minReaderVersion >= 3; + } + + private static boolean supportsWriterFeatures(int minWriterVersion) { + return minWriterVersion >= 7; + } + + public Protocol(int minReaderVersion, int minWriterVersion) { + this.minReaderVersion = minReaderVersion; + this.minWriterVersion = minWriterVersion; + this.readerFeatures = + supportsReaderFeatures(minReaderVersion) ? Collections.emptyList() : null; + this.writerFeatures = + supportsWriterFeatures(minWriterVersion) ? Collections.emptyList() : null; + this.readerAndWriterFeatureNames = getLazyReaderAndWriterFeatureNames(); + } public Protocol( int minReaderVersion, @@ -66,6 +90,7 @@ public Protocol( this.minWriterVersion = minWriterVersion; this.readerFeatures = readerFeatures; this.writerFeatures = writerFeatures; + this.readerAndWriterFeatureNames = getLazyReaderAndWriterFeatureNames(); } public int getMinReaderVersion() { @@ -125,4 +150,25 @@ public Protocol withNewWriterFeatures( this.readerFeatures == null ? null : new ArrayList<>(this.readerFeatures), newWriterFeatures); } + + /** + * Check if a `feature` is supported by this protocol. This means the protocol supports + * table features and references the feature. + */ + public boolean isFeatureSupported(String feature) { + return readerAndWriterFeatureNames.get().contains(feature); + } + + private Lazy> getLazyReaderAndWriterFeatureNames() { + return new Lazy<>(() -> { + Set names = new HashSet<>(); + if (readerFeatures != null) { + names.addAll(readerFeatures); + } + if (writerFeatures != null) { + names.addAll(writerFeatures); + } + return names; + }); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 102305d715d..c977aa8e212 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -19,6 +19,7 @@ import java.io.*; import java.nio.file.FileAlreadyExistsException; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -29,6 +30,8 @@ import org.slf4j.LoggerFactory; import io.delta.kernel.*; +import io.delta.kernel.commit.Commit; +import io.delta.kernel.engine.CommitCoordinatorClientHandler; import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.CheckpointAlreadyExistsException; import io.delta.kernel.exceptions.InvalidTableException; @@ -44,6 +47,7 @@ import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.Tuple2; +import static io.delta.kernel.internal.TableConfig.COORDINATED_COMMITS_TABLE_CONF; import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable; import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore; import static io.delta.kernel.internal.fs.Path.getName; @@ -130,9 +134,15 @@ public Snapshot getSnapshotAt( Engine engine, long 
version) throws TableNotFoundException { + SnapshotImpl upperBoundSnapshot = getSnapshotAtInit(engine); + Optional logSegmentOpt = getLogSegmentForVersion(engine, Optional.empty(), /* startCheckpointOpt */ - Optional.of(version) /* versionToLoadOpt */); + Optional.of(version) /* versionToLoadOpt */, + upperBoundSnapshot.getCommitCoordinatorClientHandlerOpt(engine), + /* commitCoordinatorClientHandlerOpt */ + COORDINATED_COMMITS_TABLE_CONF.fromMetadata(upperBoundSnapshot.getMetadata()) + /* coordinatedCommitsTableConf */); return logSegmentOpt .map(logSegment -> createSnapshot(logSegment, engine)) @@ -285,7 +295,9 @@ private Optional> listFromOrNone( protected final Optional> listDeltaAndCheckpointFiles( Engine engine, long startVersion, - Optional versionToLoad) { + Optional versionToLoad, + Optional commitCoordinatorClientHandlerOpt, + Map coordinatedCommitsTableConf) { versionToLoad.ifPresent(v -> checkArgument( v >= startVersion, @@ -296,22 +308,34 @@ protected final Optional> listDeltaAndCheckpointFiles( )); logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad); - return listFromOrNone(engine, + List unbackfilledCommits = commitCoordinatorClientHandlerOpt + .map(commitCoordinatorClientHandler -> commitCoordinatorClientHandler + .getCommits( + logPath.toString(), + coordinatedCommitsTableConf, + startVersion, + versionToLoad.orElse(null)) + .getCommits()) + .orElse(Collections.emptyList()); + + + AtomicLong maxDeltaVersionSeen = new AtomicLong(startVersion - 1); + Optional> resultFromFsListingOpt = listFromOrNone(engine, startVersion).map(fileStatusesIter -> { final List output = new ArrayList<>(); while (fileStatusesIter.hasNext()) { final FileStatus fileStatus = fileStatusesIter.next(); + final String fileName = getName(fileStatus.getPath()); // Pick up all checkpoint and delta files - if (!isDeltaCommitOrCheckpointFile(getName(fileStatus.getPath()))) { + if (!isDeltaCommitOrCheckpointFile(fileName)) { continue; } // Checkpoint files of 0 size are invalid but may be ignored silently when read, // hence we drop them so that we never pick up such checkpoints. - if (FileNames.isCheckpointFile(getName(fileStatus.getPath())) && - fileStatus.getSize() == 0) { + if (FileNames.isCheckpointFile(fileName) && fileStatus.getSize() == 0) { continue; } @@ -333,11 +357,43 @@ protected final Optional> listDeltaAndCheckpointFiles( break; } + // Ideally listFromOrNone should return lexiographically sorted files amd so + // maxDeltaVersionSeen should be equal to fileVersion. + // But we are being defensive here and taking max of all the fileVersions seen. 
+ if (FileNames.isCommitFile(fileName)) { + maxDeltaVersionSeen.set(Math.max( + maxDeltaVersionSeen.get(), + FileNames.deltaVersion(fileStatus.getPath()))); + } + output.add(fileStatus); } return output; }); + + if (!commitCoordinatorClientHandlerOpt.isPresent()) { + return resultFromFsListingOpt; + } + + List unbackfilledCommitsFiltered = new ArrayList<>(); + boolean dropConditionMet = false; + for (Commit commit : unbackfilledCommits) { + if (!dropConditionMet && commit.getVersion() <= maxDeltaVersionSeen.get()) { + continue; + } else { + dropConditionMet = true; + } + if (versionToLoad.isPresent() && commit.getVersion() > versionToLoad.get()) { + break; + } + unbackfilledCommitsFiltered.add(commit.getFileStatus()); + } + + return resultFromFsListingOpt.map(fsListing -> { + fsListing.addAll(unbackfilledCommitsFiltered); + return fsListing; + }); } /** @@ -353,15 +409,52 @@ private SnapshotImpl getSnapshotAtInit(Engine engine) logger.warn("{}: Last checkpoint file is missing or corrupted. " + "Will search for the checkpoint files directly.", tablePath); } - Optional logSegmentOpt = - getLogSegmentFrom(engine, lastCheckpointOpt); + Optional logSegmentOpt = getLogSegmentFrom(engine, lastCheckpointOpt); return logSegmentOpt - .map(logSegment -> createSnapshot( - logSegment, engine)) + .map(logSegment -> getUpdatedSnapshot( + engine, + Optional.empty(), /* oldSnapshotOpt */ + logSegment, + Optional.empty() /* initialCommitCoordinatorClientHandler */)) .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); } + private SnapshotImpl getUpdatedSnapshot( + Engine engine, + Optional oldSnapshotOpt, + LogSegment initialSegmentForNewSnapshot, + Optional initialCommitCoordinatorClientHandler) { + SnapshotImpl newSnapshot = createSnapshot(initialSegmentForNewSnapshot, engine); + + Optional newCommitCoordinatorClientHandlerOpt = + newSnapshot.getCommitCoordinatorClientHandlerOpt(engine); + boolean usedStaleCommitCoordinator = newCommitCoordinatorClientHandlerOpt + .map(newStore -> !initialCommitCoordinatorClientHandler.isPresent() || + !initialCommitCoordinatorClientHandler.get().semanticEquals(newStore)) + .orElse(false); + + if (usedStaleCommitCoordinator) { + Optional segmentOpt = getLogSegmentForVersion(engine, + Optional.empty(), /* startCheckpointOpt */ + newSnapshot.getLogSegment().checkpointVersionOpt /* versionToLoadOpt */, + newCommitCoordinatorClientHandlerOpt /* commitCoordinatorClientHandlerOpt */, + COORDINATED_COMMITS_TABLE_CONF.fromMetadata(newSnapshot.getMetadata()) + /* coordinatedCommitsTableConf */); + newSnapshot = segmentOpt + .map(segment -> { + if (oldSnapshotOpt.isPresent() + && oldSnapshotOpt.get().getLogSegment().equals(segment)) { + return oldSnapshotOpt.get(); + } else { + return createSnapshot(segment, engine); + } + }) + .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); + } + return newSnapshot; + } + private SnapshotImpl createSnapshot( LogSegment initSegment, Engine engine) { @@ -419,7 +512,9 @@ private Optional getLogSegmentFrom( Optional startingCheckpoint) { return getLogSegmentForVersion(engine, startingCheckpoint.map(x -> x.version), - Optional.empty()); + Optional.empty(), + Optional.empty(), + null); } /** @@ -441,7 +536,9 @@ private Optional getLogSegmentFrom( public Optional getLogSegmentForVersion( Engine engine, Optional startCheckpoint, - Optional versionToLoad) { + Optional versionToLoad, + Optional commitCoordinatorClientHandlerOpt, + Map coordinatedCommitsTableConf) { // Only use startCheckpoint if it is <= versionToLoad 
Optional startCheckpointToUse = startCheckpoint .filter(v -> !versionToLoad.isPresent() || v <= versionToLoad.get()); @@ -468,7 +565,12 @@ public Optional getLogSegmentForVersion( long startTimeMillis = System.currentTimeMillis(); final Optional> newFiles = - listDeltaAndCheckpointFiles(engine, startVersion, versionToLoad); + listDeltaAndCheckpointFiles( + engine, + startVersion, + versionToLoad, + commitCoordinatorClientHandlerOpt, + coordinatedCommitsTableConf); logger.info("{}: Took {}ms to list the files after starting checkpoint", tablePath, System.currentTimeMillis() - startTimeMillis); @@ -478,7 +580,9 @@ public Optional getLogSegmentForVersion( return getLogSegmentForVersion(engine, startCheckpointToUse, versionToLoad, - newFiles); + newFiles, + commitCoordinatorClientHandlerOpt, + coordinatedCommitsTableConf); } finally { logger.info("{}: Took {}ms to construct a log segment", tablePath, @@ -494,7 +598,9 @@ protected Optional getLogSegmentForVersion( Engine engine, Optional startCheckpointOpt, Optional versionToLoadOpt, - Optional> filesOpt) { + Optional> filesOpt, + Optional commitCoordinatorClientHandlerOpt, + Map coordinatedCommitsTableConf) { final List newFiles; if (filesOpt.isPresent()) { newFiles = filesOpt.get(); @@ -532,7 +638,9 @@ protected Optional getLogSegmentForVersion( // DeltaLog singleton, so try listing from the first version return getLogSegmentForVersion(engine, Optional.empty(), - versionToLoadOpt); + versionToLoadOpt, + commitCoordinatorClientHandlerOpt, + coordinatedCommitsTableConf); } Tuple2, List> checkpointsAndDeltas = ListUtils diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java index d8e015e603f..f9ff624842f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java @@ -18,6 +18,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.UUID; import java.util.regex.Pattern; import io.delta.kernel.internal.fs.Path; @@ -29,6 +31,9 @@ private FileNames() {} private static final Pattern DELTA_FILE_PATTERN = Pattern.compile("\\d+\\.json"); + public static final Pattern UUID_DELTA_FILE_REGEX = + Pattern.compile("(\\d+)\\.([^\\.]+)\\.json"); + private static final Pattern CHECKPOINT_FILE_PATTERN = Pattern.compile( "(\\d+)\\.checkpoint((\\.\\d+\\.\\d+)?\\.parquet|\\.[^.]+\\.(json|parquet))"); @@ -43,6 +48,7 @@ private FileNames() {} Pattern.compile("(\\d+)\\.checkpoint\\.\\d+\\.\\d+\\.parquet"); public static final String SIDECAR_DIRECTORY = "_sidecars"; + public static final String COMMIT_SUBDIR = "_commits"; /** * Returns the delta (json format) path for a given delta file. @@ -51,6 +57,20 @@ public static String deltaFile(Path path, long version) { return String.format("%s/%020d.json", path, version); } + /** + * Returns the un-backfilled uuid formatted delta (json format) path for a given version. + * + * @param logPath The root path of the delta log. + * @param version The version of the delta file. 
+ * @return The path to the un-backfilled delta file: logPath/_commits/version.uuid.json + */ + public static Path unbackfilledDeltaFile( + Path logPath, long version, Optional uuidString) { + Path basePath = commitDirPath(logPath); + String uuid = uuidString.orElse(UUID.randomUUID().toString()); + return new Path(basePath, String.format("%020d.%s.json", version, uuid)); + } + /** * Returns the version for the given delta path. */ @@ -161,7 +181,9 @@ public static boolean isV2CheckpointFile(String fileName) { public static boolean isCommitFile(String fileName) { - return DELTA_FILE_PATTERN.matcher(new Path(fileName).getName()).matches(); + String filename = new Path(fileName).getName(); + return DELTA_FILE_PATTERN.matcher(filename).matches() || + UUID_DELTA_FILE_REGEX.matcher(filename).matches(); } /** @@ -183,4 +205,9 @@ public static long getFileVersion(Path path) { ); } } + + /** Returns path to the sidecar directory */ + public static Path commitDirPath(Path logPath) { + return new Path(logPath, COMMIT_SUBDIR); + } } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala index 1cca46d51f7..a28600c971b 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala @@ -220,7 +220,9 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { new MockSidecarParquetHandler(expectedSidecars), new MockSidecarJsonHandler(expectedSidecars)), Optional.empty(), - versionToLoad + versionToLoad, + Optional.empty(), + null ) assert(logSegmentOpt.isPresent()) @@ -316,7 +318,9 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(files), startCheckpoint, - versionToLoad + versionToLoad, + Optional.empty(), + null ) } assert(e.getMessage.contains(expectedErrorMessageContains)) @@ -440,7 +444,9 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(Seq.empty), Optional.empty(), - Optional.empty() + Optional.empty(), + Optional.empty(), + null ) assert(!logSegmentOpt.isPresent()) } @@ -491,7 +497,9 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(listFrom(checkpointV)(_)), Optional.of(checkpointV), - Optional.empty() + Optional.empty(), + Optional.empty(), + null ) assert(logSegmentOpt.isPresent()) checkLogSegment( @@ -831,7 +839,9 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(deltas ++ corruptedCheckpoint ++ checkpoints), Optional.empty(), - Optional.empty() + Optional.empty(), + Optional.empty(), + null ) val checkpointVersion = validVersions.sorted.lastOption assert(logSegmentOpt.isPresent()) @@ -852,7 +862,9 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(Seq.empty), Optional.of(1), - Optional.empty() + Optional.empty(), + Optional.empty(), + null ) assert(!logSegmentOpt.isPresent()) } diff --git 
a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala index dd49f2df15e..64ea445b4c9 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala @@ -52,7 +52,8 @@ trait MockEngineUtils { fileSystemClient: FileSystemClient = null, jsonHandler: JsonHandler = null, parquetHandler: ParquetHandler = null, - expressionHandler: ExpressionHandler = null): Engine = { + expressionHandler: ExpressionHandler = null, + commitCoordinatorClientHandler: CommitCoordinatorClientHandler = null): Engine = { new Engine() { override def getExpressionHandler: ExpressionHandler = Option(expressionHandler).getOrElse( @@ -69,6 +70,11 @@ trait MockEngineUtils { override def getParquetHandler: ParquetHandler = Option(parquetHandler).getOrElse( throw new UnsupportedOperationException("not supported in this test suite")) + + override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]): + CommitCoordinatorClientHandler = + Option(commitCoordinatorClientHandler).getOrElse( + throw new UnsupportedOperationException("not supported in this test suite")) } } } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java new file mode 100644 index 00000000000..975fddf68e9 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java @@ -0,0 +1,292 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults.engine; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import io.delta.storage.LogStore; +import io.delta.storage.commit.CommitCoordinatorClient; +import io.delta.storage.commit.CommitFailedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import io.delta.kernel.commit.Commit; +import io.delta.kernel.commit.CommitResponse; +import io.delta.kernel.commit.GetCommitsResponse; +import io.delta.kernel.commit.UpdatedActions; +import io.delta.kernel.commit.actions.AbstractCommitInfo; +import io.delta.kernel.commit.actions.AbstractMetadata; +import io.delta.kernel.commit.actions.AbstractProtocol; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.CommitCoordinatorClientHandler; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import io.delta.kernel.defaults.internal.coordinatedcommits.CommitCoordinatorProvider; +import io.delta.kernel.defaults.internal.json.JsonUtils; +import io.delta.kernel.defaults.internal.logstore.LogStoreProvider; + +/** + * Default implementation of {@link CommitCoordinatorClientHandler} based on Hadoop APIs. + * It takes a Hadoop {@link Configuration} object to interact with the commit coordinator client. + * The following optional configurations can be set to customize the behavior of the client: + *
+ * <ul>
+ * <li>{@code io.delta.kernel.logStore.<scheme>.impl} - The class name of the custom
+ * {@link LogStore} implementation to use for operations on storage systems with the
+ * specified {@code scheme}. For example, to use a custom {@link LogStore} for S3 storage
+ * objects:
+ * <pre>{@code
+ *     <property>
+ *       <name>io.delta.kernel.logStore.s3.impl</name>
+ *       <value>com.example.S3LogStore</value>
+ *     </property>
+ * }</pre>
+ * If not set, the default LogStore implementation for the scheme will be used.</li>
+ * <li>{@code delta.enableFastS3AListFrom} - Set to {@code true} to enable fast listing
+ * functionality when using a {@link LogStore} created for S3 storage objects.</li>
+ * </ul>
+ */ +public class DefaultCommitCoordinatorClientHandler implements CommitCoordinatorClientHandler { + private final Configuration hadoopConf; + private final CommitCoordinatorClient commitCoordinatorClient; + + /** + * Create an instance of the default {@link DefaultCommitCoordinatorClientHandler} + * implementation. + * + * @param hadoopConf Configuration to use. List of options to customize the behavior of + * the client can be found in the class documentation. + */ + public DefaultCommitCoordinatorClientHandler( + Configuration hadoopConf, String name, Map conf) { + this.hadoopConf = hadoopConf; + this.commitCoordinatorClient = CommitCoordinatorProvider + .getCommitCoordinatorClient(name, conf); + } + + @Override + public Map registerTable( + String logPath, + long currentVersion, + AbstractMetadata currentMetadata, + AbstractProtocol currentProtocol) { + return commitCoordinatorClient.registerTable( + new Path(logPath), + currentVersion, + (io.delta.storage.commit.actions.AbstractMetadata) currentMetadata, + (io.delta.storage.commit.actions.AbstractProtocol) currentProtocol); + } + + @Override + public CommitResponse commit( + String logPath, + Map tableConf, + long commitVersion, + CloseableIterator actions, + UpdatedActions updatedActions) + throws IOException, io.delta.kernel.commit.CommitFailedException { + Path path = new Path(logPath); + LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme()); + try { + return convertCommitResponse(commitCoordinatorClient.commit( + logStore, + hadoopConf, + path, + tableConf, + commitVersion, + new Iterator() { + @Override + public boolean hasNext() { + return actions.hasNext(); + } + + @Override + public String next() { + return JsonUtils.rowToJson(actions.next()); + } + }, + convertUpdatedActions(updatedActions))); + } catch (CommitFailedException e) { + throw new io.delta.kernel.commit.CommitFailedException( + e.getRetryable(), e.getConflict(), e.getMessage()); + } + } + + @Override + public GetCommitsResponse getCommits( + String tablePath, + Map tableConf, + Long startVersion, + Long endVersion) { + return convertGetCommitsResponse(commitCoordinatorClient.getCommits( + new Path(tablePath), + tableConf, + startVersion, + endVersion)); + } + + @Override + public void backfillToVersion( + String logPath, + Map tableConf, + long version, + Long lastKnownBackfilledVersion) throws IOException { + Path path = new Path(logPath); + LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme()); + commitCoordinatorClient.backfillToVersion( + logStore, + hadoopConf, + path, + tableConf, + version, + lastKnownBackfilledVersion); + } + + @Override + public Boolean semanticEquals(CommitCoordinatorClientHandler other) { + return commitCoordinatorClient.semanticEquals( + ((DefaultCommitCoordinatorClientHandler) other).getCommitCoordinatorClient()); + } + + public CommitCoordinatorClient getCommitCoordinatorClient() { + return commitCoordinatorClient; + } + + private io.delta.storage.commit.UpdatedActions convertUpdatedActions( + UpdatedActions updatedActions) { + if (updatedActions == null) { + return null; + } + return new io.delta.storage.commit.UpdatedActions( + convertAbstractCommitInfo(updatedActions.getCommitInfo()), + convertAbstractMetadata(updatedActions.getNewMetadata()), + convertAbstractProtocol(updatedActions.getNewProtocol()), + convertAbstractMetadata(updatedActions.getOldMetadata()), + convertAbstractProtocol(updatedActions.getOldProtocol())); + } + + private CommitResponse 
convertCommitResponse(io.delta.storage.commit.CommitResponse response) { + return new CommitResponse(convertCommit(response.getCommit())); + } + + private Commit convertCommit(io.delta.storage.commit.Commit commit) { + return new Commit( + commit.getVersion(), + convertFileStatus(commit.getFileStatus()), + commit.getCommitTimestamp()); + } + + private FileStatus convertFileStatus(org.apache.hadoop.fs.FileStatus hadoopFileStatus) { + return FileStatus.of( + hadoopFileStatus.getPath().toString(), + hadoopFileStatus.getLen(), + hadoopFileStatus.getModificationTime()); + } + + private GetCommitsResponse convertGetCommitsResponse( + io.delta.storage.commit.GetCommitsResponse response) { + List commits = response.getCommits().stream() + .map(this::convertCommit) + .collect(Collectors.toList()); + return new GetCommitsResponse(commits, response.getLatestTableVersion()); + } + + private io.delta.storage.commit.actions.AbstractMetadata convertAbstractMetadata( + AbstractMetadata metadata) { + return new io.delta.storage.commit.actions.AbstractMetadata() { + @Override + public String getId() { + return metadata.getId(); + } + + @Override + public String getName() { + return metadata.getName(); + } + + @Override + public String getDescription() { + return metadata.getDescription(); + } + + @Override + public String getProvider() { + return metadata.getProvider(); + } + + @Override + public Map getFormatOptions() { + return metadata.getFormatOptions(); + } + + @Override + public String getSchemaString() { + return metadata.getSchemaString(); + } + + @Override + public List getPartitionColumns() { + return metadata.getPartitionColumns(); + } + + @Override + public Map getConfiguration() { + return metadata.getConfiguration(); + } + + @Override + public Long getCreatedTime() { + return metadata.getCreatedTime(); + } + }; + } + + private io.delta.storage.commit.actions.AbstractProtocol convertAbstractProtocol( + AbstractProtocol protocol) { + return new io.delta.storage.commit.actions.AbstractProtocol() { + @Override + public int getMinReaderVersion() { + return protocol.getMinReaderVersion(); + } + + @Override + public int getMinWriterVersion() { + return protocol.getMinWriterVersion(); + } + + @Override + public Set getReaderFeatures() { + return protocol.getReaderFeatures(); + } + + @Override + public Set getWriterFeatures() { + return protocol.getWriterFeatures(); + } + }; + } + + private io.delta.storage.commit.actions.AbstractCommitInfo convertAbstractCommitInfo( + AbstractCommitInfo commitInfo) { + return commitInfo::getCommitTimestamp; + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java index 13a920dc38d..c1a22feb816 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java @@ -15,6 +15,8 @@ */ package io.delta.kernel.defaults.engine; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import io.delta.kernel.engine.*; @@ -50,6 +52,12 @@ public ParquetHandler getParquetHandler() { return new DefaultParquetHandler(hadoopConf); } + @Override + public CommitCoordinatorClientHandler getCommitCoordinatorClientHandler( + String name, Map conf) { + return new DefaultCommitCoordinatorClientHandler(hadoopConf, name, conf); + } + /** * Create an instance of {@link DefaultEngine}. 
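Editor's note: to make the wiring above concrete, here is a hedged usage sketch (not part of the patch). It registers the in-memory coordinator added later in this change, creates a `DefaultEngine`, and resolves the handler by name; the coordinator name `"in-memory"` comes from `InMemoryCommitCoordinatorBuilder` below.

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.defaults.internal.coordinatedcommits.CommitCoordinatorProvider;
import io.delta.kernel.defaults.internal.coordinatedcommits.InMemoryCommitCoordinatorBuilder;

public class EngineHandlerExample {
    public static void main(String[] args) {
        // Register the builder once per JVM; a second registration under the
        // same name would throw IllegalArgumentException.
        CommitCoordinatorProvider.registerBuilder(new InMemoryCommitCoordinatorBuilder(10));

        DefaultEngine engine = DefaultEngine.create(new Configuration());
        CommitCoordinatorClientHandler handler =
            engine.getCommitCoordinatorClientHandler("in-memory", Collections.emptyMap());

        // The handler delegates to the underlying CommitCoordinatorClient, so
        // it is semantically equal to itself.
        System.out.println(handler.semanticEquals(handler)); // prints: true
    }
}
```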
* @@ -59,4 +67,5 @@ public ParquetHandler getParquetHandler() { public static DefaultEngine create(Configuration hadoopConf) { return new DefaultEngine(hadoopConf); } + } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.java new file mode 100644 index 00000000000..a3b0538b008 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.java @@ -0,0 +1,216 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.internal.coordinatedcommits; + +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; + +import io.delta.storage.CloseableIterator; +import io.delta.storage.LogStore; +import io.delta.storage.commit.Commit; +import io.delta.storage.commit.CommitCoordinatorClient; +import io.delta.storage.commit.CommitFailedException; +import io.delta.storage.commit.CommitResponse; +import io.delta.storage.commit.GetCommitsResponse; +import io.delta.storage.commit.UpdatedActions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract {@link CommitCoordinatorClient} which triggers backfills every n commits. + * - every commit version which satisfies `commitVersion % batchSize == 0` will trigger a backfill. + */ +public abstract class AbstractBatchBackfillingCommitCoordinatorClient + implements CommitCoordinatorClient { + + protected static final Logger logger = + LoggerFactory.getLogger(AbstractBatchBackfillingCommitCoordinatorClient.class); + + /** + * Size of batch that should be backfilled. So every commit version which satisfies + * `commitVersion % batchSize == 0` will trigger a backfill. 
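Editor's note: before the implementation, a small standalone illustration of the batching rule described in the javadoc above; the batch size and versions here are hypothetical, not taken from the patch.

```java
public class BackfillRuleSketch {
    // Mirrors the documented rule: batchSize <= 1 backfills on every commit,
    // otherwise only versions divisible by batchSize trigger a backfill.
    static boolean triggersBackfill(long commitVersion, long batchSize) {
        return batchSize <= 1 || commitVersion % batchSize == 0;
    }

    public static void main(String[] args) {
        long batchSize = 3; // hypothetical value
        for (long version = 1; version <= 9; version++) {
            System.out.println("version " + version + " -> backfill: "
                + triggersBackfill(version, batchSize));
        }
        // With batchSize = 3, only versions 3, 6 and 9 print true.
    }
}
```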
+ */ + protected long batchSize; + + /** + * Commit a given `commitFile` to the table represented by given `logPath` at the + * given `commitVersion` + */ + protected abstract CommitResponse commitImpl( + LogStore logStore, + Configuration hadoopConf, + Path logPath, + Map coordinatedCommitsTableConf, + long commitVersion, + FileStatus commitFile, + long commitTimestamp) throws CommitFailedException; + + @Override + public CommitResponse commit( + LogStore logStore, + Configuration hadoopConf, + Path logPath, + Map coordinatedCommitsTableConf, + long commitVersion, + Iterator actions, + UpdatedActions updatedActions) throws CommitFailedException, IOException { + Path tablePath = CoordinatedCommitsUtils.getTablePath(logPath); + if (commitVersion == 0) { + throw new CommitFailedException( + false, false, "Commit version 0 must go via filesystem."); + } + logger.info("Attempting to commit version {} on table {}", commitVersion, tablePath); + FileSystem fs = logPath.getFileSystem(hadoopConf); + if (batchSize <= 1) { + // Backfill until `commitVersion - 1` + logger.info("Making sure commits are backfilled until {}" + + " version for table {}", commitVersion - 1, tablePath); + backfillToVersion( + logStore, + hadoopConf, + logPath, + coordinatedCommitsTableConf, + commitVersion - 1, + null); + } + + // Write new commit file in _commits directory + FileStatus fileStatus = CoordinatedCommitsUtils.writeCommitFile( + logStore, hadoopConf, logPath.toString(), commitVersion, actions, generateUUID()); + + // Do the actual commit + long commitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp(); + CommitResponse commitResponse = + commitImpl( + logStore, + hadoopConf, + logPath, + coordinatedCommitsTableConf, + commitVersion, + fileStatus, + commitTimestamp); + + boolean mcToFsConversion = isCoordinatedCommitsToFSConversion( + commitVersion, updatedActions); + // Backfill if needed + if (batchSize <= 1) { + // Always backfill when batch size is configured as 1 + backfill(logStore, hadoopConf, logPath, commitVersion, fileStatus); + Path targetFile = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, commitVersion); + FileStatus targetFileStatus = fs.getFileStatus(targetFile); + Commit newCommit = commitResponse.getCommit().withFileStatus(targetFileStatus); + return new CommitResponse(newCommit); + } else if (commitVersion % batchSize == 0 || mcToFsConversion) { + logger.info("Making sure commits are backfilled till {} version for table {}", + commitVersion, + tablePath); + backfillToVersion( + logStore, + hadoopConf, + logPath, + coordinatedCommitsTableConf, + commitVersion, + null); + } + logger.info("Commit {} done successfully on table {}", commitVersion, tablePath); + return commitResponse; + } + + @Override + public void backfillToVersion( + LogStore logStore, + Configuration hadoopConf, + Path logPath, + Map coordinatedCommitsTableConf, + long version, + Long lastKnownBackfilledVersion) throws IOException { + // Confirm the last backfilled version by checking the backfilled delta file's existence. 
+        if (lastKnownBackfilledVersion != null) {
+            try {
+                FileSystem fs = logPath.getFileSystem(hadoopConf);
+                if (!fs.exists(
+                        CoordinatedCommitsUtils.getHadoopDeltaFile(
+                            logPath, lastKnownBackfilledVersion))) {
+                    lastKnownBackfilledVersion = null;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        Long startVersion = null;
+        if (lastKnownBackfilledVersion != null) {
+            startVersion = lastKnownBackfilledVersion + 1;
+        }
+        GetCommitsResponse commitsResponse =
+            getCommits(logPath, coordinatedCommitsTableConf, startVersion, version);
+        for (Commit commit : commitsResponse.getCommits()) {
+            backfill(logStore, hadoopConf, logPath, commit.getVersion(), commit.getFileStatus());
+        }
+    }
+
+    protected String generateUUID() {
+        return UUID.randomUUID().toString();
+    }
+
+    /** Backfills a given `fileStatus` to `version`.json */
+    protected void backfill(
+            LogStore logStore,
+            Configuration hadoopConf,
+            Path logPath,
+            long version,
+            FileStatus fileStatus) throws IOException {
+        Path targetFile = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version);
+        logger.info("Backfilling commit " + fileStatus.getPath() + " to " + targetFile);
+        CloseableIterator<String> commitContentIterator = logStore
+            .read(fileStatus.getPath(), hadoopConf);
+        try {
+            logStore.write(
+                targetFile,
+                commitContentIterator,
+                false,
+                hadoopConf);
+            registerBackfill(logPath, version);
+        } catch (FileAlreadyExistsException e) {
+            logger.info("The backfilled file " + targetFile + " already exists.");
+        } finally {
+            commitContentIterator.close();
+        }
+    }
+
+    /**
+     * Callback to tell the CommitCoordinator that all commits <= `backfilledVersion` are
+     * backfilled.
+     */
+    protected abstract void registerBackfill(Path logPath, long backfilledVersion);
+
+    private boolean isCoordinatedCommitsToFSConversion(
+            long commitVersion, UpdatedActions updatedActions) {
+        boolean oldMetadataHasCoordinatedCommits =
+            CoordinatedCommitsUtils
+                .getCommitCoordinatorName(updatedActions.getOldMetadata()).isPresent();
+        boolean newMetadataHasCoordinatedCommits =
+            CoordinatedCommitsUtils
+                .getCommitCoordinatorName(updatedActions.getNewMetadata()).isPresent();
+        return oldMetadataHasCoordinatedCommits
+            && !newMetadataHasCoordinatedCommits
+            && commitVersion > 0;
+    }
+}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java
new file mode 100644
index 00000000000..26b1ca8a6d4
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorBuilder.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.delta.kernel.defaults.internal.coordinatedcommits; + +import java.util.*; + +import io.delta.storage.commit.CommitCoordinatorClient; + +/** A builder interface for {@link CommitCoordinatorClient} */ +public interface CommitCoordinatorBuilder { + /** Name of the commit-coordinator */ + String getName(); + + /** Returns a commit-coordinator client based on the given conf */ + CommitCoordinatorClient build(Map conf); +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java new file mode 100644 index 00000000000..4ac31dad306 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorProvider.java @@ -0,0 +1,62 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.internal.coordinatedcommits; + +import java.util.HashMap; +import java.util.Map; + +import io.delta.storage.commit.CommitCoordinatorClient; + +/** Factory to get the correct {@link CommitCoordinatorClient} for a table */ +public class CommitCoordinatorProvider { + // mapping from different commit-coordinator names to the corresponding + // {@link CommitCoordinatorBuilder}s. + private static final Map nameToBuilderMapping = + new HashMap<>(); + + /** + * Registers a new {@link CommitCoordinatorBuilder} with the {@link CommitCoordinatorProvider}. 
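Editor's note: a usage sketch for the builder/provider pair, grounded in the interface above and the in-memory builder later in this patch; it assumes the sketch lives in the same coordinatedcommits package.

```java
import java.util.Collections;

import io.delta.storage.commit.CommitCoordinatorClient;

public class ProviderUsageSketch {
    public static void main(String[] args) {
        // Register once; registering another builder under "in-memory" would
        // throw IllegalArgumentException.
        CommitCoordinatorProvider.registerBuilder(new InMemoryCommitCoordinatorBuilder(10));

        // Lookups resolve the builder by name and delegate to its build(conf).
        CommitCoordinatorClient client = CommitCoordinatorProvider
            .getCommitCoordinatorClient("in-memory", Collections.emptyMap());
        System.out.println(client.getClass().getSimpleName());
    }
}
```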
+ */ + public static synchronized void registerBuilder( + CommitCoordinatorBuilder commitCoordinatorBuilder) { + String name = commitCoordinatorBuilder.getName(); + if (nameToBuilderMapping.containsKey(name)) { + throw new IllegalArgumentException( + "commit-coordinator: " + + name + + " already registered with builder " + + commitCoordinatorBuilder.getClass().getName()); + } else { + nameToBuilderMapping.put(name, commitCoordinatorBuilder); + } + } + + /** Returns a {@link CommitCoordinatorClient} for the given `name` and `conf` */ + public static synchronized CommitCoordinatorClient getCommitCoordinatorClient( + String name, Map conf) { + CommitCoordinatorBuilder builder = nameToBuilderMapping.get(name); + if (builder == null) { + throw new IllegalArgumentException("Unknown commit-coordinator: " + name); + } else { + return builder.build(conf); + } + } + + // Visible only for UTs + protected static synchronized void clearBuilders() { + nameToBuilderMapping.clear(); + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsUtils.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsUtils.java new file mode 100644 index 00000000000..78f7f8ec8f2 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsUtils.java @@ -0,0 +1,182 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.internal.coordinatedcommits; + +import java.io.IOException; +import java.util.*; +import java.util.function.Function; + +import io.delta.storage.LogStore; +import io.delta.storage.commit.actions.AbstractCommitInfo; +import io.delta.storage.commit.actions.AbstractMetadata; +import io.delta.storage.commit.actions.AbstractProtocol; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import io.delta.kernel.internal.TableConfig; +import io.delta.kernel.internal.actions.CommitInfo; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.util.FileNames; +import io.delta.kernel.internal.util.VectorUtils; +import static io.delta.kernel.internal.TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME; + +public class CoordinatedCommitsUtils { + + /** + * Write a UUID-based commit file for the specified version to the table at logPath. 
+ */ + public static FileStatus writeCommitFile( + LogStore logStore, + Configuration hadoopConf, + String logPath, + long commitVersion, + Iterator actions, + String uuid) throws IOException { + Path commitPath = new Path( + FileNames.unbackfilledDeltaFile( + new io.delta.kernel.internal.fs.Path(logPath), + commitVersion, + Optional.of(uuid)).toString()); + FileSystem fs = commitPath.getFileSystem(hadoopConf); + if (!fs.exists(commitPath.getParent())) { + fs.mkdirs(commitPath.getParent()); + } + logStore.write(commitPath, actions, false, hadoopConf); + return commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath); + } + + /** + * Get the table path from the provided log path. + */ + public static Path getTablePath(Path logPath) { + return logPath.getParent(); + } + + /** + * Helper method to recover the saved value of `tableConfig` from `abstractMetadata`. + * Return defaultValue if the key is not in the configuration. + */ + public static T fromAbstractMetadataAndTableConfig( + AbstractMetadata abstractMetadata, TableConfig tableConfig) { + Map conf = abstractMetadata.getConfiguration(); + String value = conf.getOrDefault(tableConfig.getKey(), tableConfig.getDefaultValue()); + Function fromString = tableConfig.getFromString(); + return fromString.apply(value); + } + + /** + * Get the commit coordinator name from the provided abstract metadata. + */ + public static Optional getCommitCoordinatorName(AbstractMetadata abstractMetadata) { + return fromAbstractMetadataAndTableConfig( + abstractMetadata, COORDINATED_COMMITS_COORDINATOR_NAME); + } + + /** + * Get the hadoop file path for the delta file for the specified version. + * + * @param logPath The root path of the delta log. + * @param version The version of the delta file. + * @return The hadoop file path for the delta file. 
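Editor's note: for orientation, a sketch of the two file layouts these helpers produce. The zero-padded shapes in the comments are assumptions based on Delta's log layout; the FileNames helpers referenced in this patch are the source of truth.

```java
import java.util.Optional;

import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;

public class DeltaFileNamesSketch {
    public static void main(String[] args) {
        Path logPath = new Path("/table/_delta_log"); // hypothetical table location

        // Backfilled commit, expected shape: _delta_log/00000000000000000012.json
        System.out.println(FileNames.deltaFile(logPath, 12));

        // Unbackfilled commit, expected shape:
        // _delta_log/_commits/00000000000000000012.<uuid>.json
        System.out.println(FileNames.unbackfilledDeltaFile(
            logPath, 12, Optional.of("3a0d65cd-4056-49b8-937b-95f9e3ee90b5")));
    }
}
```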
+ */ + public static Path getHadoopDeltaFile(Path logPath, long version) { + return new Path(FileNames + .deltaFile(new io.delta.kernel.internal.fs.Path(logPath.toString()), version)); + } + + public static AbstractMetadata convertMetadataToAbstractMetadata(Metadata metadata) { + return new AbstractMetadata() { + @Override + public String getId() { + return metadata.getId(); + } + + @Override + public String getName() { + return metadata.getName().orElse(null); + } + + @Override + public String getDescription() { + return metadata.getDescription().orElse(null); + } + + @Override + public String getProvider() { + return metadata.getFormat().getProvider(); + } + + @Override + public Map getFormatOptions() { + // Assuming Format class has a method to get format options + return metadata.getFormat().getOptions(); + } + + @Override + public String getSchemaString() { + // Assuming Metadata class has a method to get schema string + return metadata.getSchemaString(); + } + + @Override + public List getPartitionColumns() { + // Assuming Metadata class has a method to get partition columns + return VectorUtils.toJavaList(metadata.getPartitionColumns()); + } + + @Override + public Map getConfiguration() { + return metadata.getConfiguration(); + } + + @Override + public Long getCreatedTime() { + return metadata.getCreatedTime().orElse(null); + } + }; + } + + public static AbstractProtocol convertProtocolToAbstractProtocol(Protocol protocol) { + return new AbstractProtocol() { + @Override + public int getMinReaderVersion() { + return protocol.getMinReaderVersion(); + } + + @Override + public int getMinWriterVersion() { + return protocol.getMinWriterVersion(); + } + + @Override + public Set getReaderFeatures() { + return new HashSet<>(protocol.getReaderFeatures()); + } + + @Override + public Set getWriterFeatures() { + return new HashSet<>(protocol.getWriterFeatures()); + } + }; + } + + public static AbstractCommitInfo convertCommitInfoToAbstractCommitInfo(CommitInfo commitInfo) { + return () -> commitInfo.getInCommitTimestamp().orElse(commitInfo.getTimestamp()); + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinator.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinator.java new file mode 100644 index 00000000000..52837f6dd49 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinator.java @@ -0,0 +1,252 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
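Editor's note: before moving to the in-memory coordinator, a short sketch of how the conversion helpers above are used, mirroring the test suites later in this patch; it assumes the sketch lives in the coordinatedcommits package.

```java
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;

import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.actions.AbstractProtocol;

public class ConversionSketch {
    public static void main(String[] args) {
        // Bridge kernel actions into the delta-storage abstractions expected
        // by CommitCoordinatorClient.registerTable.
        AbstractMetadata metadata =
            CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(Metadata.empty());
        AbstractProtocol protocol =
            CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(new Protocol(1, 1));

        System.out.println(protocol.getMinReaderVersion() + "/" + protocol.getMinWriterVersion());
    }
}
```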
+ */ +package io.delta.kernel.defaults.internal.coordinatedcommits; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +import io.delta.storage.LogStore; +import io.delta.storage.commit.Commit; +import io.delta.storage.commit.CommitCoordinatorClient; +import io.delta.storage.commit.CommitFailedException; +import io.delta.storage.commit.CommitResponse; +import io.delta.storage.commit.GetCommitsResponse; +import io.delta.storage.commit.actions.AbstractMetadata; +import io.delta.storage.commit.actions.AbstractProtocol; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import io.delta.kernel.internal.util.Tuple2; + +public class InMemoryCommitCoordinator extends AbstractBatchBackfillingCommitCoordinatorClient { + + /** + * @param maxCommitVersion represents the max commit version known for the table. This is + * initialized at the time of pre-registration and updated whenever a + * commit is successfully added to the commit-coordinator. + * @param active represents whether this commit-coordinator has ratified any commit or not. + * |----------------------------|------------------|---------------------------| + * | State | maxCommitVersion | active | + * |----------------------------|------------------|---------------------------| + * | Table is pre-registered | currentVersion+1 | false | + * |----------------------------|------------------|---------------------------| + * | Table is pre-registered | X | true | + * | and more commits are done | | | + * |----------------------------|------------------|---------------------------| + */ + private ConcurrentHashMap perTableMap; + + public InMemoryCommitCoordinator(long batchSize) { + this.batchSize = batchSize; + this.perTableMap = new ConcurrentHashMap<>(); + } + + private class PerTableData { + private long maxCommitVersion; + private boolean active; + private TreeMap commitsMap; + private ReentrantReadWriteLock lock; + + PerTableData(long maxCommitVersion) { + this(maxCommitVersion, false); + } + + PerTableData(long maxCommitVersion, boolean active) { + this.maxCommitVersion = maxCommitVersion; + this.active = active; + this.commitsMap = new TreeMap<>(); + this.lock = new ReentrantReadWriteLock(); + } + + public void updateLastRatifiedCommit(long commitVersion) { + this.active = true; + this.maxCommitVersion = commitVersion; + } + + public long lastRatifiedCommitVersion() { + return this.active ? this.maxCommitVersion : -1; + } + + public long getMaxCommitVersion() { + return maxCommitVersion; + } + + public TreeMap getCommitsMap() { + return commitsMap; + } + } + + /** + * This method acquires a write lock, validates the commit version is next in line, + * updates commit maps, and releases the lock. 
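Editor's note: a hedged walkthrough of the expected-version check with hypothetical numbers. Per the code that follows, registerTable(logPath, -1, ...) stores maxCommitVersion = currentVersion + 1 = 0, and version 0 itself must be committed through the filesystem.

```java
public class ExpectedVersionSketch {
    public static void main(String[] args) {
        long maxCommitVersion = 0;                   // state right after registerTable(-1, ...)
        long expectedVersion = maxCommitVersion + 1; // the only version commitImpl accepts next

        long attempted = 2; // hypothetical out-of-order commit
        boolean retryable = attempted < expectedVersion;
        boolean conflict = attempted < expectedVersion;

        // attempted > expectedVersion is a gap: neither retryable nor a
        // conflict, so the caller must catch up first. attempted ==
        // expectedVersion succeeds and bumps maxCommitVersion.
        System.out.println("retryable=" + retryable + ", conflict=" + conflict);
    }
}
```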
+ * + */ + @Override + protected CommitResponse commitImpl( + LogStore logStore, + Configuration hadoopConf, + Path logPath, + Map coordinatedCommitsTableConf, + long commitVersion, + FileStatus commitFile, + long commitTimestamp) throws CommitFailedException { + Tuple2 ret = + addToMap(logPath, commitVersion, commitFile, commitTimestamp); + if (ret._2 != null) { + throw ret._2; + } else { + return ret._1; + } + } + + private Tuple2 addToMap( + Path logPath, + long commitVersion, + FileStatus commitFile, + long commitTimestamp) { + + return withWriteLock(logPath, () -> { + PerTableData tableData = perTableMap.get(logPath.toString()); + long expectedVersion = tableData.maxCommitVersion + 1; + if (commitVersion != expectedVersion) { + return new Tuple2<>(null, new CommitFailedException( + commitVersion < expectedVersion, + commitVersion < expectedVersion, + "Commit version " + + commitVersion + + " is not valid. Expected version: " + + expectedVersion)); + } + + Commit commit = new Commit(commitVersion, commitFile, commitTimestamp); + tableData.commitsMap.put(commitVersion, commit); + tableData.updateLastRatifiedCommit(commitVersion); + + logger.info("Added commit file " + commitFile.getPath() + " to commit-coordinator."); + return new Tuple2<>(new CommitResponse(commit), null); + }); + } + + @Override + public GetCommitsResponse getCommits( + Path logPath, + Map coordinatedCommitsTableConf, + Long startVersion, + Long endVersion) { + return withReadLock(logPath, () -> { + PerTableData tableData = perTableMap.get(logPath.toString()); + Optional startVersionOpt = Optional.ofNullable(startVersion); + Optional endVersionOpt = Optional.ofNullable(endVersion); + long effectiveStartVersion = startVersionOpt.orElse(0L); + // Calculate the end version for the range, or use the last key if endVersion is not + // provided + long effectiveEndVersion = endVersionOpt.orElseGet(() -> + tableData.commitsMap.isEmpty() + ? effectiveStartVersion : tableData.commitsMap.lastKey()); + SortedMap commitsInRange = tableData.commitsMap.subMap( + effectiveStartVersion, effectiveEndVersion + 1); + return new GetCommitsResponse( + new ArrayList<>(commitsInRange.values()), + tableData.lastRatifiedCommitVersion()); + }); + } + + @Override + protected void registerBackfill(Path logPath, long backfilledVersion) { + withWriteLock(logPath, () -> { + PerTableData tableData = perTableMap.get(logPath.toString()); + if (backfilledVersion > tableData.lastRatifiedCommitVersion()) { + throw new IllegalArgumentException( + "Unexpected backfill version: " + backfilledVersion + ". " + + "Max backfill version: " + tableData.getMaxCommitVersion()); + } + // Remove keys with versions less than or equal to 'untilVersion' + Iterator iterator = tableData.getCommitsMap().keySet().iterator(); + while (iterator.hasNext()) { + Long version = iterator.next(); + if (version <= backfilledVersion) { + iterator.remove(); + } else { + break; + } + } + return null; + }); + } + + @Override + public Map registerTable( + Path logPath, + long currentVersion, + AbstractMetadata currentMetadata, + AbstractProtocol currentProtocol) { + PerTableData newPerTableData = new PerTableData(currentVersion + 1); + perTableMap.compute(logPath.toString(), (key, existingData) -> { + if (existingData != null) { + if (existingData.lastRatifiedCommitVersion() != -1) { + throw new IllegalStateException( + "Table " + logPath + " already exists in the commit-coordinator."); + } + // If lastRatifiedCommitVersion is -1 i.e. 
the commit-coordinator has never + // attempted any commit for this table => this table was just pre-registered. If + // there is another pre-registration request for an older version, we reject it and + // table can't go backward. + if (currentVersion < existingData.getMaxCommitVersion()) { + throw new IllegalStateException( + "Table " + logPath + " already registered with commit-coordinator"); + } + } + return newPerTableData; + }); + return Collections.emptyMap(); + } + + @Override + public Boolean semanticEquals(CommitCoordinatorClient other) { + return this.equals(other); + } + + private T withReadLock(Path logPath, Supplier operation) { + PerTableData tableData = perTableMap.get(logPath.toString()); + if (tableData == null) { + throw new IllegalArgumentException("Unknown table " + logPath + "."); + } + ReentrantReadWriteLock.ReadLock lock = tableData.lock.readLock(); + lock.lock(); + try { + return operation.get(); + } finally { + lock.unlock(); + } + } + + private T withWriteLock(Path logPath, Supplier operation) { + PerTableData tableData = perTableMap.get(logPath.toString()); + if (tableData == null) { + throw new IllegalArgumentException("Unknown table " + logPath + "."); + } + ReentrantReadWriteLock.WriteLock lock = tableData.lock.writeLock(); + lock.lock(); + try { + return operation.get(); + } finally { + lock.unlock(); + } + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.java new file mode 100644 index 00000000000..dfb903098f7 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorBuilder.java @@ -0,0 +1,42 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
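Editor's note on the coordinator just shown: getCommits keeps both range bounds inclusive by asking TreeMap.subMap for end + 1. A standalone sketch of that semantics, with hypothetical versions:

```java
import java.util.TreeMap;

public class GetCommitsRangeSketch {
    public static void main(String[] args) {
        TreeMap<Long, String> commitsMap = new TreeMap<>();
        for (long v = 1; v <= 5; v++) {
            commitsMap.put(v, "commit-" + v);
        }

        Long startVersion = 2L; // hypothetical request bounds
        Long endVersion = 4L;

        // subMap's upper bound is exclusive, so end + 1 makes endVersion inclusive.
        System.out.println(commitsMap.subMap(startVersion, endVersion + 1).keySet());
        // prints: [2, 3, 4]
    }
}
```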
+ */ +package io.delta.kernel.defaults.internal.coordinatedcommits; + +import java.util.Map; + +import io.delta.storage.commit.CommitCoordinatorClient; + +import io.delta.kernel.internal.lang.Lazy; + +public class InMemoryCommitCoordinatorBuilder implements CommitCoordinatorBuilder { + private final long batchSize; + private Lazy inMemoryStore; + + public InMemoryCommitCoordinatorBuilder(long batchSize) { + this.batchSize = batchSize; + this.inMemoryStore = new Lazy<>(() -> new InMemoryCommitCoordinator(batchSize)); + } + + /** Name of the commit-coordinator */ + public String getName() { + return "in-memory"; + } + + /** Returns a commit-coordinator based on the given conf */ + public CommitCoordinatorClient build(Map conf) { + return inMemoryStore.get(); + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala index 0f54b810bf5..c3fe8acaca1 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala @@ -18,7 +18,7 @@ package io.delta.kernel.defaults import java.io.File import io.delta.kernel.Table -import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient} +import io.delta.kernel.engine.{CommitCoordinatorClientHandler, Engine, ExpressionHandler, FileSystemClient} import io.delta.kernel.data.ColumnarBatch import io.delta.kernel.defaults.engine.{DefaultEngine, DefaultJsonHandler, DefaultParquetHandler} import io.delta.kernel.expressions.Predicate @@ -37,6 +37,7 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SharedSparkSession import java.nio.file.Files +import java.util import java.util.Optional import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ArrayBuffer @@ -365,6 +366,9 @@ class MetricsEngine(config: Configuration) extends Engine { override def getFileSystemClient: FileSystemClient = impl.getFileSystemClient override def getParquetHandler: MetricsParquetHandler = parquetHandler + + override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]): + CommitCoordinatorClientHandler = impl.getCommitCoordinatorClientHandler(name, conf) } /** diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorClientSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorClientSuite.scala new file mode 100644 index 00000000000..0efb4534cc2 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CommitCoordinatorClientSuite.scala @@ -0,0 +1,321 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
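Editor's note on InMemoryCommitCoordinatorBuilder above: `Lazy` memoizes the coordinator, so every `build` call hands back the same instance, which keeps the reference-based `semanticEquals` of `InMemoryCommitCoordinator` stable across lookups. A quick sketch, assuming the coordinatedcommits package:

```java
import java.util.Collections;

import io.delta.storage.commit.CommitCoordinatorClient;

public class BuilderSingletonSketch {
    public static void main(String[] args) {
        InMemoryCommitCoordinatorBuilder builder = new InMemoryCommitCoordinatorBuilder(5);

        CommitCoordinatorClient first = builder.build(Collections.emptyMap());
        CommitCoordinatorClient second = builder.build(Collections.emptyMap());

        // Lazy caches the instance, so both lookups see the same coordinator.
        System.out.println(first == second); // prints: true
    }
}
```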
+ */ + +package io.delta.kernel.defaults.internal.coordinatedcommits + +import io.delta.kernel.defaults.DeltaTableWriteSuiteBase +import io.delta.kernel.internal.actions.Metadata +import io.delta.kernel.internal.TableConfig +import io.delta.storage.commit.{Commit, CommitCoordinatorClient, CommitResponse, GetCommitsResponse, UpdatedActions} +import io.delta.storage.LogStore +import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} + +import java.{lang, util} +import java.util.Collections +import scala.collection.convert.ImplicitConversions.`map AsScala` +import scala.collection.JavaConverters._ + +class CommitCoordinatorClientSuite extends DeltaTableWriteSuiteBase + with CoordinatedCommitsTestUtils { + + protected trait TestCommitCoordinatorClientBase extends CommitCoordinatorClient { + override def registerTable( + logPath: Path, + currentVersion: Long, + currentMetadata: AbstractMetadata, + currentProtocol: AbstractProtocol): util.Map[String, String] = { + throw new UnsupportedOperationException("Not implemented") + } + + override def commit( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + coordinatedCommitsTableConf: util.Map[String, String], + commitVersion: Long, + actions: util.Iterator[String], + updatedActions: UpdatedActions): CommitResponse = { + throw new UnsupportedOperationException("Not implemented") + } + + override def getCommits( + logPath: Path, + coordinatedCommitsTableConf: util.Map[String, String], + startVersion: lang.Long, + endVersion: lang.Long = null): GetCommitsResponse = + new GetCommitsResponse(Collections.emptyList(), -1) + + override def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + coordinatedCommitsTableConf: util.Map[String, String], + version: Long, + lastKnownBackfilledVersion: lang.Long): Unit = {} + + override def semanticEquals(other: CommitCoordinatorClient): lang.Boolean = this == other + } + + private class TestCommitCoordinatorClient1 extends TestCommitCoordinatorClientBase + private class TestCommitCoordinatorClient2 extends TestCommitCoordinatorClientBase + + test("registering multiple commit-coordinator builders with same name") { + CommitCoordinatorProvider.clearBuilders() + object Builder1 extends CommitCoordinatorBuilder { + override def build(conf: util.Map[String, String]): CommitCoordinatorClient = null + override def getName: String = "builder-1" + } + object BuilderWithSameName extends CommitCoordinatorBuilder { + override def build(conf: util.Map[String, String]): CommitCoordinatorClient = null + override def getName: String = "builder-1" + } + object Builder3 extends CommitCoordinatorBuilder { + override def build(conf: util.Map[String, String]): CommitCoordinatorClient = null + override def getName: String = "builder-3" + } + CommitCoordinatorProvider.registerBuilder(Builder1) + intercept[Exception] { + CommitCoordinatorProvider.registerBuilder(BuilderWithSameName) + } + CommitCoordinatorProvider.registerBuilder(Builder3) + } + + test("getCommitCoordinator - builder returns same object") { + CommitCoordinatorProvider.clearBuilders() + object Builder1 extends CommitCoordinatorBuilder { + val cs1 = new TestCommitCoordinatorClient1() + val cs2 = new TestCommitCoordinatorClient2() + override def build(conf: util.Map[String, String]): CommitCoordinatorClient = { + conf.getOrElse("url", "") match { + case "url1" => cs1 + case "url2" => cs2 + case _ => throw new 
IllegalArgumentException("Invalid url") + } + } + override def getName: String = "cs-x" + } + CommitCoordinatorProvider.registerBuilder(Builder1) + val cs1 = + CommitCoordinatorProvider.getCommitCoordinatorClient("cs-x", Map("url" -> "url1").asJava) + assert(cs1.isInstanceOf[TestCommitCoordinatorClient1]) + val cs1_again = + CommitCoordinatorProvider.getCommitCoordinatorClient("cs-x", Map("url" -> "url1").asJava) + assert(cs1 eq cs1_again) + val cs2 = CommitCoordinatorProvider + .getCommitCoordinatorClient("cs-x", Map("url" -> "url2", "a" -> "b").asJava) + assert(cs2.isInstanceOf[TestCommitCoordinatorClient2]) + // If builder receives a config which doesn't have expected params, then it can throw exception. + intercept[IllegalArgumentException] { + CommitCoordinatorProvider.getCommitCoordinatorClient("cs-x", Map("url" -> "url3").asJava) + } + } + + test("getCommitCoordinatorClient - builder returns new object each time") { + CommitCoordinatorProvider.clearBuilders() + object Builder1 extends CommitCoordinatorBuilder { + override def build(conf: util.Map[String, String]): CommitCoordinatorClient = { + conf.getOrElse("url", "") match { + case "url1" => new TestCommitCoordinatorClient1() + case _ => throw new IllegalArgumentException("Invalid url") + } + } + override def getName: String = "cs-name" + } + CommitCoordinatorProvider.registerBuilder(Builder1) + val cs1 = + CommitCoordinatorProvider.getCommitCoordinatorClient("cs-name", Map("url" -> "url1").asJava) + assert(cs1.isInstanceOf[TestCommitCoordinatorClient1]) + val cs1_again = + CommitCoordinatorProvider.getCommitCoordinatorClient("cs-name", Map("url" -> "url1").asJava) + assert(cs1 ne cs1_again) + } + + test("Coordinated Commit Related Properties from Metadata") { + CommitCoordinatorProvider.clearBuilders() + val m1 = Metadata.empty().withNewConfiguration( + Map( + TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> + """{"key1": "string_value", "key2Int": 2, "key3ComplexStr": "\"hello\""}""", + TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> + """{"key1": "string_value", "key2Int": 2, "key3ComplexStr": "\"hello\""}""").asJava + ) + assert(TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(m1) === + Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\"").asJava) + + assert(TableConfig.COORDINATED_COMMITS_TABLE_CONF.fromMetadata(m1) === + Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\"").asJava) + + val m2 = Metadata.empty().withNewConfiguration( + Map( + TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> + """{"key1": "string_value", "key2Int": "2""", + TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> + """{"key1": "string_value", "key2Int": "2""").asJava + ) + intercept[RuntimeException] { + TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(m2) + } + intercept[RuntimeException] { + TableConfig.COORDINATED_COMMITS_TABLE_CONF.fromMetadata(m2) + } + } + + test("Semantic Equality works as expected on CommitCoordinatorClients") { + CommitCoordinatorProvider.clearBuilders() + class TestCommitCoordinatorClient(val key: String) extends TestCommitCoordinatorClientBase { + override def semanticEquals(other: CommitCoordinatorClient): lang.Boolean = + other.isInstanceOf[TestCommitCoordinatorClient] && + other.asInstanceOf[TestCommitCoordinatorClient].key == key + } + object Builder extends CommitCoordinatorBuilder { + override def build(conf: util.Map[String, String]): CommitCoordinatorClient = { + new TestCommitCoordinatorClient(conf("key")) + 
} + override def getName: String = "cs-name" + } + CommitCoordinatorProvider.registerBuilder(Builder) + + // Different CommitCoordinator with same keys should be semantically equal. + val obj1 = + CommitCoordinatorProvider.getCommitCoordinatorClient("cs-name", Map("key" -> "url1").asJava) + val obj2 = + CommitCoordinatorProvider.getCommitCoordinatorClient("cs-name", Map("key" -> "url1").asJava) + assert(obj1 != obj2) + assert(obj1.semanticEquals(obj2)) + + // Different CommitCoordinator with different keys should be semantically unequal. + val obj3 = + CommitCoordinatorProvider.getCommitCoordinatorClient("cs-name", Map("key" -> "url2").asJava) + assert(obj1 != obj3) + assert(!obj1.semanticEquals(obj3)) + } + + test("Semantic Equality works as expected on CommitCoordinatorClientHandler") { + CommitCoordinatorProvider.clearBuilders() + class TestCommitCoordinatorClient(val key: String) extends TestCommitCoordinatorClientBase { + override def semanticEquals(other: CommitCoordinatorClient): lang.Boolean = + other.isInstanceOf[TestCommitCoordinatorClient] && + other.asInstanceOf[TestCommitCoordinatorClient].key == key + } + object Builder extends CommitCoordinatorBuilder { + override def build(conf: util.Map[String, String]): CommitCoordinatorClient = { + new TestCommitCoordinatorClient(conf("key")) + } + override def getName: String = "cs-name" + } + CommitCoordinatorProvider.registerBuilder(Builder) + + withTempDirAndEngine { (tablePath, engine) => + // Different CommitCoordinatorHandler with same keys should be semantically equal. + val obj1 = engine.getCommitCoordinatorClientHandler("cs-name", Map("key" -> "url1").asJava) + val obj2 = engine.getCommitCoordinatorClientHandler("cs-name", Map("key" -> "url1").asJava) + assert(obj1 != obj2) + assert(obj1.semanticEquals(obj2)) + + // Different CommitCoordinator with different keys should be semantically unequal. 
+ val obj3 = engine.getCommitCoordinatorClientHandler("cs-name", Map("key" -> "url2").asJava) + assert(obj1 != obj3) + assert(!obj1.semanticEquals(obj3)) + } + } + + test("CommitCoordinatorClientHandler works as expected") { + CommitCoordinatorProvider.clearBuilders() + val fileStatus = new FileStatus() + fileStatus.setPath(new Path("logPath")) + class TestCommitCoordinatorClient extends TestCommitCoordinatorClientBase { + override def registerTable( + logPath: Path, + currentVersion: Long, + currentMetadata: AbstractMetadata, + currentProtocol: AbstractProtocol): util.Map[String, String] = { + Map("tableKey" -> "tableValue").asJava + } + + override def getCommits( + logPath: Path, + coordinatedCommitsTableConf: util.Map[String, String], + startVersion: lang.Long, + endVersion: lang.Long = null): GetCommitsResponse = { + new GetCommitsResponse( + List(new Commit(-1, fileStatus, -1)).asJava, -1) + } + + override def commit( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + coordinatedCommitsTableConf: util.Map[String, String], + commitVersion: Long, + actions: util.Iterator[String], + updatedActions: UpdatedActions): CommitResponse = { + new CommitResponse(new Commit(-1, fileStatus, -1)) + } + + override def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + coordinatedCommitsTableConf: util.Map[String, String], + version: Long, + lastKnownBackfilledVersion: lang.Long): Unit = { + throw new UnsupportedOperationException( + "BackfillToVersion not implemented in TestCommitCoordinatorClient for %s".format(logPath)) + } + } + object Builder extends CommitCoordinatorBuilder { + lazy val coordinator = new TestCommitCoordinatorClient() + override def build(conf: util.Map[String, String]): CommitCoordinatorClient = { + coordinator + } + override def getName: String = "cs-name" + } + CommitCoordinatorProvider.registerBuilder(Builder) + + withTempDirAndEngine { (tablePath, engine) => + // Different CommitCoordinatorHandler with same keys should be semantically equal. + val obj1 = engine.getCommitCoordinatorClientHandler("cs-name", Map("key" -> "url1").asJava) + val obj2 = CommitCoordinatorProvider.getCommitCoordinatorClient( + "cs-name", Map("key" -> "url1").asJava) + + assert( + obj1.registerTable("logPath", 1, null, null) === + obj2.registerTable(new Path("logPath"), 1, null, null)) + + assert( + obj1.getCommits("logPath", Collections.emptyMap(), 1, 2).getLatestTableVersion === + obj2.getCommits( + new Path("logPath"), Collections.emptyMap(), 1, 2).getLatestTableVersion) + + assert( + obj1.commit("logPath", Collections.emptyMap(), 1, null, null).getCommit.getVersion === + obj2 + .commit(null, null, new Path("logPath"), Collections.emptyMap(), 1, null, null) + .getCommit + .getVersion) + + val ex = intercept[UnsupportedOperationException] { + obj1.backfillToVersion("logPath", null, 1, null) + } + + assert( + ex.getMessage.contains( + "BackfillToVersion not implemented in TestCommitCoordinatorClient for logPath")) + } + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala new file mode 100644 index 00000000000..28571b00a22 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsSuite.scala @@ -0,0 +1,296 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.defaults.internal.coordinatedcommits + +import io.delta.kernel.defaults.DeltaTableWriteSuiteBase +import io.delta.kernel.defaults.internal.logstore.LogStoreProvider +import io.delta.kernel.defaults.utils.TestRow +import io.delta.kernel.internal.TableConfig._ +import io.delta.kernel.Table +import io.delta.kernel.internal.{SnapshotImpl, TableConfig} +import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol} +import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, GetCommitsResponse, UpdatedActions} +import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol} +import io.delta.storage.LogStore +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import java.{lang, util} +import java.util.{Collections, Optional} +import scala.collection.convert.ImplicitConversions.`iterator asScala` +import scala.collection.JavaConverters._ + +class CoordinatedCommitsSuite extends DeltaTableWriteSuiteBase + with CoordinatedCommitsTestUtils { + + test("helper method that recovers config from abstract metadata works properly") { + val m1 = Metadata.empty.withNewConfiguration( + Map(COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "string_value").asJava + ) + assert(CoordinatedCommitsUtils.fromAbstractMetadataAndTableConfig( + CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(m1), + COORDINATED_COMMITS_COORDINATOR_NAME) === Optional.of("string_value")) + + val m2 = Metadata.empty.withNewConfiguration( + Map(COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "").asJava + ) + assert(CoordinatedCommitsUtils.fromAbstractMetadataAndTableConfig( + CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(m2), + COORDINATED_COMMITS_COORDINATOR_NAME) === Optional.of("")) + + val m3 = Metadata.empty.withNewConfiguration( + Map(COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> + """{"key1": "string_value", "key2Int": 2, "key3ComplexStr": "\"hello\""}""").asJava + ) + assert(CoordinatedCommitsUtils.fromAbstractMetadataAndTableConfig( + CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(m3), + COORDINATED_COMMITS_COORDINATOR_CONF) === + Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\"").asJava) + } + + test("cold snapshot initialization") { + CommitCoordinatorProvider.clearBuilders() + val builder = TrackingInMemoryCommitCoordinatorBuilder(10) + val commitCoordinatorClient = + builder.build(Collections.emptyMap()).asInstanceOf[TrackingCommitCoordinatorClient] + CommitCoordinatorProvider.registerBuilder(builder) + withTempDirAndEngine { (tablePath, engine) => + val logPath = new Path("file:" + tablePath, "_delta_log") + val table = Table.forPath(engine, tablePath) + + spark.range(0, 10).write.format("delta").mode("overwrite").save(tablePath) // version 0 + checkAnswer( + spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)), + (0L to 9L).map(TestRow(_))) + spark.range(10, 
20).write.format("delta").mode("overwrite").save(tablePath) // version 1 + spark.range(20, 30).write.format("delta").mode("append").save(tablePath) // version 2 + checkAnswer( + spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)), + (10L to 29L).map(TestRow(_))) + + var tableConf: util.Map[String, String] = null + val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme) + + (0 to 2).foreach{ version => + val delta = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version) + + var rows = logStore.read(delta, hadoopConf).toList + + if (version == 0) { + rows = addCoordinatedCommitToMetadataRow( + logStore.read(delta, hadoopConf).toList, + Map( + TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "tracking-in-memory", + TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{}")) + tableConf = commitCoordinatorClient.registerTable( + logPath, + -1L, + CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(Metadata.empty()), + CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(new Protocol(1, 1))) + writeCommitZero(engine, logPath, rows.asJava) + } else { + commit(logPath, tableConf, version, version, rows.asJava, commitCoordinatorClient) + logPath.getFileSystem(hadoopConf).delete( + CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version)) + } + } + val snapshot0 = table.getSnapshotAsOfVersion(engine, 0) + val result0 = readSnapshot(snapshot0, snapshot0.getSchema(engine), null, null, engine) + checkAnswer(result0, (0L to 9L).map(TestRow(_))) + + val snapshot1 = table.getSnapshotAsOfVersion(engine, 1) + val result1 = readSnapshot(snapshot1, snapshot1.getSchema(engine), null, null, engine) + checkAnswer(result1, (10L to 19L).map(TestRow(_))) + + commitCoordinatorClient.numGetCommitsCalled.set(0) + val snapshot2 = table.getLatestSnapshot(engine) + val result2 = readSnapshot(snapshot2, snapshot2.getSchema(engine), null, null, engine) + checkAnswer(result2, (10L to 29L).map(TestRow(_))) + assert(commitCoordinatorClient.numGetCommitsCalled.get === 1) + } + } + + test("snapshot read should use coordinated commit related properties properly") { + CommitCoordinatorProvider.clearBuilders() + val expTableConf: util.Map[String, String] = Map( + "tableKey1" -> "string_value", + "tableKey2Int" -> "2", + "tableKey3ComplexStr" -> "\"hello\"").asJava + + val expCoordinatorConf: util.Map[String, String] = Map( + "coordinatorKey1" -> "string_value", + "coordinatorKey2Int" -> "2", + "coordinatorKey3ComplexStr" -> "\"hello\"").asJava + class TestCommitCoordinatorClient extends InMemoryCommitCoordinator(10) { + override def registerTable( + logPath: Path, + currentVersion: Long, + currentMetadata: AbstractMetadata, + currentProtocol: AbstractProtocol): util.Map[String, String] = { + super.registerTable(logPath, currentVersion, currentMetadata, currentProtocol) + expTableConf + } + override def getCommits( + logPath: Path, + coordinatedCommitsTableConf: util.Map[String, String], + startVersion: lang.Long, + endVersion: lang.Long = null): GetCommitsResponse = { + assert(coordinatedCommitsTableConf === expTableConf) + super.getCommits(logPath, coordinatedCommitsTableConf, startVersion, endVersion) + } + override def commit( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + coordinatedCommitsTableConf: util.Map[String, String], + commitVersion: Long, + actions: util.Iterator[String], + updatedActions: UpdatedActions): CommitResponse = { + assert(coordinatedCommitsTableConf === expTableConf) + super.commit(logStore, 
+
+    object Builder extends CommitCoordinatorBuilder {
+      private lazy val coordinator = new TestCommitCoordinatorClient()
+      override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
+        assert(conf === expCoordinatorConf)
+        coordinator
+      }
+      override def getName: String = "test-coordinator"
+    }
+    CommitCoordinatorProvider.registerBuilder(Builder)
+    val commitCoordinatorClient = Builder.build(expCoordinatorConf)
+    withTempDirAndEngine { (tablePath, engine) =>
+      val logPath = new Path("file:" + tablePath, "_delta_log")
+      val table = Table.forPath(engine, tablePath)
+
+      spark.range(0, 10).write.format("delta").mode("overwrite").save(tablePath) // version 0
+      checkAnswer(
+        spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)),
+        (0L to 9L).map(TestRow(_)))
+      spark.range(10, 20).write.format("delta").mode("overwrite").save(tablePath) // version 1
+      spark.range(20, 30).write.format("delta").mode("append").save(tablePath) // version 2
+      checkAnswer(
+        spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)),
+        (10L to 29L).map(TestRow(_)))
+
+      var tableConf: util.Map[String, String] = null
+      val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)
+
+      (0 to 2).foreach { version =>
+        val delta = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version)
+
+        var rows = logStore.read(delta, hadoopConf).toList
+
+        if (version == 0) {
+          rows = addCoordinatedCommitToMetadataRow(
+            rows,
+            Map(
+              TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "test-coordinator",
+              TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey ->
+                OBJ_MAPPER.writeValueAsString(expCoordinatorConf),
+              TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey ->
+                OBJ_MAPPER.writeValueAsString(expTableConf)))
+          tableConf = commitCoordinatorClient.registerTable(
+            logPath,
+            -1L,
+            CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(Metadata.empty()),
+            CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(new Protocol(1, 1)))
+          writeCommitZero(engine, logPath, rows.asJava)
+          assert(tableConf === expTableConf)
+        } else {
+          commit(logPath, tableConf, version, version, rows.asJava, commitCoordinatorClient)
+          logPath.getFileSystem(hadoopConf).delete(
+            CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version), false)
+        }
+      }
+      val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
+      val result = readSnapshot(snapshot, snapshot.getSchema(engine), null, null, engine)
+      checkAnswer(result, (10L to 29L).map(TestRow(_)))
+      assert(snapshot.getCommitCoordinatorClientHandlerOpt(engine).isPresent)
+      assert(
+        snapshot
+          .getCommitCoordinatorClientHandlerOpt(engine)
+          .get()
+          .semanticEquals(
+            engine.getCommitCoordinatorClientHandler("test-coordinator", expCoordinatorConf)))
+    }
+  }
+
+  test("snapshot read fails if we try to put bad value for COORDINATED_COMMITS_TABLE_CONF") {
+    CommitCoordinatorProvider.clearBuilders()
+    val builder = TrackingInMemoryCommitCoordinatorBuilder(10)
+    val commitCoordinatorClient =
+      builder.build(Collections.emptyMap()).asInstanceOf[TrackingCommitCoordinatorClient]
+    CommitCoordinatorProvider.registerBuilder(builder)
+    withTempDirAndEngine { (tablePath, engine) =>
+      val logPath = new Path("file:" + tablePath, "_delta_log")
+      val table = Table.forPath(engine, tablePath)
+
+      spark.range(0, 10).write.format("delta").mode("overwrite").save(tablePath) // version 0
+      checkAnswer(
+        spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)),
+        (0L to 9L).map(TestRow(_)))
+      spark.range(10, 20).write.format("delta").mode("overwrite").save(tablePath) // version 1
+      spark.range(20, 30).write.format("delta").mode("append").save(tablePath) // version 2
+      checkAnswer(
+        spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)),
+        (10L to 29L).map(TestRow(_)))
+
+      var tableConf: util.Map[String, String] = null
+      val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)
+
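+      // Rewrite version 0 with an intentionally truncated JSON value for
+      // COORDINATED_COMMITS_TABLE_CONF; constructing the latest snapshot should
+      // fail while parsing it.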
+      (0 to 2).foreach { version =>
+        val delta = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version)
+
+        var rows = logStore.read(delta, hadoopConf).toList
+
+        if (version == 0) {
+          rows = addCoordinatedCommitToMetadataRow(
+            rows,
+            Map(
+              TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "tracking-in-memory",
+              TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{}",
+              TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey ->
+                """{"key1": "string_value", "key2Int": "2"""))
+          tableConf = commitCoordinatorClient.registerTable(
+            logPath,
+            -1L,
+            CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(Metadata.empty()),
+            CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(new Protocol(1, 1)))
+          writeCommitZero(engine, logPath, rows.asJava)
+        } else {
+          commit(logPath, tableConf, version, version, rows.asJava, commitCoordinatorClient)
+          logPath.getFileSystem(hadoopConf).delete(
+            CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version), false)
+        }
+      }
+      intercept[RuntimeException] {
+        table.getLatestSnapshot(engine)
+      }
+    }
+  }
+
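+  /**
+   * Rewrites the serialized metaData action in the given commit rows so that its
+   * "configuration" field carries the given coordinated-commits properties: in a row
+   * containing "configuration":{}, the empty object is replaced with the
+   * JSON-serialized `configurations` map; rows without a metaData action are
+   * returned unchanged.
+   */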
+  def addCoordinatedCommitToMetadataRow(
+      rows: List[String], configurations: Map[String, String]): List[String] = rows.map { row =>
+    if (row.contains("metaData")) {
+      row.replace(
+        "\"configuration\":{}",
+        "\"configuration\":" + OBJ_MAPPER.writeValueAsString(configurations.asJava))
+    } else {
+      row
+    }
+  }
+}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala
new file mode 100644
index 00000000000..b2ee9f31f10
--- /dev/null
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/CoordinatedCommitsTestUtils.scala
@@ -0,0 +1,212 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.internal.coordinatedcommits
+
+import java.{lang, util}
+import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
+import io.delta.kernel.engine.Engine
+import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol}
+import io.delta.kernel.internal.TableConfig
+import io.delta.storage.commit.{Commit, CommitCoordinatorClient, CommitResponse, GetCommitsResponse, UpdatedActions}
+import io.delta.storage.LogStore
+import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.Collections
+import scala.collection.JavaConverters._
+
+trait CoordinatedCommitsTestUtils {
+
+  val hadoopConf = new Configuration()
+
+  /** Routes a commit through the given coordinator client and returns the resulting commit. */
+  def commit(
+      logPath: Path,
+      tableConf: util.Map[String, String],
+      version: Long,
+      timestamp: Long,
+      commit: util.List[String],
+      commitCoordinatorClient: CommitCoordinatorClient): Commit = {
+    val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)
+    val updatedCommitInfo = CommitInfo.empty().withTimestamp(timestamp)
+    val updatedActions = if (version == 0) {
+      getUpdatedActionsForZerothCommit(updatedCommitInfo)
+    } else {
+      getUpdatedActionsForNonZerothCommit(updatedCommitInfo)
+    }
+    commitCoordinatorClient.commit(
+      logStore,
+      hadoopConf,
+      logPath,
+      tableConf,
+      version,
+      commit.iterator(),
+      updatedActions).getCommit
+  }
+
+  /** Writes commit 0 directly to the file system, as required for the zeroth commit. */
+  def writeCommitZero(engine: Engine, logPath: Path, commit: util.List[String]): Unit = {
+    createLogPath(engine, logPath)
+    val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)
+    logStore.write(
+      CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, 0),
+      commit.iterator(),
+      true, // overwrite
+      hadoopConf)
+  }
+
+  def createLogPath(engine: Engine, logPath: Path): Unit = {
+    // New table, create a delta log directory
+    if (!engine.getFileSystemClient.mkdirs(logPath.toString)) {
+      throw new RuntimeException("Failed to create delta log directory: " + logPath)
+    }
+  }
+
+  def getUpdatedActionsForZerothCommit(
+      commitInfo: CommitInfo,
+      oldMetadata: Metadata = Metadata.empty()): UpdatedActions = {
+    val newMetadataConfiguration =
+      Map(TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "tracking-in-memory",
+        TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{}")
+    val newMetadata = oldMetadata.withNewConfiguration(newMetadataConfiguration.asJava)
+    new UpdatedActions(
+      CoordinatedCommitsUtils.convertCommitInfoToAbstractCommitInfo(commitInfo),
+      CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(newMetadata),
+      CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(Protocol.empty()),
+      CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(oldMetadata),
+      CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(Protocol.empty()))
+  }
+
+  def getUpdatedActionsForNonZerothCommit(commitInfo: CommitInfo): UpdatedActions = {
+    val updatedActions = getUpdatedActionsForZerothCommit(commitInfo)
+    new UpdatedActions(
+      updatedActions.getCommitInfo,
+      updatedActions.getNewMetadata,
+      updatedActions.getNewProtocol,
+      updatedActions.getNewMetadata, // oldMetadata is replaced with newMetadata
+      updatedActions.getOldProtocol)
+  }
+}
+
+case class TrackingInMemoryCommitCoordinatorBuilder(
+    batchSize: Long,
+    defaultCommitCoordinatorClientOpt: Option[CommitCoordinatorClient] = None)
+  extends CommitCoordinatorBuilder {
+  lazy val trackingInMemoryCommitCoordinatorClient =
+    defaultCommitCoordinatorClientOpt.getOrElse {
+      new TrackingCommitCoordinatorClient(
+        new InMemoryCommitCoordinator(batchSize))
+    }
+
+  override def getName: String = "tracking-in-memory"
+  override def build(conf: util.Map[String, String]): CommitCoordinatorClient = {
+    trackingInMemoryCommitCoordinatorClient
+  }
+}
+
+object TrackingCommitCoordinatorClient {
+  private val insideOperation = new ThreadLocal[Boolean] {
+    override def initialValue(): Boolean = false
+  }
+}
+
+class TrackingCommitCoordinatorClient(delegatingCommitCoordinatorClient: InMemoryCommitCoordinator)
+  extends CommitCoordinatorClient {
+
+  val numCommitsCalled = new AtomicInteger(0)
+  val numGetCommitsCalled = new AtomicInteger(0)
+  val numBackfillToVersionCalled = new AtomicInteger(0)
+  val numRegisterTableCalled = new AtomicInteger(0)
+
+  // Counts only top-level operations: nested calls made by the delegate while an
+  // operation is already in flight on this thread are not double counted.
+  def recordOperation[T](op: String)(f: => T): T = {
+    val oldInsideOperation = TrackingCommitCoordinatorClient.insideOperation.get()
+    try {
+      if (!TrackingCommitCoordinatorClient.insideOperation.get()) {
+        op match {
+          case "commit" => numCommitsCalled.incrementAndGet()
+          case "getCommits" => numGetCommitsCalled.incrementAndGet()
+          case "backfillToVersion" => numBackfillToVersionCalled.incrementAndGet()
+          case "registerTable" => numRegisterTableCalled.incrementAndGet()
+          case _ => ()
+        }
+      }
+      TrackingCommitCoordinatorClient.insideOperation.set(true)
+      f
+    } finally {
+      TrackingCommitCoordinatorClient.insideOperation.set(oldInsideOperation)
+    }
+  }
+
+  override def commit(
+      logStore: LogStore,
+      hadoopConf: Configuration,
+      logPath: Path,
+      coordinatedCommitsTableConf: util.Map[String, String],
+      commitVersion: Long,
+      actions: util.Iterator[String],
+      updatedActions: UpdatedActions): CommitResponse = recordOperation("commit") {
+    delegatingCommitCoordinatorClient.commit(
+      logStore,
+      hadoopConf,
+      logPath,
+      coordinatedCommitsTableConf,
+      commitVersion,
+      actions,
+      updatedActions)
+  }
+
+  override def getCommits(
+      logPath: Path,
+      coordinatedCommitsTableConf: util.Map[String, String],
+      startVersion: lang.Long,
+      endVersion: lang.Long = null): GetCommitsResponse = recordOperation("getCommits") {
+    delegatingCommitCoordinatorClient.getCommits(
+      logPath, coordinatedCommitsTableConf, startVersion, endVersion)
+  }
+
+  override def backfillToVersion(
+      logStore: LogStore,
+      hadoopConf: Configuration,
+      logPath: Path,
+      coordinatedCommitsTableConf: util.Map[String, String],
+      version: Long,
+      lastKnownBackfilledVersion: lang.Long): Unit = recordOperation("backfillToVersion") {
+    delegatingCommitCoordinatorClient.backfillToVersion(
+      logStore,
+      hadoopConf,
+      logPath,
+      coordinatedCommitsTableConf,
+      version,
+      lastKnownBackfilledVersion)
+  }
+
+  override def semanticEquals(other: CommitCoordinatorClient): lang.Boolean = this == other
+
+  def reset(): Unit = {
+    numCommitsCalled.set(0)
+    numGetCommitsCalled.set(0)
+    numBackfillToVersionCalled.set(0)
+    numRegisterTableCalled.set(0)
+  }
+
+  override def registerTable(
+      logPath: Path,
+      currentVersion: Long,
+      currentMetadata: AbstractMetadata,
+      currentProtocol: AbstractProtocol): util.Map[String, String] = recordOperation("registerTable") {
+    delegatingCommitCoordinatorClient.registerTable(
+      logPath, currentVersion, currentMetadata, currentProtocol)
+  }
+}
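+
+// A minimal usage sketch (assuming `engine`, `logPath` and the commit `rows` come
+// from the enclosing test fixture), mirroring how the suites in this patch drive
+// these helpers:
+//
+//   val builder = TrackingInMemoryCommitCoordinatorBuilder(batchSize = 10)
+//   CommitCoordinatorProvider.registerBuilder(builder)
+//   val client = builder.build(Collections.emptyMap())
+//     .asInstanceOf[TrackingCommitCoordinatorClient]
+//   val tableConf = client.registerTable(
+//     logPath, -1L,
+//     CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(Metadata.empty()),
+//     CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(new Protocol(1, 1)))
+//   writeCommitZero(engine, logPath, rows)          // version 0 must go via the file system
+//   commit(logPath, tableConf, 1, 1, rows, client)  // later versions go via the coordinator
+//   assert(client.numCommitsCalled.get === 1)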
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorSuite.scala
new file mode 100644
index 00000000000..07fc7182cae
--- /dev/null
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/coordinatedcommits/InMemoryCommitCoordinatorSuite.scala
@@ -0,0 +1,178 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.internal.coordinatedcommits
+
+import io.delta.kernel.Table
+import io.delta.kernel.defaults.engine.DefaultJsonHandler
+import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
+import io.delta.kernel.defaults.utils.DefaultVectorTestUtils
+import io.delta.kernel.defaults.DeltaTableWriteSuiteBase
+import io.delta.kernel.engine.Engine
+import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol}
+import io.delta.kernel.types.{StringType, StructType}
+import io.delta.storage.commit.{Commit, CommitCoordinatorClient, CommitFailedException, GetCommitsResponse}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import java.util
+import java.util.{Collections, Optional}
+import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions.{`iterator asScala`, `list asScalaBuffer`}
+
+abstract class InMemoryCommitCoordinatorSuite(batchSize: Int) extends DeltaTableWriteSuiteBase
+  with CoordinatedCommitsTestUtils
+  with DefaultVectorTestUtils {
+
+  val jsonHandler = new DefaultJsonHandler(hadoopConf)
+
+  private def assertGetCommitResponseEqual(x: GetCommitsResponse, y: GetCommitsResponse): Unit = {
+    assert(x.getLatestTableVersion == y.getLatestTableVersion)
+    assert(x.getCommits.size() == y.getCommits.size())
+    for (i <- 0 until x.getCommits.size()) {
+      val (xi, yi) = (x.getCommits.get(i), y.getCommits.get(i))
+      assert(xi.getVersion == yi.getVersion)
+      assert(xi.getFileStatus.getPath == yi.getFileStatus.getPath)
+      assert(xi.getFileStatus.getLen == yi.getFileStatus.getLen)
+      assert(xi.getFileStatus.getModificationTime == yi.getFileStatus.getModificationTime)
+      assert(xi.getCommitTimestamp == yi.getCommitTimestamp)
+    }
+  }
+
+  protected def assertBackfilled(
+      version: Long,
+      logPath: Path,
+      timestampOpt: Option[Long] = None): Unit = {
+    val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)
+    val delta = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version)
+    if (timestampOpt.isDefined) {
+      assert(logStore.read(delta, hadoopConf).toSeq == Seq(s"$version", s"${timestampOpt.get}"))
+    } else {
+      assert(logStore.read(delta, hadoopConf).take(1).toSeq == Seq(s"$version"))
+    }
+  }
+
+  protected def registerBackfillOp(
+      commitCoordinatorClient: CommitCoordinatorClient,
+      logPath: Path,
+      version: Long): Unit = {
+    val inMemoryCS = commitCoordinatorClient.asInstanceOf[InMemoryCommitCoordinator]
+    inMemoryCS.registerBackfill(logPath, version)
+  }
+
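+  // With batch backfilling, a commit at `version` leaves exactly the versions in
+  // (version - version % batchSize, version] unbackfilled. For example, with
+  // batchSize = 5 and version = 7, versions 0-5 must be backfilled while 6 and 7
+  // remain tracked only by the coordinator.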
+  protected def validateBackfillStrategy(
+      engine: Engine,
+      commitCoordinatorClient: CommitCoordinatorClient,
+      logPath: Path,
+      tableConf: util.Map[String, String],
+      version: Long): Unit = {
+    val lastExpectedBackfilledVersion = (version - (version % batchSize)).toInt
+    val unbackfilledCommitVersionsAll = commitCoordinatorClient
+      .getCommits(logPath, tableConf, null, null)
+      .getCommits.map(_.getVersion)
+    val expectedVersions = lastExpectedBackfilledVersion + 1 to version.toInt
+
+    assert(unbackfilledCommitVersionsAll == expectedVersions)
+    (0 to lastExpectedBackfilledVersion).foreach { v =>
+      assertBackfilled(v, logPath, Some(v))
+    }
+  }
+
+  /**
+   * Checks that the commit coordinator state is correct:
+   * - The latest table version tracked by the commit coordinator is correct.
+   * - All commits expected to be backfilled are indeed backfilled.
+   * - The contents of the backfilled commits are correct (verified only when
+   *   commitTimestampsOpt is provided).
+   *
+   * Subclasses can override this to check additional, more specific invariants.
+   */
+  protected def assertInvariants(
+      logPath: Path,
+      tableConf: util.Map[String, String],
+      commitCoordinatorClient: CommitCoordinatorClient,
+      commitTimestampsOpt: Option[Array[Long]] = None): Unit = {
+    val maxUntrackedVersion: Int = {
+      val commitResponse = commitCoordinatorClient.getCommits(logPath, tableConf, null, null)
+      if (commitResponse.getCommits.isEmpty) {
+        commitResponse.getLatestTableVersion.toInt
+      } else {
+        assert(
+          commitResponse.getCommits.last.getVersion == commitResponse.getLatestTableVersion,
+          s"Max commit tracked by the commit coordinator ${commitResponse.getCommits.last} must " +
+            s"match latestTableVersion tracked by the commit coordinator " +
+            s"${commitResponse.getLatestTableVersion}.")
+        val minVersion = commitResponse.getCommits.head.getVersion
+        assert(
+          commitResponse.getLatestTableVersion - minVersion + 1 == commitResponse.getCommits.size,
+          "Commit map should have a contiguous range of unbackfilled commits.")
+        minVersion.toInt - 1
+      }
+    }
+    (0 to maxUntrackedVersion).foreach { version =>
+      assertBackfilled(version, logPath, commitTimestampsOpt.map(_(version)))
+    }
+  }
+
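+  // Commit 0 must go directly through the file system: the coordinator rejects it
+  // with a CommitFailedException, as exercised at the start of the test below.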
+  test("test basic commit and backfill functionality") {
+    withTempDirAndEngine { (tablePath, engine) =>
+      val cc = new InMemoryCommitCoordinatorBuilder(batchSize).build(Collections.emptyMap())
+      val logPath = new Path(tablePath, "_delta_log")
+
+      val tableConf = cc.registerTable(
+        logPath,
+        -1L,
+        CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(Metadata.empty()),
+        CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(new Protocol(1, 1)))
+
+      val e = intercept[CommitFailedException] {
+        commit(
+          logPath,
+          tableConf, version = 0, timestamp = 0, util.Arrays.asList("0", "0"), cc)
+      }
+      assert(e.getMessage === "Commit version 0 must go via filesystem.")
+      writeCommitZero(engine, logPath, util.Arrays.asList("0", "0"))
+      assertGetCommitResponseEqual(
+        cc.getCommits(logPath, tableConf, null, null),
+        new GetCommitsResponse(Collections.emptyList(), -1))
+      assertBackfilled(version = 0, logPath, Some(0L))
+
+      // Test backfilling functionality for commits 1 - 8
+      (1 to 8).foreach { version =>
+        commit(
+          logPath,
+          tableConf,
+          version, version, util.Arrays.asList(s"$version", s"$version"), cc)
+        validateBackfillStrategy(engine, cc, logPath, tableConf, version)
+        assert(cc.getCommits(logPath, tableConf, null, null).getLatestTableVersion == version)
+      }
+
+      // Test that out-of-order backfill is rejected
+      intercept[IllegalArgumentException] {
+        registerBackfillOp(cc, logPath, 10)
+      }
+      assertInvariants(logPath, tableConf, cc)
+    }
+  }
+}
+
+class InMemoryCommitCoordinator1Suite extends InMemoryCommitCoordinatorSuite(1)
+class InMemoryCommitCoordinator5Suite extends InMemoryCommitCoordinatorSuite(5)
diff --git a/storage/src/main/java/io/delta/storage/commit/CommitCoordinatorClient.java b/storage/src/main/java/io/delta/storage/commit/CommitCoordinatorClient.java
index f37efb2cbb7..b4ff2e5b979 100644
--- a/storage/src/main/java/io/delta/storage/commit/CommitCoordinatorClient.java
+++ b/storage/src/main/java/io/delta/storage/commit/CommitCoordinatorClient.java
@@ -16,6 +16,7 @@
 package io.delta.storage.commit;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -94,7 +95,7 @@ CommitResponse commit(
       Map<String, String> tableConf,
       long commitVersion,
       Iterator<String> actions,
-      UpdatedActions updatedActions);
+      UpdatedActions updatedActions) throws CommitFailedException, IOException;
 
   /**
    * API to get the unbackfilled commits for the table represented by the given logPath.
@@ -148,7 +149,7 @@ void backfillToVersion(
       Path logPath,
       Map<String, String> tableConf,
       long version,
-      Long lastKnownBackfilledVersion);
+      Long lastKnownBackfilledVersion) throws IOException;
 
   /**
    * Determines whether this CommitCoordinatorClient is semantically equal to another