-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel] Assign base row ID and default row commit version to AddFile #3894
base: master
Are you sure you want to change the base?
[Kernel] Assign base row ID and default row commit version to AddFile #3894
Conversation
54b77cc
to
75c0005
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a few things missing from the row tracking spec, see my comment
kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java
Outdated
Show resolved
Hide resolved
return new KernelException( | ||
"Cannot assign baseRowId to add action. " | ||
+ "The number of records in this data file is missing."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed, this can be an issue: if connectors don't populate numRecords
stats in the addFile action that are committed, the commit will fail if row tracking is supported (note that this is still better than today where we always fail in that case since we don't support row tracking.
Question more for kernel folks: do we some guarantee or requirement that connectors populate numRecords
? Are connectors that implement writes today (if any) populating numRecords
?
In any case, I would word the exception so that it puts the burden more on the connector, for example:
"All add actions must have statistics that include the number of records when writing to a Delta table with the RowTracking table feature enabled."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question more for kernel folks: do we some guarantee or requirement that connectors populate numRecords? Are connectors that implement writes today (if any) populating numRecords?
cc @vkorukanti thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We expect the connector to populate the numRecords
and other stats as it is a heavy operation and we don't want to do that in Kernel. If some protocol feature (e.g., icebergCompatV2) requires stats and they are missing in the DataFileStatus
, we throw errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it seems like we should update the Row Tracking protocol to indicate that the AddFile statistics must include numRecords
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on making the error message more explicit about (1) what is unsupported (cannot write to row tracking table)(2) why it is unsupported (requires numRecords to be populated in stats) (3) who is responsible/what needs to be updated for support (not supported by this kernel integration/engine)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion! I’ve updated the error message to better explain the situation and our expectations.
So it seems like we should update the Row Tracking protocol to indicate that the AddFile statistics must include numRecords ?
I'm not sure if this should go into protocol. The numRecords
is used for assigning baseRowId
, but we might have alternative ways to obtain this information (e.g. passing it directly from the connector, or potentially computing it within the Kernel ourselves). It seems more like an expectation for connectors to provide this data.
75c0005
to
c584707
Compare
c584707
to
e94da89
Compare
* @return an {@link CloseableIterable} of data actions with base row IDs and default row commit | ||
* versions assigned | ||
*/ | ||
public static CloseableIterable<Row> assignBaseRowIdAndDefaultRowCommitVersion( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another possible approach is to assign baseRowId
and defaultRowCommitVersion
when creating AddFile
actions.
However, this would duplicate logic across all places where AddFile
actions are created (currently just Transaction.generateAppendActions
, but potentially more in the future I guess). Plus, baseRowId
assignment is stateful, making it tricky if AddFile
actions are created in multiple steps/places.
I chose to handle it here for now to keep things centralized, even though it requires converting AddFile
rows back to actions, updating them, and converting back to rows.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java
Outdated
Show resolved
Hide resolved
private final String path; | ||
private final MapValue partitionValues; | ||
private final long size; | ||
private final long modificationTime; | ||
private final boolean dataChange; | ||
private final Optional<DeletionVectorDescriptor> deletionVector; | ||
private final Optional<MapValue> tags; | ||
private final Optional<Long> baseRowId; | ||
private final Optional<Long> defaultRowCommitVersion; | ||
private final Optional<DataFileStatistics> stats; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moves this first in the class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies -- what does this comment mean? Having the static methods first, then the static variables, then the member variables and constructor LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know the ordering convention used in kernel, I would typically put all variables (static/member) before any method definition but that's personal and we should follow whatever convention kernel uses.
What tripped me initially is that AddFile
is essentially a record and these member fields are arguably the most important part of the class to make sense of the rest but they are lost in the middle of the file
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/RowTrackingSuite.scala
Outdated
Show resolved
Hide resolved
954641b
to
2816342
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Thanks for making this. Left some comments.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
Show resolved
Hide resolved
private final String path; | ||
private final MapValue partitionValues; | ||
private final long size; | ||
private final long modificationTime; | ||
private final boolean dataChange; | ||
private final Optional<DeletionVectorDescriptor> deletionVector; | ||
private final Optional<MapValue> tags; | ||
private final Optional<Long> baseRowId; | ||
private final Optional<Long> defaultRowCommitVersion; | ||
private final Optional<DataFileStatistics> stats; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies -- what does this comment mean? Having the static methods first, then the static variables, then the member variables and constructor LGTM
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java
Outdated
Show resolved
Hide resolved
* @param addFile the AddFile action | ||
* @return the number of records | ||
*/ | ||
private static long getNumRecords(AddFile addFile) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not put this onto the AddFIle itself? This feels a lot like c
and struct
like code. Putting this into AddFile would make the code more cohesive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My idea was to make it throw rowIDAssignmentWithoutStats
in case of missing stats to simply some row tracking code, which is very specific to row tracking and would be inappropriate for a getter method of AddFile
itself.
Now I've put the getter public Optional<Long> getNumRecords()
to the AddFile
itself, and have a helper method in RowTracking.java
:
private static long getNumRecordsOrThrow(AddFile addFile) {
return addFile.getNumRecords().orElseThrow(DeltaErrors::rowIDAssignmentWithoutStats);
}
Does this look good to you?
kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java
Outdated
Show resolved
Hide resolved
700a46f
to
895e0ae
Compare
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java
Outdated
Show resolved
Hide resolved
* @param protocol the protocol to check | ||
* @return true if the protocol supports row tracking, false otherwise | ||
*/ | ||
public static boolean isRowTrackingSupported(Protocol protocol) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: where (probably in future PRs?) will you check that RowTracking is enabled
(which is strictly stronger than supported
?
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-tracking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will not be checking whether row tracking is enabled
in this PR or in planned future ones. The current goal is to add a minimal implementation to ensure row tracking is supported
in Delta Kernel. Addressing the enabled requirement is outside the current scope.
I imagine that in the future, we may need to check it is enabled when 1) reconstructing stable row ID / row commit version during reads, and 2) preserving stable row ID / row commit version during writes (e.g., for handling UPDATE and DELETE operations).
.orElse(null))); | ||
} | ||
|
||
private final String path; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vkorukanti -- were we trying to avoid materializing all of the AddFile fields in memory into a POJO?
the addfile is materialized into a pojo inside of RowTracking.java
below like so:
AddFile addFile = AddFile.fromRow(row.getStruct(ADD_FILE_ORDINAL));
// Assign base row ID if missing
if (!addFile.getBaseRowId().isPresent()) {
final long numRecords = getNumRecordsOrThrow(addFile);
addFile = addFile.withNewBaseRowId(currRowIdHighWatermark.get() + 1L);
currRowIdHighWatermark.addAndGet(numRecords);
}
// Assign default row commit version if missing
if (!addFile.getDefaultRowCommitVersion().isPresent()) {
addFile = addFile.withNewDefaultRowCommitVersion(commitVersion);
}
// Return a new AddFile row with assigned baseRowId/defaultRowCommitVersion
return SingleAction.createAddFileSingleAction(addFile.toRow());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a better way here, to implement AddFile without materializing all of the values, and just pointing to the underlying row
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can solve this problem like so:
Step 1: Create a DelegateRow class
public abstract class DelegateRow implements Row {
private final Row delegate;
public DelegateRow(Row delegate) {
this.delegate = delegate;
}
@Override
public StructType getSchema() {
return baseRow.getSchema();
}
// implement all Row interfaces using the delegate
}
Step 2: Implement AddFile class without materializing everything
class AddFile {
private final Row row;
public AddFile(Row row) {
this.row = row;
this.parsedPartitionValues = ...
}
public String getPath() {
return row.getString(COL_NAME_TO_ORDINAL.get("path"));
}
// ... implement getters by referencing the row ...
public AddFile withNewBaseRowId(long baseRowId) {
Row updatedRow = new DelegateRow(row) {
@Override
public long getLong(int ordinal) {
if (ordinal == COL_NAME_TO_ORDINAL.get("baseRowId")) {
return baseRowId;
}
return super.getLong(ordinal);
}
@Override
public boolean isNullAt(int ordinal) {
if (ordinal == COL_NAME_TO_ORDINAL.get("baseRowId")) {
return false; // baseRowId is now defined
}
return super.isNullAt(ordinal);
}
};
return new AddFile(updatedRow);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another way to implement DelegateRow is to pass in a map of overrides ... I'd have to think more of the tradeoffs between the two designs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the issue that we now need to modify an add file? Instead of just previously accessing the values/writing them as is?
I think this is important to think about from generic rust/java standpoint as well. Like if we have some generic chunk of engine data, and we need to change/update/add to it, how do we do so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we unblock this change and go forward with the current approach while we're figuring things out?
I agree it's not ideal to materialize AddFiles, but we only do it when row tracking is used so this PR is still a strict improvement over the current situation since tables with row tracking are not readable by kernel today.
Avoiding materializing actions to mutate them is then an optimization. It may also be useful for other features in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @johanl-db and @qiyuandong-db -- I think we need to pause work on this PR and take a step back to really plan this out. I think that the Kernel folks need to decide how we want (or further, if we want) Rows to be created and updated.
For example, another way to accomplish this would be to create an expression that transforms the row to create a new one with whatever values we want.
We can chat more on Slack to figure out next steps here. Great work on this PR and I'm glad that this PR has made us think quite hard about our data model and integration with engine connectors.
StringBuilder sb = new StringBuilder(); | ||
sb.append("AddFile{"); | ||
sb.append("path='").append(path).append('\''); | ||
sb.append(", partitionValues=").append(partitionValuesJavaMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that partitionValues would be better printed out as the in-order part1=value1, part2=value2
string
this map has no order guarantees, it could be confusing if we printed out { part2=value2, part1=value1 }
The updated to AddFile alone, btw, would warrant their own PR. You could add an AddFileSuite
to test these methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that partitionValues would be better printed out as the in-order
part1=value1, part2=value2
string
Yes, I’ll use a TreeMap
to ensure the order.
The updated to AddFile alone, btw, would warrant their own PR. You could add an
AddFileSuite
to test these methods
I agree the updates to AddFile
alone do need their own PR. My initial intention was to combine them to better showcase how AddFile
is used in row tracking and to justify the changes. I can prepare a separate PR for it.
Which Delta project/connector is this regarding?
Description
This PR implements the first part of row tracking support in Delta Kernel, based on the Delta Protocol. Specifically, it includes the following changes:
baseRowId
,defaultRowCommitVersion
fields toAddFile
andRemoveFile
actionsbaseRowId
anddefaultRowCommitVersion
toAddFile
actions prior to committing themrowIdHighWaterMark
of thedelta.rowTracking
metadata domain during the base row ID assignment, which is the highest assigned fresh row id for the tableIt doesn't include:
How was this patch tested?
Added tests in
RowTrackingSuite.scala
. This includes unit tests and integration tests with Delta-Spark.Does this PR introduce any user-facing changes?
No.