Skip to content

Commit

Permalink
[Kernel] Add an end2end prototype of Coordinated Commit read support …
Browse files Browse the repository at this point in the history
…in kernel
  • Loading branch information
EstherBear committed Jul 7, 2024
1 parent ad8d1cb commit 1b577a2
Show file tree
Hide file tree
Showing 33 changed files with 2,289 additions and 27 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
libraryDependencies ++= Seq(
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "1.7.36",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
Expand Down
57 changes: 57 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/Commit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import io.delta.kernel.utils.FileStatus;

/**
* Representation of a commit file
*/
public class Commit {

private long version;

private FileStatus fileStatus;

private long commitTimestamp;

public Commit(long version, FileStatus fileStatus, long commitTimestamp) {
this.version = version;
this.fileStatus = fileStatus;
this.commitTimestamp = commitTimestamp;
}

public long getVersion() {
return version;
}

public FileStatus getFileStatus() {
return fileStatus;
}

public long getCommitTimestamp() {
return commitTimestamp;
}

public Commit withFileStatus(FileStatus fileStatus) {
return new Commit(version, fileStatus, commitTimestamp);
}

public Commit withCommitTimestamp(long commitTimestamp) {
return new Commit(version, fileStatus, commitTimestamp);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.Iterator;
import java.util.Map;

/**
* Exception raised by {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}
*
* <pre>
* | retryable | conflict | meaning |
* | no | no | something bad happened (e.g. auth failure) |
* | no | yes | permanent transaction conflict (e.g. multi-table commit failed) |
* | yes | no | transient error (e.g. network hiccup) |
* | yes | yes | physical conflict (allowed to rebase and retry) |
* </pre>
*/
public class CommitFailedException extends Exception {

private boolean retryable;

private boolean conflict;

private String message;

public CommitFailedException(boolean retryable, boolean conflict, String message) {
this.retryable = retryable;
this.conflict = conflict;
this.message = message;
}

public boolean getRetryable() {
return retryable;
}

public boolean getConflict() {
return conflict;
}

public String getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.Iterator;
import java.util.Map;

/**
* Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}.
*/
public class CommitResponse {

private Commit commit;

public CommitResponse(Commit commit) {
this.commit = commit;
}

public Commit getCommit() {
return commit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.List;
import java.util.Map;

/**
* Response container for
* {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits(
* String, Map, Long, Long)}.
*/
public class GetCommitsResponse {
private List<Commit> commits;

private long latestTableVersion;

public GetCommitsResponse(List<Commit> commits, long latestTableVersion) {
this.commits = commits;
this.latestTableVersion = latestTableVersion;
}

public List<Commit> getCommits() {
return commits;
}

public long getLatestTableVersion() {
return latestTableVersion;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import io.delta.kernel.commit.actions.AbstractCommitInfo;
import io.delta.kernel.commit.actions.AbstractMetadata;
import io.delta.kernel.commit.actions.AbstractProtocol;

/**
* A container class to inform the CommitCoordinatorClientHandler about any changes in
* Protocol/Metadata
*/
public class UpdatedActions {
private AbstractCommitInfo commitInfo;

private AbstractMetadata newMetadata;

private AbstractProtocol newProtocol;

private AbstractMetadata oldMetadata;

private AbstractProtocol oldProtocol;

public UpdatedActions(
AbstractCommitInfo commitInfo,
AbstractMetadata newMetadata,
AbstractProtocol newProtocol,
AbstractMetadata oldMetadata,
AbstractProtocol oldProtocol) {
this.commitInfo = commitInfo;
this.newMetadata = newMetadata;
this.newProtocol = newProtocol;
this.oldMetadata = oldMetadata;
this.oldProtocol = oldProtocol;
}

public AbstractCommitInfo getCommitInfo() {
return commitInfo;
}

public AbstractMetadata getNewMetadata() {
return newMetadata;
}

public AbstractProtocol getNewProtocol() {
return newProtocol;
}

public AbstractMetadata getOldMetadata() {
return oldMetadata;
}

public AbstractProtocol getOldProtocol() {
return oldProtocol;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit.actions;

/**
* Interface for objects that represents the base information for a commit.
* Commits need to provide an in-commit timestamp. This timestamp is used
* to specify the exact time the commit happened and determines the target
* version for time-based time travel queries.
*/
public interface AbstractCommitInfo {

/**
* Get the timestamp of the commit as millis after the epoch.
*/
long getCommitTimestamp();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit.actions;

import java.util.List;
import java.util.Map;

/**
* Interface for metadata actions in Delta. The metadata defines the metadata
* of the table.
*/
public interface AbstractMetadata {

/**
* A unique table identifier.
*/
String getId();

/**
* User-specified table identifier.
*/
String getName();

/**
* User-specified table description.
*/
String getDescription();

/** The table provider format. */
String getProvider();

/** The format options */
Map<String, String> getFormatOptions();

/**
* The table schema in string representation.
*/
String getSchemaString();

/**
* List of partition columns.
*/
List<String> getPartitionColumns();

/**
* The table properties defined on the table.
*/
Map<String, String> getConfiguration();

/**
* Timestamp for the creation of this metadata.
*/
Long getCreatedTime();
}
Loading

0 comments on commit 1b577a2

Please sign in to comment.