Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Assign base row ID and default row commit version to AddFile #3894

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ public static ConcurrentWriteException concurrentDomainMetadataAction(
return new ConcurrentWriteException(message);
}

public static KernelException rowIDAssignmentWithoutStats() {
return new KernelException(
"All AddFile actions must have statistics that include the number of records "
qiyuandong-db marked this conversation as resolved.
Show resolved Hide resolved
+ "when writing to a Delta table with the 'rowTracking' table feature supported");
}

public static KernelException rowTrackingSupportedWithDomainMetadataUnsupported() {
return new KernelException(
"Feature 'rowTracking' is supported and depends on feature 'domainMetadata',"
+ " but 'domainMetadata' is unsupported");
}

/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Utilities to extract information out of the scan file rows returned by {@link
Expand Down Expand Up @@ -87,6 +88,11 @@ private InternalScanFileUtils() {}

private static final int ADD_FILE_DV_ORDINAL = ADD_FILE_SCHEMA.indexOf("deletionVector");

private static final int ADD_FILE_BASE_ROW_ID_ORDINAL = ADD_FILE_SCHEMA.indexOf("baseRowId");

private static final int ADD_FILE_DEFAULT_ROW_COMMIT_VERSION =
ADD_FILE_SCHEMA.indexOf("defaultRowCommitVersion");

private static final int TABLE_ROOT_ORDINAL = SCAN_FILE_SCHEMA.indexOf(TABLE_ROOT_COL_NAME);

public static final int ADD_FILE_STATS_ORDINAL = AddFile.SCHEMA_WITH_STATS.indexOf("stats");
Expand Down Expand Up @@ -190,4 +196,30 @@ public static DeletionVectorDescriptor getDeletionVectorDescriptorFromRow(Row sc
public static Column getPartitionValuesParsedRefInAddFile(String partitionColName) {
return new Column(new String[] {"add", "partitionValues_parsed", partitionColName});
}

/**
* Get the base row id from the given scan file row.
*
* @param scanFile {@link Row} representing one scan file.
* @return base row id if present, otherwise empty.
*/
public static Optional<Long> getBaseRowId(Row scanFile) {
Row addFile = getAddFileEntry(scanFile);
return addFile.isNullAt(ADD_FILE_BASE_ROW_ID_ORDINAL)
? Optional.empty()
: Optional.of(addFile.getLong(ADD_FILE_BASE_ROW_ID_ORDINAL));
}

/**
* Get the default row commit version from the given scan file row.
*
* @param scanFile {@link Row} representing one scan file.
* @return default row commit version if present, otherwise empty.
*/
public static Optional<Long> getDefaultRowCommitVersion(Row scanFile) {
Row addFile = getAddFileEntry(scanFile);
return addFile.isNullAt(ADD_FILE_DEFAULT_ROW_COMMIT_VERSION)
? Optional.empty()
: Optional.of(addFile.getLong(ADD_FILE_DEFAULT_ROW_COMMIT_VERSION));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class TableFeatures {
add("typeWidening-preview");
add("typeWidening");
add(DOMAIN_METADATA_FEATURE_NAME);
add(ROW_TRACKING_FEATURE_NAME);
}
});

Expand All @@ -61,6 +62,9 @@ public class TableFeatures {
/** The feature name for domain metadata. */
public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata";

/** The feature name for row tracking. */
qiyuandong-db marked this conversation as resolved.
Show resolved Hide resolved
public static final String ROW_TRACKING_FEATURE_NAME = "rowTracking";

/** The minimum writer version required to support table features. */
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;

Expand Down Expand Up @@ -100,7 +104,8 @@ public static void validateReadSupportedTable(
* <li>protocol writer version 1.
* <li>protocol writer version 2 only with appendOnly feature enabled.
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code
* columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled.
* columnMapping}, {@code typeWidening}, {@code domainMetadata}, {@code rowTracking} feature
* enabled.
* </ul>
*
* @param protocol Table protocol
Expand Down Expand Up @@ -136,6 +141,12 @@ public static void validateWriteSupportedTable(
throw unsupportedWriterFeature(tablePath, writerFeature);
}
}
// Eventually we may have a way to declare and enforce dependencies between features.
// By putting this check for row tracking here, it makes it easier to spot that row
// tracking defines such a dependency that can be implicitly checked.
if (isRowTrackingSupported(protocol) && !isDomainMetadataSupported(protocol)) {
throw DeltaErrors.rowTrackingSupportedWithDomainMetadataUnsupported();
}
break;
default:
throw unsupportedWriterProtocol(tablePath, minWriterVersion);
Expand Down Expand Up @@ -189,12 +200,17 @@ public static Set<String> extractAutomaticallyEnabledWriterFeatures(
* @return true if the "domainMetadata" feature is supported, false otherwise
*/
public static boolean isDomainMetadataSupported(Protocol protocol) {
List<String> writerFeatures = protocol.getWriterFeatures();
if (writerFeatures == null) {
return false;
}
return writerFeatures.contains(DOMAIN_METADATA_FEATURE_NAME)
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
return isWriterFeatureSupported(protocol, DOMAIN_METADATA_FEATURE_NAME);
}

/**
* Check if the table protocol supports the "rowTracking" writer feature.
*
* @param protocol the protocol to check
* @return true if the protocol supports row tracking, false otherwise
*/
public static boolean isRowTrackingSupported(Protocol protocol) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: where (probably in future PRs?) will you check that RowTracking is enabled (which is strictly stronger than supported?

https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-tracking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will not be checking whether row tracking is enabled in this PR or in planned future ones. The current goal is to add a minimal implementation to ensure row tracking is supported in Delta Kernel. Addressing the enabled requirement is outside the current scope.

I imagine that in the future, we may need to check it is enabled when 1) reconstructing stable row ID / row commit version during reads, and 2) preserving stable row ID / row commit version during writes (e.g., for handling UPDATE and DELETE operations).

return isWriterFeatureSupported(protocol, ROW_TRACKING_FEATURE_NAME);
}

/**
Expand Down Expand Up @@ -253,4 +269,13 @@ private static void validateNoInvariants(StructType tableSchema) {
throw columnInvariantsNotSupported();
}
}

private static boolean isWriterFeatureSupported(Protocol protocol, String featureName) {
List<String> writerFeatures = protocol.getWriterFeatures();
if (writerFeatures == null) {
return false;
}
return writerFeatures.contains(featureName)
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
Expand Down Expand Up @@ -145,6 +146,18 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
CommitInfo attemptCommitInfo = generateCommitAction(engine);
updateMetadataWithICTIfRequired(
engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion(engine));

// If row tracking is supported, assign base row IDs and default row commit versions to any
// AddFile actions that do not yet have them. If the row ID high watermark changes, emit a
// DomainMetadata action to update it.
if (TableFeatures.isRowTrackingSupported(protocol)) {
RowTracking.createNewHighWaterMarkIfNeeded(readSnapshot, dataActions)
.ifPresent(domainMetadatas::add);
dataActions =
RowTracking.assignBaseRowIdAndDefaultRowCommitVersion(
readSnapshot, commitAsVersion, dataActions);
}

int numRetries = 0;
do {
logger.info("Committing transaction as version = {}.", commitAsVersion);
Expand Down
Loading
Loading