From e94da899e3b19f1b22dc5935e6e10458bd1a8dc7 Mon Sep 17 00:00:00 2001 From: Qiyuan Dong Date: Sun, 8 Dec 2024 23:37:17 +0100 Subject: [PATCH 1/5] Assign baseRowId/defaultRowCommitVersion to AddFile and RemoveFile actions --- .../io/delta/kernel/internal/DeltaErrors.java | 6 + .../delta/kernel/internal/SnapshotImpl.java | 10 + .../delta/kernel/internal/TableFeatures.java | 32 +- .../kernel/internal/TransactionImpl.java | 10 + .../kernel/internal/actions/AddFile.java | 168 +++++++++- .../kernel/internal/actions/RemoveFile.java | 10 +- .../internal/rowtracking/RowTracking.java | 166 ++++++++++ .../RowTrackingMetadataDomain.java | 2 + .../kernel/utils/DataFileStatistics.java | 24 ++ .../kernel/internal/TableFeaturesSuite.scala | 6 +- .../kernel/defaults/RowTrackingSuite.scala | 308 ++++++++++++++++++ 11 files changed, 734 insertions(+), 8 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 2b4e0599df3..47fec3751c5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -293,6 +293,12 @@ public static ConcurrentWriteException concurrentDomainMetadataAction( return new ConcurrentWriteException(message); } + public static KernelException rowIDAssignmentWithoutStats() { + return new KernelException( + "All AddFile actions must have statistics that include the number of records " + + "when writing to a Delta table with the 'rowTracking' table feature supported"); + } + /* ------------------------ HELPER METHODS ----------------------------- */ private static String formatTimestamp(long millisSinceEpochUTC) { return new 
Timestamp(millisSinceEpochUTC).toInstant().toString(); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 72ee67492fa..54ac6563110 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -151,6 +151,16 @@ public LogSegment getLogSegment() { return logSegment; } + /** + * Returns the log replay object. Visible for testing, where we need to access all the active + * AddFiles for a snapshot. + * + * @return the {@link LogReplay} object + */ + public LogReplay getLogReplay() { + return logReplay; + } + public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) { long minFileRetentionTimestampMillis = System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index 88fbabbd0d2..e9953647646 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -19,6 +19,7 @@ import static io.delta.kernel.internal.DeltaErrors.*; import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; +import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.util.ColumnMapping; @@ -40,6 +41,7 @@ public class TableFeatures { add("typeWidening-preview"); add("typeWidening"); add(DOMAIN_METADATA_FEATURE_NAME); + add(ROW_TRACKING_FEATURE_NAME); } }); @@ -61,6 +63,9 @@ public class TableFeatures { /** The feature name for domain metadata. 
*/ public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata"; + /** The feature name for row tracking. */ + public static final String ROW_TRACKING_FEATURE_NAME = "rowTracking"; + /** The minimum writer version required to support table features. */ public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7; @@ -100,7 +105,8 @@ public static void validateReadSupportedTable( *
  • protocol writer version 1. *
  • protocol writer version 2 only with appendOnly feature enabled. *
  • protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code - * columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled. + * columnMapping}, {@code typeWidening}, {@code domainMetadata}, {@code rowTracking} feature + * enabled. * * * @param protocol Table protocol @@ -197,6 +203,30 @@ public static boolean isDomainMetadataSupported(Protocol protocol) { && protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION; } + /** + * Check if the table protocol supports the "rowTracking" writer feature. + * + * @param protocol the protocol to check + * @return true if the protocol supports row tracking, false otherwise + */ + public static boolean isRowTrackingSupported(Protocol protocol) { + List writerFeatures = protocol.getWriterFeatures(); + if (writerFeatures == null) { + return false; + } + boolean rowTrackingSupported = + writerFeatures.contains(ROW_TRACKING_FEATURE_NAME) + && protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION; + boolean domainMetadataSupported = isDomainMetadataSupported(protocol); + + if (rowTrackingSupported && !domainMetadataSupported) { + // This should not happen. Row tracking should automatically bring in domain metadata. + throw new KernelException( + "Feature 'rowTracking' is supported but 'domainMetadata' is unsupported"); + } + return rowTrackingSupported; + } + /** * Get the minimum reader version required for a feature. 
* diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 267517651c7..9b8601b26ce 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -32,6 +32,7 @@ import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.replay.ConflictChecker; import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState; +import io.delta.kernel.internal.rowtracking.RowTracking; import io.delta.kernel.internal.util.*; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterable; @@ -145,6 +146,15 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable data CommitInfo attemptCommitInfo = generateCommitAction(engine); updateMetadataWithICTIfRequired( engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion(engine)); + + // If row tracking is supported, assign base row IDs and default row commit versions to any + // AddFile actions that do not yet have them. If the row ID high watermark + // changes, emit a DomainMetadata action. 
+ RowTracking.updateHighWaterMark(protocol, readSnapshot, domainMetadatas, dataActions); + dataActions = + RowTracking.assignBaseRowIdAndDefaultRowCommitVersion( + protocol, readSnapshot, commitAsVersion, dataActions); + int numRetries = 0; do { logger.info("Committing transaction as version = {}.", commitAsVersion); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java index 0e652396e8a..58e4631ffb4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java @@ -16,18 +16,24 @@ package io.delta.kernel.internal.actions; import static io.delta.kernel.internal.util.InternalUtils.relativizePath; +import static io.delta.kernel.internal.util.InternalUtils.requireNonNull; import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap; +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; +import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; import io.delta.kernel.expressions.Literal; import io.delta.kernel.internal.data.GenericRow; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.types.*; +import io.delta.kernel.utils.DataFileStatistics; import io.delta.kernel.utils.DataFileStatus; import java.net.URI; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.stream.IntStream; /** Delta log action representing an `AddFile` */ @@ -55,12 +61,15 @@ public class AddFile { .add( "tags", new MapType(StringType.STRING, StringType.STRING, true /* valueContainsNull */), - true /* nullable */); + true /* nullable */) + .add("baseRowId", LongType.LONG, true /* nullable */) + .add("defaultRowCommitVersion", LongType.LONG, true /* nullable */); public 
static final StructType SCHEMA_WITH_STATS = SCHEMA_WITHOUT_STATS.add(JSON_STATS_FIELD); /** Full schema of the {@code add} action in the Delta Log. */ public static final StructType FULL_SCHEMA = SCHEMA_WITH_STATS; + // There are more fields which are added when row-id tracking and clustering is enabled. // When Kernel starts supporting row-ids and clustering, we should add those fields here. @@ -93,4 +102,161 @@ public static Row convertDataFileStatus( // any fields not present in the valueMap are considered null return new GenericRow(FULL_SCHEMA, valueMap); } + + /** + * Utility to generate an {@link AddFile} action from an 'AddFile' {@link Row}. + * + * @param row the row to read + * @return the extracted {@link AddFile} action + */ + public static AddFile fromRow(Row row) { + if (row == null) { + return null; + } + + checkArgument( + row.getSchema().equals(FULL_SCHEMA), + "Expected schema: %s, found: %s", + FULL_SCHEMA, + row.getSchema()); + + return new AddFile( + requireNonNull(row, COL_NAME_TO_ORDINAL.get("path"), "path") + .getString(COL_NAME_TO_ORDINAL.get("path")), + requireNonNull(row, COL_NAME_TO_ORDINAL.get("partitionValues"), "partitionValues") + .getMap(COL_NAME_TO_ORDINAL.get("partitionValues")), + requireNonNull(row, COL_NAME_TO_ORDINAL.get("size"), "size") + .getLong(COL_NAME_TO_ORDINAL.get("size")), + requireNonNull(row, COL_NAME_TO_ORDINAL.get("modificationTime"), "modificationTime") + .getLong(COL_NAME_TO_ORDINAL.get("modificationTime")), + requireNonNull(row, COL_NAME_TO_ORDINAL.get("dataChange"), "dataChange") + .getBoolean(COL_NAME_TO_ORDINAL.get("dataChange")), + Optional.ofNullable( + row.isNullAt(COL_NAME_TO_ORDINAL.get("deletionVector")) + ? null + : DeletionVectorDescriptor.fromRow( + row.getStruct(COL_NAME_TO_ORDINAL.get("deletionVector")))), + Optional.ofNullable( + row.isNullAt(COL_NAME_TO_ORDINAL.get("tags")) + ? 
null + : row.getMap(COL_NAME_TO_ORDINAL.get("tags"))), + Optional.ofNullable( + row.isNullAt(COL_NAME_TO_ORDINAL.get("baseRowId")) + ? null + : row.getLong(COL_NAME_TO_ORDINAL.get("baseRowId"))), + Optional.ofNullable( + row.isNullAt(COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion")) + ? null + : row.getLong(COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"))), + Optional.ofNullable( + row.isNullAt(COL_NAME_TO_ORDINAL.get("stats")) + ? null + : DataFileStatistics.deserializeFromJson( + row.getString(COL_NAME_TO_ORDINAL.get("stats"))) + .orElse(null))); + } + + private final String path; + private final MapValue partitionValues; + private final long size; + private final long modificationTime; + private final boolean dataChange; + private final Optional deletionVector; + private final Optional tags; + private final Optional baseRowId; + private final Optional defaultRowCommitVersion; + private final Optional stats; + + public AddFile( + String path, + MapValue partitionValues, + long size, + long modificationTime, + boolean dataChange, + Optional deletionVector, + Optional tags, + Optional baseRowId, + Optional defaultRowCommitVersion, + Optional stats) { + this.path = requireNonNull(path, "path is null"); + this.partitionValues = requireNonNull(partitionValues, "partitionValues is null"); + this.size = size; + this.modificationTime = modificationTime; + this.dataChange = dataChange; + this.deletionVector = deletionVector; + this.tags = tags; + this.baseRowId = baseRowId; + this.defaultRowCommitVersion = defaultRowCommitVersion; + this.stats = stats; + } + + public Row toRow() { + Map valueMap = new HashMap<>(); + valueMap.put(COL_NAME_TO_ORDINAL.get("path"), path); + valueMap.put(COL_NAME_TO_ORDINAL.get("partitionValues"), partitionValues); + valueMap.put(COL_NAME_TO_ORDINAL.get("size"), size); + valueMap.put(COL_NAME_TO_ORDINAL.get("modificationTime"), modificationTime); + valueMap.put(COL_NAME_TO_ORDINAL.get("dataChange"), dataChange); + 
deletionVector.ifPresent(dv -> valueMap.put(COL_NAME_TO_ORDINAL.get("deletionVector"), dv)); + tags.ifPresent(tags -> valueMap.put(COL_NAME_TO_ORDINAL.get("tags"), tags)); + baseRowId.ifPresent(rowId -> valueMap.put(COL_NAME_TO_ORDINAL.get("baseRowId"), rowId)); + defaultRowCommitVersion.ifPresent( + commitVersion -> + valueMap.put(COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"), commitVersion)); + stats.ifPresent( + stats -> valueMap.put(COL_NAME_TO_ORDINAL.get("stats"), stats.serializeAsJson())); + return new GenericRow(FULL_SCHEMA, valueMap); + } + + public Optional getBaseRowId() { + return baseRowId; + } + + public Optional getDefaultRowCommitVersion() { + return defaultRowCommitVersion; + } + + public Optional getStats() { + return stats; + } + + /** + * Creates a new AddFile instance with the specified base row ID. + * + * @param baseRowId the new base row ID to be assigned + * @return a new AddFile instance with the updated base row ID + */ + public AddFile withNewBaseRowId(long baseRowId) { + return new AddFile( + path, + partitionValues, + size, + modificationTime, + dataChange, + deletionVector, + tags, + Optional.of(baseRowId), + defaultRowCommitVersion, + stats); + } + + /** + * Creates a new AddFile instance with the specified default row commit version. 
+ * + * @param defaultRowCommitVersion the new default row commit version to be assigned + * @return a new AddFile instance with the updated default row commit version + */ + public AddFile withNewDefaultRowCommitVersion(long defaultRowCommitVersion) { + return new AddFile( + path, + partitionValues, + size, + modificationTime, + dataChange, + deletionVector, + tags, + baseRowId, + Optional.of(defaultRowCommitVersion), + stats); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RemoveFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RemoveFile.java index b3d7388c4b7..62608a6b901 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RemoveFile.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RemoveFile.java @@ -33,7 +33,11 @@ public class RemoveFile { .add("size", LongType.LONG, true /* nullable*/) .add("stats", StringType.STRING, true /* nullable */) .add("tags", new MapType(StringType.STRING, StringType.STRING, true), true /* nullable */) - .add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */); - // There are more fields which are added when row-id tracking is enabled. When Kernel - // starts supporting row-ids, we should add those fields here. + .add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */) + .add("baseRowId", LongType.LONG, true /* nullable */) + .add("defaultRowCommitVersion", LongType.LONG, true /* nullable */); + // TODO: Currently, Kernel doesn't create RemoveFile actions internally, nor provides APIs for + // connectors to generate and commit them. Once we have the need for this, we should ensure + // that the baseRowId and defaultRowCommitVersion fields of RemoveFile actions are correctly + // populated to match the corresponding AddFile actions. 
} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java new file mode 100644 index 00000000000..35843a4ef0e --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java @@ -0,0 +1,166 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.rowtracking; + +import io.delta.kernel.data.Row; +import io.delta.kernel.internal.DeltaErrors; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.TableFeatures; +import io.delta.kernel.internal.actions.*; +import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.DataFileStatistics; +import java.io.IOException; +import java.util.List; + +/** A collection of helper methods for working with row tracking. */ +public class RowTracking { + private RowTracking() { + // Empty constructor to prevent instantiation of this class + } + + private static final int ADD_FILE_ORDINAL = SingleAction.FULL_SCHEMA.indexOf("add"); + + /** + * Assigns base row IDs and default row commit versions to {@link AddFile} actions before + * committing. If row tracking is not supported by the given protocol, returns the input actions + * unchanged. + * + *

    When needed, it sets each {@link AddFile} action’s base row ID based on the current high + * watermark and increments the watermark accordingly. If a default row commit version is missing, + * it assigns the provided commit version. + * + * @param protocol the protocol to check for row tracking support + * @param snapshot the current snapshot of the table + * @param commitVersion the version of the commit for default row commit version assignment + * @param dataActions the {@link CloseableIterable} of data actions to process + * @return a {@link CloseableIterable} of data actions with base row IDs and default row commit + * versions assigned + */ + public static CloseableIterable assignBaseRowIdAndDefaultRowCommitVersion( + Protocol protocol, + SnapshotImpl snapshot, + long commitVersion, + CloseableIterable dataActions) { + if (!TableFeatures.isRowTrackingSupported(protocol)) { + return dataActions; + } + + return new CloseableIterable() { + @Override + public void close() throws IOException { + dataActions.close(); + } + + @Override + public CloseableIterator iterator() { + // Used to keep track of the current high watermark as we iterate through the data actions. + // Use a one-element array here to allow for mutation within the lambda. 
+ final long[] currRowIdHighWatermark = {readRowIdHighWaterMark(snapshot)}; + return dataActions + .iterator() + .map( + row -> { + // Non-AddFile actions are returned unchanged + if (row.isNullAt(ADD_FILE_ORDINAL)) { + return row; + } + + AddFile addFile = AddFile.fromRow(row.getStruct(ADD_FILE_ORDINAL)); + + // Assign base row ID if missing + if (!addFile.getBaseRowId().isPresent()) { + final long numRecords = getNumRecords(addFile); + addFile = addFile.withNewBaseRowId(currRowIdHighWatermark[0] + 1L); + currRowIdHighWatermark[0] += numRecords; + } + + // Assign default row commit version if missing + if (!addFile.getDefaultRowCommitVersion().isPresent()) { + addFile = addFile.withNewDefaultRowCommitVersion(commitVersion); + } + + // Return a new AddFile row with assigned baseRowId/defaultRowCommitVersion + return SingleAction.createAddFileSingleAction(addFile.toRow()); + }); + } + }; + } + + /** + * Emits a {@link DomainMetadata} action if the row ID high watermark has changed due to newly + * processed {@link AddFile} actions. + * + * @param protocol the protocol to check for row tracking support + * @param snapshot the current snapshot of the table + * @param domainMetadatas the list of domain metadata actions to append to if needed + * @param dataActions the iterable of data actions that may update the high watermark + */ + public static void updateHighWaterMark( + Protocol protocol, + SnapshotImpl snapshot, + List domainMetadatas, + CloseableIterable dataActions) { + if (!TableFeatures.isRowTrackingSupported(protocol)) { + return; + } + + final long prevRowIdHighWatermark = readRowIdHighWaterMark(snapshot); + // Use a one-element array here to allow for mutation within the lambda. 
+ final long[] newRowIdHighWatermark = {prevRowIdHighWatermark}; + + dataActions.forEach( + row -> { + if (!row.isNullAt(ADD_FILE_ORDINAL)) { + newRowIdHighWatermark[0] += + getNumRecords(AddFile.fromRow(row.getStruct(ADD_FILE_ORDINAL))); + } + }); + + // Emit a DomainMetadata action to update the high watermark if it has changed + if (newRowIdHighWatermark[0] != prevRowIdHighWatermark) { + domainMetadatas.add( + new RowTrackingMetadataDomain(newRowIdHighWatermark[0]).toDomainMetadata()); + } + } + + /** + * Reads the current row ID high watermark from the snapshot, or returns a default value if + * missing. + * + * @param snapshot the snapshot to read + * @return the row ID high watermark + */ + public static long readRowIdHighWaterMark(SnapshotImpl snapshot) { + return RowTrackingMetadataDomain.fromSnapshot(snapshot) + .map(RowTrackingMetadataDomain::getRowIdHighWaterMark) + .orElse(RowTrackingMetadataDomain.MISSING_ROW_ID_HIGH_WATERMARK); + } + + /** + * Retrieves the number of records from the AddFile's statistics. We error out if statistics are + * missing. 
+ * + * @param addFile the AddFile action + * @return the number of records + */ + private static long getNumRecords(AddFile addFile) { + return addFile + .getStats() + .map(DataFileStatistics::getNumRecords) + .orElseThrow(DeltaErrors::rowIDAssignmentWithoutStats); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java index 5c11a04ac05..42956673ee3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java @@ -26,6 +26,8 @@ public final class RowTrackingMetadataDomain extends JsonMetadataDomain { public static final String DOMAIN_NAME = "delta.rowTracking"; + public static final long MISSING_ROW_ID_HIGH_WATERMARK = -1L; + /** * Creates an instance of {@link RowTrackingMetadataDomain} from a JSON configuration string. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java index 41006deffec..6c3fb1cdd78 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java @@ -19,8 +19,10 @@ import io.delta.kernel.expressions.Column; import io.delta.kernel.expressions.Literal; +import io.delta.kernel.internal.util.JsonUtils; import java.util.Collections; import java.util.Map; +import java.util.Optional; /** Statistics about data file in a Delta Lake table. */ public class DataFileStatistics { @@ -101,4 +103,26 @@ public String serializeAsJson() { // For now just serialize the number of records. return "{\"numRecords\":" + numRecords + "}"; } + + /** + * Deserialize the statistics from a JSON string. 
For now only the number of records is + * deserialized, the rest of the statistics are not supported yet. + * + * @param json Data statistics JSON string to deserialize. + * @return An {@link Optional} containing the deserialized {@link DataFileStatistics} if present. + */ + public static Optional deserializeFromJson(String json) { + Map keyValueMap = JsonUtils.parseJSONKeyValueMap(json); + + // For now just deserialize the number of records which will be used by row tracking. + String numRecordsStr = keyValueMap.get("numRecords"); + if (numRecordsStr == null) { + return Optional.empty(); + } + long numRecords = Long.parseLong(numRecordsStr); + DataFileStatistics stats = + new DataFileStatistics( + numRecords, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + return Optional.of(stats); + } } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala index a98d39840d0..f373b2f2865 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala @@ -69,7 +69,7 @@ class TableFeaturesSuite extends AnyFunSuite { } Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening", - "domainMetadata") + "domainMetadata", "rowTracking") .foreach { supportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $supportedWriterFeature") { checkSupported(createTestProtocol(minWriterVersion = 7, supportedWriterFeature)) @@ -77,8 +77,8 @@ class TableFeaturesSuite extends AnyFunSuite { } Seq("invariants", "checkConstraints", "generatedColumns", "allowColumnDefaults", "changeDataFeed", - "identityColumns", "deletionVectors", "rowTracking", "timestampNtz", - "v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering", + "identityColumns", "deletionVectors", "timestampNtz", 
"v2Checkpoint", "icebergCompatV1", + "icebergCompatV2", "clustering", "vacuumProtocolCheck").foreach { unsupportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $unsupportedWriterFeature") { checkUnsupported(createTestProtocol(minWriterVersion = 7, unsupportedWriterFeature)) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala new file mode 100644 index 00000000000..55d084d152f --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala @@ -0,0 +1,308 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults + +import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase +import io.delta.kernel.engine.Engine +import io.delta.kernel.exceptions.KernelException +import io.delta.kernel.expressions.Literal +import io.delta.kernel.internal.{SnapshotImpl, TableImpl} +import io.delta.kernel.internal.actions.{AddFile, Protocol, SingleAction} +import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain +import io.delta.kernel.internal.util.VectorUtils +import io.delta.kernel.types.StructType +import io.delta.kernel.types.LongType.LONG +import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} + +import java.util +import java.util.{Collections, Optional} +import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq + +class RowTrackingSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { + + private def setWriterFeatureSupported( + engine: Engine, + tablePath: String, + schema: StructType = testSchema, + writerFeatures: Seq[String]): Unit = { + val protocol = new Protocol( + 3, // minReaderVersion + 7, // minWriterVersion + Collections.emptyList(), // readerFeatures + writerFeatures.asJava // writerFeatures + ) + + val protocolAction = SingleAction.createProtocolSingleAction(protocol.toRow) + val txn = createTxn(engine, tablePath, isNewTable = false, schema, Seq.empty) + txn.commit(engine, inMemoryIterable(toCloseableIterator(Seq(protocolAction).asJava.iterator()))) + } + + private def createTableWithRowTrackingSupported( + engine: Engine, + tablePath: String, + schema: StructType = testSchema): Unit = { + createTxn(engine, tablePath, isNewTable = true, schema, Seq.empty) + .commit(engine, emptyIterable()) + setWriterFeatureSupported(engine, tablePath, schema, Seq("domainMetadata", "rowTracking")) + } + + private def verifyBaseRowIDs(engine: Engine, tablePath: String, expectedValue: Seq[Long]) = { + val table = 
TableImpl.forPath(engine, tablePath) + val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + + val AddFileActionsBatches = snapshot.getLogReplay.getAddFilesAsColumnarBatches( + engine, + false, /* shouldReadStats */ + Optional.empty() + ).asScala + + val modificationTimeOrdinal = AddFile.SCHEMA_WITHOUT_STATS.indexOf("modificationTime") + val baseRowIdOrdinal = AddFile.SCHEMA_WITHOUT_STATS.indexOf("baseRowId") + + val sortedBaseRowIds = AddFileActionsBatches + .flatMap(_.getRows.asScala) + .toSeq + .map(_.getStruct(0)) + .sortBy(_.getLong(modificationTimeOrdinal)) + .map(_.getLong(baseRowIdOrdinal)) + + assert(sortedBaseRowIds === expectedValue) + } + + private def verifyDefaultRowCommitVersion( + engine: Engine, + tablePath: String, + expectedValue: Seq[Long]) = { + val table = TableImpl.forPath(engine, tablePath) + val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + + val AddFileActionsBatches = snapshot.getLogReplay.getAddFilesAsColumnarBatches( + engine, + false, /* shouldReadStats */ + Optional.empty() + ).asScala + + val modificationTimeOrdinal = AddFile.SCHEMA_WITHOUT_STATS.indexOf("modificationTime") + val defaultRowCommitVersionOrdinal = + AddFile.SCHEMA_WITHOUT_STATS.indexOf("defaultRowCommitVersion") + + val AddFileDefaultRowCommitVersionsSorted = AddFileActionsBatches + .flatMap(_.getRows.asScala) + .toSeq + .map(_.getStruct(0)) + .sortBy(_.getLong(modificationTimeOrdinal)) + .map(_.getLong(defaultRowCommitVersionOrdinal)) + + assert(AddFileDefaultRowCommitVersionsSorted === expectedValue) + } + + private def verifyHighWatermark(engine: Engine, tablePath: String, expectedValue: Long) = { + val table = TableImpl.forPath(engine, tablePath) + val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val rowTrackingMetadataDomain = RowTrackingMetadataDomain.fromSnapshot(snapshot) + + assert(rowTrackingMetadataDomain.isPresent) + assert(rowTrackingMetadataDomain.get().getRowIdHighWaterMark === 
expectedValue) + } + + test("Base row IDs/default row commit versions are assigned to AddFile actions") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithRowTrackingSupported(engine, tablePath) + + val dataBatch1 = generateData(testSchema, Seq.empty, Map.empty, 100, 1) // 100 rows + val dataBatch2 = generateData(testSchema, Seq.empty, Map.empty, 200, 1) // 200 rows + val dataBatch3 = generateData(testSchema, Seq.empty, Map.empty, 400, 1) // 400 rows + + // Commit 3 AddFile actions in one transaction + val commitVersion = appendData( + engine, + tablePath, + data = Seq(dataBatch1, dataBatch2, dataBatch3).map(Map.empty[String, Literal] -> _) + ).getVersion + + verifyBaseRowIDs(engine, tablePath, Seq(0, 100, 300)) + verifyDefaultRowCommitVersion(engine, tablePath, Seq.fill(3)(commitVersion)) + verifyHighWatermark(engine, tablePath, 699) + } + } + + test("Previous Row ID high watermark can be picked up to assign base row IDs") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithRowTrackingSupported(engine, tablePath) + + val dataBatch1 = generateData(testSchema, Seq.empty, Map.empty, 100, 1) + val commitVersion1 = appendData( + engine, + tablePath, + data = Seq(dataBatch1).map(Map.empty[String, Literal] -> _) + ).getVersion + + verifyBaseRowIDs(engine, tablePath, Seq(0)) + verifyDefaultRowCommitVersion(engine, tablePath, Seq(commitVersion1)) + verifyHighWatermark(engine, tablePath, 99) + + val dataBatch2 = generateData(testSchema, Seq.empty, Map.empty, 200, 1) + val commitVersion2 = appendData( + engine, + tablePath, + data = Seq(dataBatch2).map(Map.empty[String, Literal] -> _) + ).getVersion + + verifyBaseRowIDs(engine, tablePath, Seq(0, 100)) + verifyDefaultRowCommitVersion(engine, tablePath, Seq(commitVersion1, commitVersion2)) + verifyHighWatermark(engine, tablePath, 299) + } + } + + test("Base row IDs/default row commit versions are preserved in checkpoint") { + withTempDirAndEngine { (tablePath, engine) => + 
createTableWithRowTrackingSupported(engine, tablePath) + + val dataBatch1 = generateData(testSchema, Seq.empty, Map.empty, 100, 1) + val dataBatch2 = generateData(testSchema, Seq.empty, Map.empty, 200, 1) + val dataBatch3 = generateData(testSchema, Seq.empty, Map.empty, 400, 1) + + val commitVersion1 = appendData( + engine, + tablePath, + data = Seq(dataBatch1).map(Map.empty[String, Literal] -> _) + ).getVersion + + val commitVersion2 = appendData( + engine, + tablePath, + data = Seq(dataBatch2).map(Map.empty[String, Literal] -> _) + ).getVersion + + // Checkpoint the table + val table = TableImpl.forPath(engine, tablePath) + val latestVersion = table.getLatestSnapshot(engine).getVersion(engine) + table.checkpoint(engine, latestVersion) + + val commitVersion3 = appendData( + engine, + tablePath, + data = Seq(dataBatch3).map(Map.empty[String, Literal] -> _) + ).getVersion + + verifyBaseRowIDs(engine, tablePath, Seq(0, 100, 300)) + verifyDefaultRowCommitVersion( + engine, + tablePath, + Seq(commitVersion1, commitVersion2, commitVersion3) + ) + verifyHighWatermark(engine, tablePath, 699) + } + } + + test("Fail if row tracking is supported but AddFile actions are missing stats") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithRowTrackingSupported(engine, tablePath) + + val addFile = new AddFile( + "fakePath", + VectorUtils.stringStringMapValue(new util.HashMap[String, String]()), + 0L, + 0L, + false, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty() // No stats + ) + + val action = SingleAction.createAddFileSingleAction(addFile.toRow) + val txn = createTxn(engine, tablePath, isNewTable = false, testSchema, Seq.empty) + + // KernelException thrown inside a lambda is wrapped in a RuntimeException + val e = intercept[RuntimeException] { + txn.commit(engine, inMemoryIterable(toCloseableIterator(Seq(action).asJava.iterator()))) + } + assert( + e.getMessage.contains( + "All AddFile actions must have 
statistics that include the number of records " + + "when writing to a Delta table with the 'rowTracking' table feature supported" + ) + ) + } + } + + test("Fail if row tracking is supported but domain metadata is not supported") { + withTempDirAndEngine((tablePath, engine) => { + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + // Only 'rowTracking' is supported, not 'domainMetadata' + setWriterFeatureSupported(engine, tablePath, testSchema, Seq("rowTracking")) + + val dataBatch1 = generateData(testSchema, Seq.empty, Map.empty, 100, 1) + val e = intercept[KernelException] { + appendData( + engine, + tablePath, + data = Seq(dataBatch1).map(Map.empty[String, Literal] -> _) + ).getVersion + } + + assert( + e.getMessage + .contains("Feature 'rowTracking' is supported but 'domainMetadata' is unsupported") + ) + }) + } + + test("Integration test - Write table with Kernel then write with Spark") { + withTempDirAndEngine((tablePath, engine) => { + val tbl = "tbl" + withTable(tbl) { + val schema = new StructType().add("id", LONG) + createTableWithRowTrackingSupported(engine, tablePath, schema = schema) + + // Write table using Kernel + val dataBatch1 = generateData(schema, Seq.empty, Map.empty, 100, 1) // 100 rows + val dataBatch2 = generateData(schema, Seq.empty, Map.empty, 200, 1) // 200 rows + val dataBatch3 = generateData(schema, Seq.empty, Map.empty, 400, 1) // 400 rows + appendData( + engine, + tablePath, + schema = schema, + data = Seq(dataBatch1, dataBatch2, dataBatch3).map(Map.empty[String, Literal] -> _) + ).getVersion // version 2 + + // Verify the table state + verifyBaseRowIDs(engine, tablePath, Seq(0, 100, 300)) + verifyDefaultRowCommitVersion(engine, tablePath, Seq(2, 2, 2)) + verifyHighWatermark(engine, tablePath, 699) + + // Write 20, 80 rows to the table using Spark + spark.range(0, 20).write.format("delta").mode("append").save(tablePath) // version 3 + spark.range(20, 
100).write.format("delta").mode("append").save(tablePath) // version 4 + + // Verify the table state + verifyBaseRowIDs(engine, tablePath, Seq(0, 100, 300, 700, 720)) + verifyDefaultRowCommitVersion(engine, tablePath, Seq(2, 2, 2, 3, 4)) + verifyHighWatermark(engine, tablePath, 799) + } + }) + } + + test("Integration test - Write table with Spark then write with Kernel") { + // TODO: Implement this test. Creating and writing a table using Spark with row tracking also + // enables the 'invariants' feature, which is not yet supported by the Kernel. + } +} From 28163426b8173911be3f18a34081bf6f3b217206 Mon Sep 17 00:00:00 2001 From: Qiyuan Dong Date: Mon, 9 Dec 2024 15:47:00 +0100 Subject: [PATCH 2/5] Address pr comments --- .../internal/InternalScanFileUtils.java | 32 +++++++++++ .../delta/kernel/internal/SnapshotImpl.java | 10 ---- .../delta/kernel/internal/TableFeatures.java | 40 +++++++------- .../kernel/internal/TransactionImpl.java | 4 +- .../internal/rowtracking/RowTracking.java | 20 +++---- .../kernel/internal/TableFeaturesSuite.scala | 10 ++-- .../kernel/defaults/RowTrackingSuite.scala | 54 +++++++------------ 7 files changed, 88 insertions(+), 82 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/InternalScanFileUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/InternalScanFileUtils.java index ec0ec463a22..0756501dff5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/InternalScanFileUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/InternalScanFileUtils.java @@ -32,6 +32,7 @@ import java.net.URI; import java.util.HashMap; import java.util.Map; +import java.util.Optional; /** * Utilities to extract information out of the scan file rows returned by {@link @@ -87,6 +88,11 @@ private InternalScanFileUtils() {} private static final int ADD_FILE_DV_ORDINAL = ADD_FILE_SCHEMA.indexOf("deletionVector"); + private static final int ADD_FILE_BASE_ROW_ID_ORDINAL = 
ADD_FILE_SCHEMA.indexOf("baseRowId"); + + private static final int ADD_FILE_DEFAULT_ROW_COMMIT_VERSION = + ADD_FILE_SCHEMA.indexOf("defaultRowCommitVersion"); + private static final int TABLE_ROOT_ORDINAL = SCAN_FILE_SCHEMA.indexOf(TABLE_ROOT_COL_NAME); public static final int ADD_FILE_STATS_ORDINAL = AddFile.SCHEMA_WITH_STATS.indexOf("stats"); @@ -190,4 +196,30 @@ public static DeletionVectorDescriptor getDeletionVectorDescriptorFromRow(Row sc public static Column getPartitionValuesParsedRefInAddFile(String partitionColName) { return new Column(new String[] {"add", "partitionValues_parsed", partitionColName}); } + + /** + * Get the base row id from the given scan file row. + * + * @param scanFile {@link Row} representing one scan file. + * @return base row id if present, otherwise empty. + */ + public static Optional getBaseRowId(Row scanFile) { + Row addFile = getAddFileEntry(scanFile); + return addFile.isNullAt(ADD_FILE_BASE_ROW_ID_ORDINAL) + ? Optional.empty() + : Optional.of(addFile.getLong(ADD_FILE_BASE_ROW_ID_ORDINAL)); + } + + /** + * Get the default row commit version from the given scan file row. + * + * @param scanFile {@link Row} representing one scan file. + * @return default row commit version if present, otherwise empty. + */ + public static Optional getDefaultRowCommitVersion(Row scanFile) { + Row addFile = getAddFileEntry(scanFile); + return addFile.isNullAt(ADD_FILE_DEFAULT_ROW_COMMIT_VERSION) + ? 
Optional.empty() + : Optional.of(addFile.getLong(ADD_FILE_DEFAULT_ROW_COMMIT_VERSION)); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 54ac6563110..72ee67492fa 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -151,16 +151,6 @@ public LogSegment getLogSegment() { return logSegment; } - /** - * Returns the log replay object. Visible for testing, where we need to access all the active - * AddFiles for a snapshot. - * - * @return the {@link LogReplay} object - */ - public LogReplay getLogReplay() { - return logReplay; - } - public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) { long minFileRetentionTimestampMillis = System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index e9953647646..bc8b7ad3b1c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -142,6 +142,14 @@ public static void validateWriteSupportedTable( throw unsupportedWriterFeature(tablePath, writerFeature); } } + // Eventually we may have a way to declare and enforce dependencies between features. + // By putting this check for row tracking here, it makes it easier to spot that row + // tracking defines such a dependency that can be implicitly checked. 
+ if (isRowTrackingSupported(protocol) && !isDomainMetadataSupported(protocol)) { + throw new KernelException( + "Feature 'rowTracking' is supported and depends on feature `domainMetadata`" + + "but 'domainMetadata' is unsupported"); + } break; default: throw unsupportedWriterProtocol(tablePath, minWriterVersion); @@ -195,12 +203,7 @@ public static Set extractAutomaticallyEnabledWriterFeatures( * @return true if the "domainMetadata" feature is supported, false otherwise */ public static boolean isDomainMetadataSupported(Protocol protocol) { - List writerFeatures = protocol.getWriterFeatures(); - if (writerFeatures == null) { - return false; - } - return writerFeatures.contains(DOMAIN_METADATA_FEATURE_NAME) - && protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION; + return isWriterFeatureSupported(protocol, DOMAIN_METADATA_FEATURE_NAME); } /** @@ -210,21 +213,7 @@ public static boolean isDomainMetadataSupported(Protocol protocol) { * @return true if the protocol supports row tracking, false otherwise */ public static boolean isRowTrackingSupported(Protocol protocol) { - List writerFeatures = protocol.getWriterFeatures(); - if (writerFeatures == null) { - return false; - } - boolean rowTrackingSupported = - writerFeatures.contains(ROW_TRACKING_FEATURE_NAME) - && protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION; - boolean domainMetadataSupported = isDomainMetadataSupported(protocol); - - if (rowTrackingSupported && !domainMetadataSupported) { - // This should not happen. Row tracking should automatically bring in domain metadata. 
- throw new KernelException( - "Feature 'rowTracking' is supported but 'domainMetadata' is unsupported"); - } - return rowTrackingSupported; + return isWriterFeatureSupported(protocol, ROW_TRACKING_FEATURE_NAME); } /** @@ -283,4 +272,13 @@ private static void validateNoInvariants(StructType tableSchema) { throw columnInvariantsNotSupported(); } } + + private static boolean isWriterFeatureSupported(Protocol protocol, String featureName) { + List writerFeatures = protocol.getWriterFeatures(); + if (writerFeatures == null) { + return false; + } + return writerFeatures.contains(featureName) + && protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION; + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 9b8601b26ce..4739074d7e2 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -150,7 +150,9 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable data // If row tracking is supported, assign base row IDs and default row commit versions to any // AddFile or RemoveFile actions that do not yet have them. If the row ID high watermark // changes, emit a DomainMetadata action. 
- RowTracking.updateHighWaterMark(protocol, readSnapshot, domainMetadatas, dataActions); + Optional highWaterMark = + RowTracking.createNewHighWaterMarkIfNeeded(protocol, readSnapshot, dataActions); + highWaterMark.ifPresent(domainMetadatas::add); dataActions = RowTracking.assignBaseRowIdAndDefaultRowCommitVersion( protocol, readSnapshot, commitAsVersion, dataActions); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java index 35843a4ef0e..b095099a261 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java @@ -24,7 +24,7 @@ import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.DataFileStatistics; import java.io.IOException; -import java.util.List; +import java.util.Optional; /** A collection of helper methods for working with row tracking. 
*/ public class RowTracking { @@ -106,16 +106,12 @@ public CloseableIterator iterator() { * * @param protocol the protocol to check for row tracking support * @param snapshot the current snapshot of the table - * @param domainMetadatas the list of domain metadata actions to append to if needed * @param dataActions the iterable of data actions that may update the high watermark */ - public static void updateHighWaterMark( - Protocol protocol, - SnapshotImpl snapshot, - List domainMetadatas, - CloseableIterable dataActions) { + public static Optional createNewHighWaterMarkIfNeeded( + Protocol protocol, SnapshotImpl snapshot, CloseableIterable dataActions) { if (!TableFeatures.isRowTrackingSupported(protocol)) { - return; + return Optional.empty(); } final long prevRowIdHighWatermark = readRowIdHighWaterMark(snapshot); @@ -130,11 +126,9 @@ public static void updateHighWaterMark( } }); - // Emit a DomainMetadata action to update the high watermark if it has changed - if (newRowIdHighWatermark[0] != prevRowIdHighWatermark) { - domainMetadatas.add( - new RowTrackingMetadataDomain(newRowIdHighWatermark[0]).toDomainMetadata()); - } + return (newRowIdHighWatermark[0] != prevRowIdHighWatermark) + ? 
Optional.of(new RowTrackingMetadataDomain(newRowIdHighWatermark[0]).toDomainMetadata()) + : Optional.empty(); } /** diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala index f373b2f2865..9ed288e5987 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala @@ -69,10 +69,14 @@ class TableFeaturesSuite extends AnyFunSuite { } Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening", - "domainMetadata", "rowTracking") - .foreach { supportedWriterFeature => + "domainMetadata", "rowTracking").foreach { supportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $supportedWriterFeature") { - checkSupported(createTestProtocol(minWriterVersion = 7, supportedWriterFeature)) + val protocol = if (supportedWriterFeature == "rowTracking") { + createTestProtocol(minWriterVersion = 7, supportedWriterFeature, "domainMetadata") + } else { + createTestProtocol(minWriterVersion = 7, supportedWriterFeature) + } + checkSupported(protocol) } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala index 55d084d152f..50ca3735f94 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala @@ -19,7 +19,7 @@ import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions.KernelException import io.delta.kernel.expressions.Literal -import io.delta.kernel.internal.{SnapshotImpl, TableImpl} +import io.delta.kernel.internal.{SnapshotImpl, TableImpl, 
InternalScanFileUtils} import io.delta.kernel.internal.actions.{AddFile, Protocol, SingleAction} import io.delta.kernel.internal.util.Utils.toCloseableIterator import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain @@ -65,21 +65,13 @@ class RowTrackingSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { val table = TableImpl.forPath(engine, tablePath) val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] - val AddFileActionsBatches = snapshot.getLogReplay.getAddFilesAsColumnarBatches( - engine, - false, /* shouldReadStats */ - Optional.empty() - ).asScala - - val modificationTimeOrdinal = AddFile.SCHEMA_WITHOUT_STATS.indexOf("modificationTime") - val baseRowIdOrdinal = AddFile.SCHEMA_WITHOUT_STATS.indexOf("baseRowId") - - val sortedBaseRowIds = AddFileActionsBatches - .flatMap(_.getRows.asScala) - .toSeq - .map(_.getStruct(0)) - .sortBy(_.getLong(modificationTimeOrdinal)) - .map(_.getLong(baseRowIdOrdinal)) + val scanFileRows = collectScanFileRows( + snapshot.getScanBuilder(engine).build() + ) + val sortedBaseRowIds = scanFileRows + .map(InternalScanFileUtils.getBaseRowId) + .map(_.orElse(-1)) + .sorted assert(sortedBaseRowIds === expectedValue) } @@ -91,24 +83,15 @@ class RowTrackingSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { val table = TableImpl.forPath(engine, tablePath) val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] - val AddFileActionsBatches = snapshot.getLogReplay.getAddFilesAsColumnarBatches( - engine, - false, /* shouldReadStats */ - Optional.empty() - ).asScala - - val modificationTimeOrdinal = AddFile.SCHEMA_WITHOUT_STATS.indexOf("modificationTime") - val defaultRowCommitVersionOrdinal = - AddFile.SCHEMA_WITHOUT_STATS.indexOf("defaultRowCommitVersion") - - val AddFileDefaultRowCommitVersionsSorted = AddFileActionsBatches - .flatMap(_.getRows.asScala) - .toSeq - .map(_.getStruct(0)) - .sortBy(_.getLong(modificationTimeOrdinal)) - 
.map(_.getLong(defaultRowCommitVersionOrdinal)) + val scanFileRows = collectScanFileRows( + snapshot.getScanBuilder(engine).build() + ) + val sortedAddFileDefaultRowCommitVersions = scanFileRows + .map(InternalScanFileUtils.getDefaultRowCommitVersion) + .map(_.orElse(-1)) + .sorted - assert(AddFileDefaultRowCommitVersionsSorted === expectedValue) + assert(sortedAddFileDefaultRowCommitVersions === expectedValue) } private def verifyHighWatermark(engine: Engine, tablePath: String, expectedValue: Long) = { @@ -261,7 +244,10 @@ class RowTrackingSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { assert( e.getMessage - .contains("Feature 'rowTracking' is supported but 'domainMetadata' is unsupported") + .contains( + "Feature 'rowTracking' is supported and depends on feature `domainMetadata`" + + "but 'domainMetadata' is unsupported" + ) ) }) } From 895e0aee1d38be909d985112eb6d96ff2352e560 Mon Sep 17 00:00:00 2001 From: Qiyuan Dong Date: Thu, 12 Dec 2024 14:36:42 +0100 Subject: [PATCH 3/5] Address PR comments --- .../io/delta/kernel/internal/DeltaErrors.java | 6 ++ .../delta/kernel/internal/TableFeatures.java | 5 +- .../kernel/internal/TransactionImpl.java | 17 +-- .../kernel/internal/actions/AddFile.java | 100 +++++++++++++----- .../internal/rowtracking/RowTracking.java | 55 +++++----- .../kernel/defaults/RowTrackingSuite.scala | 4 +- 6 files changed, 117 insertions(+), 70 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 47fec3751c5..37a7f8759d0 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -299,6 +299,12 @@ public static KernelException rowIDAssignmentWithoutStats() { + "when writing to a Delta table with the 'rowTracking' table feature supported"); } + public static KernelException 
rowTrackingSupportedWithDomainMetadataUnsupported() { + return new KernelException( + "Feature 'rowTracking' is supported and depends on feature 'domainMetadata'," + + " but 'domainMetadata' is unsupported"); + } + /* ------------------------ HELPER METHODS ----------------------------- */ private static String formatTimestamp(long millisSinceEpochUTC) { return new Timestamp(millisSinceEpochUTC).toInstant().toString(); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index bc8b7ad3b1c..b6f3bae8271 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -19,7 +19,6 @@ import static io.delta.kernel.internal.DeltaErrors.*; import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; -import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.util.ColumnMapping; @@ -146,9 +145,7 @@ public static void validateWriteSupportedTable( // By putting this check for row tracking here, it makes it easier to spot that row // tracking defines such a dependency that can be implicitly checked. 
if (isRowTrackingSupported(protocol) && !isDomainMetadataSupported(protocol)) { - throw new KernelException( - "Feature 'rowTracking' is supported and depends on feature `domainMetadata`" - + "but 'domainMetadata' is unsupported"); + throw DeltaErrors.rowTrackingSupportedWithDomainMetadataUnsupported(); } break; default: diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 4739074d7e2..b54d74157a2 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -148,14 +148,15 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable data engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion(engine)); // If row tracking is supported, assign base row IDs and default row commit versions to any - // AddFile or RemoveFile actions that do not yet have them. If the row ID high watermark - // changes, emit a DomainMetadata action. - Optional highWaterMark = - RowTracking.createNewHighWaterMarkIfNeeded(protocol, readSnapshot, dataActions); - highWaterMark.ifPresent(domainMetadatas::add); - dataActions = - RowTracking.assignBaseRowIdAndDefaultRowCommitVersion( - protocol, readSnapshot, commitAsVersion, dataActions); + // AddFile actions that do not yet have them. If the row ID high watermark changes, emit a + // DomainMetadata action to update it. 
+ if (TableFeatures.isRowTrackingSupported(protocol)) { + RowTracking.createNewHighWaterMarkIfNeeded(readSnapshot, dataActions) + .ifPresent(domainMetadatas::add); + dataActions = + RowTracking.assignBaseRowIdAndDefaultRowCommitVersion( + readSnapshot, commitAsVersion, dataActions); + } int numRetries = 0; do { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java index 58e4631ffb4..0978e7922b5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java @@ -16,10 +16,8 @@ package io.delta.kernel.internal.actions; import static io.delta.kernel.internal.util.InternalUtils.relativizePath; -import static io.delta.kernel.internal.util.InternalUtils.requireNonNull; import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap; import static io.delta.kernel.internal.util.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; import io.delta.kernel.data.MapValue; @@ -27,12 +25,15 @@ import io.delta.kernel.expressions.Literal; import io.delta.kernel.internal.data.GenericRow; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.util.InternalUtils; +import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.types.*; import io.delta.kernel.utils.DataFileStatistics; import io.delta.kernel.utils.DataFileStatus; import java.net.URI; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.IntStream; @@ -106,13 +107,10 @@ public static Row convertDataFileStatus( /** * Utility to generate an {@link AddFile} action from an 'AddFile' {@link Row}. 
* - * @param row the row to read - * @return the extracted {@link AddFile} action + * @throws NullPointerException if row is null */ public static AddFile fromRow(Row row) { - if (row == null) { - return null; - } + Objects.requireNonNull(row, "Cannot generate an AddFile action from a null row"); checkArgument( row.getSchema().equals(FULL_SCHEMA), @@ -121,15 +119,17 @@ public static AddFile fromRow(Row row) { row.getSchema()); return new AddFile( - requireNonNull(row, COL_NAME_TO_ORDINAL.get("path"), "path") + InternalUtils.requireNonNull(row, COL_NAME_TO_ORDINAL.get("path"), "path") .getString(COL_NAME_TO_ORDINAL.get("path")), - requireNonNull(row, COL_NAME_TO_ORDINAL.get("partitionValues"), "partitionValues") + InternalUtils.requireNonNull( + row, COL_NAME_TO_ORDINAL.get("partitionValues"), "partitionValues") .getMap(COL_NAME_TO_ORDINAL.get("partitionValues")), - requireNonNull(row, COL_NAME_TO_ORDINAL.get("size"), "size") + InternalUtils.requireNonNull(row, COL_NAME_TO_ORDINAL.get("size"), "size") .getLong(COL_NAME_TO_ORDINAL.get("size")), - requireNonNull(row, COL_NAME_TO_ORDINAL.get("modificationTime"), "modificationTime") + InternalUtils.requireNonNull( + row, COL_NAME_TO_ORDINAL.get("modificationTime"), "modificationTime") .getLong(COL_NAME_TO_ORDINAL.get("modificationTime")), - requireNonNull(row, COL_NAME_TO_ORDINAL.get("dataChange"), "dataChange") + InternalUtils.requireNonNull(row, COL_NAME_TO_ORDINAL.get("dataChange"), "dataChange") .getBoolean(COL_NAME_TO_ORDINAL.get("dataChange")), Optional.ofNullable( row.isNullAt(COL_NAME_TO_ORDINAL.get("deletionVector")) @@ -178,8 +178,8 @@ public AddFile( Optional baseRowId, Optional defaultRowCommitVersion, Optional stats) { - this.path = requireNonNull(path, "path is null"); - this.partitionValues = requireNonNull(partitionValues, "partitionValues is null"); + this.path = Objects.requireNonNull(path, "path is null"); + this.partitionValues = Objects.requireNonNull(partitionValues, "partitionValues is null"); 
this.size = size; this.modificationTime = modificationTime; this.dataChange = dataChange; @@ -220,12 +220,11 @@ public Optional getStats() { return stats; } - /** - * Creates a new AddFile instance with the specified base row ID. - * - * @param baseRowId the new base row ID to be assigned - * @return a new AddFile instance with the updated base row ID - */ + public Optional getNumRecords() { + return stats.map(DataFileStatistics::getNumRecords); + } + + /** Creates a new AddFile instance with the specified base row ID. */ public AddFile withNewBaseRowId(long baseRowId) { return new AddFile( path, @@ -240,12 +239,7 @@ public AddFile withNewBaseRowId(long baseRowId) { stats); } - /** - * Creates a new AddFile instance with the specified default row commit version. - * - * @param defaultRowCommitVersion the new default row commit version to be assigned - * @return a new AddFile instance with the updated default row commit version - */ + /** Creates a new AddFile instance with the specified default row commit version. 
*/ public AddFile withNewDefaultRowCommitVersion(long defaultRowCommitVersion) { return new AddFile( path, @@ -259,4 +253,58 @@ public AddFile withNewDefaultRowCommitVersion(long defaultRowCommitVersion) { Optional.of(defaultRowCommitVersion), stats); } + + @Override + public String toString() { + // Explicitly convert the partitionValues and tags to Java Maps + Map partitionValuesJavaMap = VectorUtils.toJavaMap(this.partitionValues); + Optional> tagsJavaMap = this.tags.map(VectorUtils::toJavaMap); + + StringBuilder sb = new StringBuilder(); + sb.append("AddFile{"); + sb.append("path='").append(path).append('\''); + sb.append(", partitionValues=").append(partitionValuesJavaMap); + sb.append(", size=").append(size); + sb.append(", modificationTime=").append(modificationTime); + sb.append(", dataChange=").append(dataChange); + sb.append(", deletionVector=").append(deletionVector); + sb.append(", tags=").append(tagsJavaMap); + sb.append(", baseRowId=").append(baseRowId); + sb.append(", defaultRowCommitVersion=").append(defaultRowCommitVersion); + sb.append(", stats=").append(stats); + sb.append('}'); + return sb.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof AddFile)) return false; + AddFile other = (AddFile) obj; + return size == other.size + && modificationTime == other.modificationTime + && dataChange == other.dataChange + && Objects.equals(path, other.path) + && Objects.equals(partitionValues, other.partitionValues) + && Objects.equals(deletionVector, other.deletionVector) + && Objects.equals(tags, other.tags) + && Objects.equals(baseRowId, other.baseRowId) + && Objects.equals(defaultRowCommitVersion, other.defaultRowCommitVersion) + && Objects.equals(stats, other.stats); + } + + @Override + public int hashCode() { + return Objects.hash( + path, + partitionValues, + size, + modificationTime, + dataChange, + deletionVector, + tags, + baseRowId, + defaultRowCommitVersion, + stats); + } } 
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java index b095099a261..508a3566e96 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java @@ -15,6 +15,8 @@ */ package io.delta.kernel.internal.rowtracking; +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + import io.delta.kernel.data.Row; import io.delta.kernel.internal.DeltaErrors; import io.delta.kernel.internal.SnapshotImpl; @@ -22,9 +24,9 @@ import io.delta.kernel.internal.actions.*; import io.delta.kernel.utils.CloseableIterable; import io.delta.kernel.utils.CloseableIterator; -import io.delta.kernel.utils.DataFileStatistics; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; /** A collection of helper methods for working with row tracking. */ public class RowTracking { @@ -43,7 +45,6 @@ private RowTracking() { * watermark and increments the watermark accordingly. If a default row commit version is missing, * it assigns the provided commit version. 
* - * @param protocol the protocol to check for row tracking support * @param snapshot the current snapshot of the table * @param commitVersion the version of the commit for default row commit version assignment * @param dataActions the {@link CloseableIterable} of data actions to process @@ -51,13 +52,11 @@ private RowTracking() { * versions assigned */ public static CloseableIterable assignBaseRowIdAndDefaultRowCommitVersion( - Protocol protocol, - SnapshotImpl snapshot, - long commitVersion, - CloseableIterable dataActions) { - if (!TableFeatures.isRowTrackingSupported(protocol)) { - return dataActions; - } + SnapshotImpl snapshot, long commitVersion, CloseableIterable dataActions) { + checkArgument( + TableFeatures.isRowTrackingSupported(snapshot.getProtocol()), + "Base row ID and default row commit version are assigned " + + "only when feature 'rowTracking' is supported."); return new CloseableIterable() { @Override @@ -68,8 +67,8 @@ public void close() throws IOException { @Override public CloseableIterator iterator() { // Used to keep track of the current high watermark as we iterate through the data actions. - // Use a one-element array here to allow for mutation within the lambda. - final long[] currRowIdHighWatermark = {readRowIdHighWaterMark(snapshot)}; + // Use an AtomicLong to allow for updating the high watermark in the lambda. 
+ final AtomicLong currRowIdHighWatermark = new AtomicLong(readRowIdHighWaterMark(snapshot)); return dataActions .iterator() .map( @@ -83,9 +82,9 @@ public CloseableIterator iterator() { // Assign base row ID if missing if (!addFile.getBaseRowId().isPresent()) { - final long numRecords = getNumRecords(addFile); - addFile = addFile.withNewBaseRowId(currRowIdHighWatermark[0] + 1L); - currRowIdHighWatermark[0] += numRecords; + final long numRecords = getNumRecordsOrThrow(addFile); + addFile = addFile.withNewBaseRowId(currRowIdHighWatermark.get() + 1L); + currRowIdHighWatermark.addAndGet(numRecords); } // Assign default row commit version if missing @@ -104,30 +103,29 @@ public CloseableIterator iterator() { * Emits a {@link DomainMetadata} action if the row ID high watermark has changed due to newly * processed {@link AddFile} actions. * - * @param protocol the protocol to check for row tracking support * @param snapshot the current snapshot of the table * @param dataActions the iterable of data actions that may update the high watermark */ public static Optional createNewHighWaterMarkIfNeeded( - Protocol protocol, SnapshotImpl snapshot, CloseableIterable dataActions) { - if (!TableFeatures.isRowTrackingSupported(protocol)) { - return Optional.empty(); - } + SnapshotImpl snapshot, CloseableIterable dataActions) { + checkArgument( + TableFeatures.isRowTrackingSupported(snapshot.getProtocol()), + "Row ID high watermark is updated only when feature 'rowTracking' is supported."); final long prevRowIdHighWatermark = readRowIdHighWaterMark(snapshot); - // Use a one-element array here to allow for mutation within the lambda. 
- final long[] newRowIdHighWatermark = {prevRowIdHighWatermark}; + // Use an AtomicLong to allow for updating the high watermark in the lambda + final AtomicLong newRowIdHighWatermark = new AtomicLong(prevRowIdHighWatermark); dataActions.forEach( row -> { if (!row.isNullAt(ADD_FILE_ORDINAL)) { - newRowIdHighWatermark[0] += - getNumRecords(AddFile.fromRow(row.getStruct(ADD_FILE_ORDINAL))); + newRowIdHighWatermark.addAndGet( + getNumRecordsOrThrow(AddFile.fromRow(row.getStruct(ADD_FILE_ORDINAL)))); } }); - return (newRowIdHighWatermark[0] != prevRowIdHighWatermark) - ? Optional.of(new RowTrackingMetadataDomain(newRowIdHighWatermark[0]).toDomainMetadata()) + return (newRowIdHighWatermark.get() != prevRowIdHighWatermark) + ? Optional.of(new RowTrackingMetadataDomain(newRowIdHighWatermark.get()).toDomainMetadata()) : Optional.empty(); } @@ -151,10 +149,7 @@ public static long readRowIdHighWaterMark(SnapshotImpl snapshot) { * @param addFile the AddFile action * @return the number of records */ - private static long getNumRecords(AddFile addFile) { - return addFile - .getStats() - .map(DataFileStatistics::getNumRecords) - .orElseThrow(DeltaErrors::rowIDAssignmentWithoutStats); + private static long getNumRecordsOrThrow(AddFile addFile) { + return addFile.getNumRecords().orElseThrow(DeltaErrors::rowIDAssignmentWithoutStats); } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala index 50ca3735f94..8e03e4074e4 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala @@ -245,8 +245,8 @@ class RowTrackingSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { assert( e.getMessage .contains( - "Feature 'rowTracking' is supported and depends on feature `domainMetadata`" - + "but 'domainMetadata' is 
unsupported" + "Feature 'rowTracking' is supported and depends on feature 'domainMetadata'," + + " but 'domainMetadata' is unsupported" ) ) }) From 86d83ceec2f7317313701b94b617a09d49824efb Mon Sep 17 00:00:00 2001 From: Qiyuan Dong Date: Tue, 17 Dec 2024 16:39:47 +0100 Subject: [PATCH 4/5] Address PR comments - 3 --- .../io/delta/kernel/internal/DeltaErrors.java | 5 +++-- .../delta/kernel/internal/TableFeatures.java | 2 -- .../kernel/internal/actions/AddFile.java | 4 +++- .../internal/rowtracking/RowTracking.java | 20 ++++++++++--------- .../kernel/defaults/RowTrackingSuite.scala | 7 ++++--- 5 files changed, 21 insertions(+), 17 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 37a7f8759d0..e2aa1cc2fab 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -295,8 +295,9 @@ public static ConcurrentWriteException concurrentDomainMetadataAction( public static KernelException rowIDAssignmentWithoutStats() { return new KernelException( - "All AddFile actions must have statistics that include the number of records " - + "when writing to a Delta table with the 'rowTracking' table feature supported"); + "Cannot write to a rowTracking-supported table without 'numRecord' statistics. 
" + + "Connectors are expected to populate the number of records statistics when " + + "writing to a Delta table with 'rowTracking' table feature supported."); } public static KernelException rowTrackingSupportedWithDomainMetadataUnsupported() { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index b6f3bae8271..ef6b0474b6e 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -59,10 +59,8 @@ public class TableFeatures { } }); - /** The feature name for domain metadata. */ public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata"; - /** The feature name for row tracking. */ public static final String ROW_TRACKING_FEATURE_NAME = "rowTracking"; /** The minimum writer version required to support table features. */ diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java index 0978e7922b5..fceb97ad18d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.TreeMap; import java.util.stream.IntStream; /** Delta log action representing an `AddFile` */ @@ -258,12 +259,13 @@ public AddFile withNewDefaultRowCommitVersion(long defaultRowCommitVersion) { public String toString() { // Explicitly convert the partitionValues and tags to Java Maps Map partitionValuesJavaMap = VectorUtils.toJavaMap(this.partitionValues); + Map sortedPartitionValuesJavaMap = new TreeMap<>(partitionValuesJavaMap); Optional> tagsJavaMap = this.tags.map(VectorUtils::toJavaMap); StringBuilder sb = new 
StringBuilder(); sb.append("AddFile{"); sb.append("path='").append(path).append('\''); - sb.append(", partitionValues=").append(partitionValuesJavaMap); + sb.append(", partitionValues=").append(sortedPartitionValuesJavaMap); sb.append(", size=").append(size); sb.append(", modificationTime=").append(modificationTime); sb.append(", dataChange=").append(dataChange); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java index 508a3566e96..1295bba0c70 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java @@ -38,12 +38,11 @@ private RowTracking() { /** * Assigns base row IDs and default row commit versions to {@link AddFile} actions before - * committing. If row tracking is not supported by the given protocol, returns the input actions - * unchanged. + * committing them. This method should be called only when the 'rowTracking' feature is supported. * - *

<p>When needed, it sets each {@link AddFile} action’s base row ID based on the current high - * watermark and increments the watermark accordingly. If a default row commit version is missing, - * it assigns the provided commit version. + *

    It sets each {@link AddFile} action’s base row ID based on the current high watermark and + * increments the watermark accordingly. If a default row commit version is missing, it assigns + * the provided commit version. * * @param snapshot the current snapshot of the table * @param commitVersion the version of the commit for default row commit version assignment @@ -100,8 +99,9 @@ public CloseableIterator iterator() { } /** - * Emits a {@link DomainMetadata} action if the row ID high watermark has changed due to newly - * processed {@link AddFile} actions. + * Returns a {@link DomainMetadata} action if the row ID high watermark has changed due to newly + * processed {@link AddFile} actions. This method should be called only when the 'rowTracking' + * feature is supported. * * @param snapshot the current snapshot of the table * @param dataActions the iterable of data actions that may update the high watermark @@ -119,8 +119,10 @@ public static Optional createNewHighWaterMarkIfNeeded( dataActions.forEach( row -> { if (!row.isNullAt(ADD_FILE_ORDINAL)) { - newRowIdHighWatermark.addAndGet( - getNumRecordsOrThrow(AddFile.fromRow(row.getStruct(ADD_FILE_ORDINAL)))); + AddFile addFile = AddFile.fromRow(row.getStruct(ADD_FILE_ORDINAL)); + if (!addFile.getBaseRowId().isPresent()) { + newRowIdHighWatermark.addAndGet(getNumRecordsOrThrow(addFile)); + } } }); diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala index 8e03e4074e4..77351021afb 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala @@ -38,7 +38,7 @@ class RowTrackingSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { private def setWriterFeatureSupported( engine: Engine, tablePath: String, - schema: StructType = testSchema, + schema: 
StructType, writerFeatures: Seq[String]): Unit = { val protocol = new Protocol( 3, // minReaderVersion @@ -219,8 +219,9 @@ class RowTrackingSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { } assert( e.getMessage.contains( - "All AddFile actions must have statistics that include the number of records " + - "when writing to a Delta table with the 'rowTracking' table feature supported" + "Cannot write to a rowTracking-supported table without 'numRecord' statistics. " + + "Connectors are expected to populate the number of records statistics when " + + "writing to a Delta table with 'rowTracking' table feature supported." ) ) } From 0fd9d1b4bd8a6a3b112b37d985231259ef51ddd9 Mon Sep 17 00:00:00 2001 From: Qiyuan Dong Date: Tue, 17 Dec 2024 16:53:03 +0100 Subject: [PATCH 5/5] Rename the error to missingNumRecordsStatsForRowTracking for consistency --- .../src/main/java/io/delta/kernel/internal/DeltaErrors.java | 2 +- .../java/io/delta/kernel/internal/rowtracking/RowTracking.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index e2aa1cc2fab..c13767080fb 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -293,7 +293,7 @@ public static ConcurrentWriteException concurrentDomainMetadataAction( return new ConcurrentWriteException(message); } - public static KernelException rowIDAssignmentWithoutStats() { + public static KernelException missingNumRecordsStatsForRowTracking() { return new KernelException( "Cannot write to a rowTracking-supported table without 'numRecord' statistics. 
" + "Connectors are expected to populate the number of records statistics when " diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java index 1295bba0c70..3d90f192d22 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java @@ -152,6 +152,6 @@ public static long readRowIdHighWaterMark(SnapshotImpl snapshot) { * @return the number of records */ private static long getNumRecordsOrThrow(AddFile addFile) { - return addFile.getNumRecords().orElseThrow(DeltaErrors::rowIDAssignmentWithoutStats); + return addFile.getNumRecords().orElseThrow(DeltaErrors::missingNumRecordsStatsForRowTracking); } }