Skip to content

Commit

Permalink
Assign baseRowID and defaultRowCommitVersion to AddFile action
Browse files Browse the repository at this point in the history
  • Loading branch information
qiyuandong-db committed Dec 7, 2024
1 parent bb3956f commit c584707
Show file tree
Hide file tree
Showing 11 changed files with 729 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ public static ConcurrentWriteException concurrentDomainMetadataAction(
return new ConcurrentWriteException(message);
}

public static KernelException rowIDAssignmentWithoutStats() {
return new KernelException(
"All AddFile actions must have statistics that include the number of records "
+ "when writing to a Delta table with the 'rowTracking' table feature supported");
}

/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ public LogSegment getLogSegment() {
return logSegment;
}

/**
* Returns the log replay object. Visible for testing, where we need to access all the active
* AddFiles for a snapshot.
*
* @return the {@link LogReplay} object
*/
public LogReplay getLogReplay() {
return logReplay;
}

public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) {
long minFileRetentionTimestampMillis =
System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.delta.kernel.internal.DeltaErrors.*;
import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED;

import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.util.ColumnMapping;
Expand All @@ -40,6 +41,7 @@ public class TableFeatures {
add("typeWidening-preview");
add("typeWidening");
add(DOMAIN_METADATA_FEATURE_NAME);
add(ROW_TRACKING_FEATURE_NAME);
}
});

Expand All @@ -61,6 +63,9 @@ public class TableFeatures {
/** The feature name for domain metadata. */
public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata";

/** The feature name for row tracking. */
public static final String ROW_TRACKING_FEATURE_NAME = "rowTracking";

/** The minimum writer version required to support table features. */
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;

Expand Down Expand Up @@ -100,7 +105,8 @@ public static void validateReadSupportedTable(
* <li>protocol writer version 1.
* <li>protocol writer version 2 only with appendOnly feature enabled.
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code
* columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled.
* columnMapping}, {@code typeWidening}, {@code domainMetadata}, {@code rowTracking} feature
* enabled.
* </ul>
*
* @param protocol Table protocol
Expand Down Expand Up @@ -197,6 +203,30 @@ public static boolean isDomainMetadataSupported(Protocol protocol) {
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
}

/**
* Check if the table protocol supports the "rowTracking" writer feature.
*
* @param protocol the protocol to check
* @return true if the protocol supports row tracking, false otherwise
*/
public static boolean isRowTrackingSupported(Protocol protocol) {
List<String> writerFeatures = protocol.getWriterFeatures();
if (writerFeatures == null) {
return false;
}
boolean rowTrackingSupported =
writerFeatures.contains(ROW_TRACKING_FEATURE_NAME)
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
boolean domainMetadataSupported = isDomainMetadataSupported(protocol);

if (rowTrackingSupported && !domainMetadataSupported) {
// This should not happen. Row tracking should automatically bring in domain metadata.
throw new KernelException(
"Feature 'rowTracking' is supported but 'domainMetadata' is unsupported");
}
return rowTrackingSupported;
}

/**
* Get the minimum reader version required for a feature.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
Expand Down Expand Up @@ -145,6 +146,15 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
CommitInfo attemptCommitInfo = generateCommitAction(engine);
updateMetadataWithICTIfRequired(
engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion(engine));

// For Row Tracking, we assign the base row ID and default row commit version to all AddFiles
// inside dataActions that do not have it yet. We also emit a DomainMetadata action if the
// row ID high watermark has changed.
RowTracking.emitDomainMetadataAction(protocol, readSnapshot, domainMetadatas, dataActions);
dataActions =
RowTracking.assignBaseRowIdAndDefaultRowCommitVersion(
protocol, readSnapshot, commitAsVersion, dataActions);

int numRetries = 0;
do {
logger.info("Committing transaction as version = {}.", commitAsVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.util.InternalUtils.relativizePath;
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;
import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.DataFileStatistics;
import io.delta.kernel.utils.DataFileStatus;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.IntStream;

/** Delta log action representing an `AddFile` */
Expand Down Expand Up @@ -55,12 +61,15 @@ public class AddFile {
.add(
"tags",
new MapType(StringType.STRING, StringType.STRING, true /* valueContainsNull */),
true /* nullable */);
true /* nullable */)
.add("baseRowId", LongType.LONG, true /* nullable */)
.add("defaultRowCommitVersion", LongType.LONG, true /* nullable */);

public static final StructType SCHEMA_WITH_STATS = SCHEMA_WITHOUT_STATS.add(JSON_STATS_FIELD);

/** Full schema of the {@code add} action in the Delta Log. */
public static final StructType FULL_SCHEMA = SCHEMA_WITH_STATS;

// There are more fields which are added when row-id tracking and clustering is enabled.
// When Kernel starts supporting row-ids and clustering, we should add those fields here.

Expand Down Expand Up @@ -93,4 +102,159 @@ public static Row convertDataFileStatus(
// any fields not present in the valueMap are considered null
return new GenericRow(FULL_SCHEMA, valueMap);
}

/**
* Utility to generate an {@link AddFile} action from an 'AddFile' {@link Row}.
*
* @param row the row to read
* @return the extracted {@link AddFile} action
*/
public static AddFile fromRow(Row row) {
if (row == null) {
return null;
}
checkArgument(
row.getSchema().equals(FULL_SCHEMA),
"Expected schema: %s, found: %s",
FULL_SCHEMA,
row.getSchema());
return new AddFile(
requireNonNull(row, COL_NAME_TO_ORDINAL.get("path"), "path")
.getString(COL_NAME_TO_ORDINAL.get("path")),
requireNonNull(row, COL_NAME_TO_ORDINAL.get("partitionValues"), "partitionValues")
.getMap(COL_NAME_TO_ORDINAL.get("partitionValues")),
requireNonNull(row, COL_NAME_TO_ORDINAL.get("size"), "size")
.getLong(COL_NAME_TO_ORDINAL.get("size")),
requireNonNull(row, COL_NAME_TO_ORDINAL.get("modificationTime"), "modificationTime")
.getLong(COL_NAME_TO_ORDINAL.get("modificationTime")),
requireNonNull(row, COL_NAME_TO_ORDINAL.get("dataChange"), "dataChange")
.getBoolean(COL_NAME_TO_ORDINAL.get("dataChange")),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("deletionVector"))
? null
: DeletionVectorDescriptor.fromRow(
row.getStruct(COL_NAME_TO_ORDINAL.get("deletionVector")))),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("tags"))
? null
: row.getMap(COL_NAME_TO_ORDINAL.get("tags"))),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("baseRowId"))
? null
: row.getLong(COL_NAME_TO_ORDINAL.get("baseRowId"))),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"))
? null
: row.getLong(COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"))),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("stats"))
? null
: DataFileStatistics.deserializeFromJson(
row.getString(COL_NAME_TO_ORDINAL.get("stats")))
.orElse(null)));
}

private final String path;
private final MapValue partitionValues;
private final long size;
private final long modificationTime;
private final boolean dataChange;
private final Optional<DeletionVectorDescriptor> deletionVector;
private final Optional<MapValue> tags;
private final Optional<Long> baseRowId;
private final Optional<Long> defaultRowCommitVersion;
private final Optional<DataFileStatistics> stats;

public AddFile(
String path,
MapValue partitionValues,
long size,
long modificationTime,
boolean dataChange,
Optional<DeletionVectorDescriptor> deletionVector,
Optional<MapValue> tags,
Optional<Long> baseRowId,
Optional<Long> defaultRowCommitVersion,
Optional<DataFileStatistics> stats) {
this.path = requireNonNull(path, "path is null");
this.partitionValues = requireNonNull(partitionValues, "partitionValues is null");
this.size = size;
this.modificationTime = modificationTime;
this.dataChange = dataChange;
this.deletionVector = deletionVector;
this.tags = tags;
this.baseRowId = baseRowId;
this.defaultRowCommitVersion = defaultRowCommitVersion;
this.stats = stats;
}

public Row toRow() {
Map<Integer, Object> valueMap = new HashMap<>();
valueMap.put(COL_NAME_TO_ORDINAL.get("path"), path);
valueMap.put(COL_NAME_TO_ORDINAL.get("partitionValues"), partitionValues);
valueMap.put(COL_NAME_TO_ORDINAL.get("size"), size);
valueMap.put(COL_NAME_TO_ORDINAL.get("modificationTime"), modificationTime);
valueMap.put(COL_NAME_TO_ORDINAL.get("dataChange"), dataChange);
deletionVector.ifPresent(dv -> valueMap.put(COL_NAME_TO_ORDINAL.get("deletionVector"), dv));
tags.ifPresent(tags -> valueMap.put(COL_NAME_TO_ORDINAL.get("tags"), tags));
baseRowId.ifPresent(rowId -> valueMap.put(COL_NAME_TO_ORDINAL.get("baseRowId"), rowId));
defaultRowCommitVersion.ifPresent(
commitVersion ->
valueMap.put(COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"), commitVersion));
stats.ifPresent(
stats -> valueMap.put(COL_NAME_TO_ORDINAL.get("stats"), stats.serializeAsJson()));
return new GenericRow(FULL_SCHEMA, valueMap);
}

public Optional<Long> getBaseRowId() {
return baseRowId;
}

public Optional<Long> getDefaultRowCommitVersion() {
return defaultRowCommitVersion;
}

public Optional<DataFileStatistics> getStats() {
return stats;
}

/**
* Creates a new AddFile instance with the specified base row ID.
*
* @param baseRowId the new base row ID to be assigned
* @return a new AddFile instance with the updated base row ID
*/
public AddFile withNewBaseRowId(long baseRowId) {
return new AddFile(
path,
partitionValues,
size,
modificationTime,
dataChange,
deletionVector,
tags,
Optional.of(baseRowId),
defaultRowCommitVersion,
stats);
}

/**
* Creates a new AddFile instance with the specified default row commit version.
*
* @param defaultRowCommitVersion the new default row commit version to be assigned
* @return a new AddFile instance with the updated default row commit version
*/
public AddFile withNewDefaultRowCommitVersion(long defaultRowCommitVersion) {
return new AddFile(
path,
partitionValues,
size,
modificationTime,
dataChange,
deletionVector,
tags,
baseRowId,
Optional.of(defaultRowCommitVersion),
stats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ public class RemoveFile {
.add("size", LongType.LONG, true /* nullable*/)
.add("stats", StringType.STRING, true /* nullable */)
.add("tags", new MapType(StringType.STRING, StringType.STRING, true), true /* nullable */)
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */);
// There are more fields which are added when row-id tracking is enabled. When Kernel
// starts supporting row-ids, we should add those fields here.
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */)
.add("baseRowId", LongType.LONG, true /* nullable */)
.add("defaultRowCommitVersion", LongType.LONG, true /* nullable */);
// TODO: Currently, Kernel doesn't create RemoveFile actions internally, nor provides APIs for
// connectors to generate and commit them. Once we have the need for this, we should ensure
// that the baseRowId and defaultRowCommitVersion fields of RemoveFile actions are correctly
// populated to match the corresponding AddFile actions.
}
Loading

0 comments on commit c584707

Please sign in to comment.