[Kernel] Assign base row ID and default row commit version to AddFile #3894

Open: wants to merge 5 commits into base: master

@qiyuandong-db (Contributor) commented Nov 21, 2024

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR implements the first part of row tracking support in Delta Kernel, based on the Delta Protocol. Specifically, it includes the following changes:

  • add baseRowId, defaultRowCommitVersion fields to AddFile and RemoveFile actions
  • implement functionality to assign baseRowId and defaultRowCommitVersion to AddFile actions prior to committing them
  • maintain rowIdHighWaterMark, the highest fresh row ID assigned for the table, in the delta.rowTracking metadata domain during base row ID assignment
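
The assignment described in the list above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this PR: `FileEntry` and `RowIdAssigner` are hypothetical names, the sketch assumes every file reports its record count, and starting the high watermark at -1 when no row ID has been assigned yet is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for an AddFile action; only the fields needed here.
final class FileEntry {
  final String path;
  final long numRecords;
  final Optional<Long> baseRowId;
  final Optional<Long> defaultRowCommitVersion;

  FileEntry(
      String path,
      long numRecords,
      Optional<Long> baseRowId,
      Optional<Long> defaultRowCommitVersion) {
    this.path = path;
    this.numRecords = numRecords;
    this.baseRowId = baseRowId;
    this.defaultRowCommitVersion = defaultRowCommitVersion;
  }
}

// Hypothetical helper that assigns fresh row IDs and tracks the high watermark.
final class RowIdAssigner {
  // Highest row ID assigned so far (assumed to be -1 if none was assigned yet).
  private long highWatermark;

  RowIdAssigner(long initialHighWatermark) {
    this.highWatermark = initialHighWatermark;
  }

  List<FileEntry> assign(List<FileEntry> files, long commitVersion) {
    List<FileEntry> result = new ArrayList<>();
    for (FileEntry f : files) {
      Optional<Long> baseRowId = f.baseRowId;
      if (!baseRowId.isPresent()) {
        // The file's rows occupy [highWatermark + 1, highWatermark + numRecords].
        baseRowId = Optional.of(highWatermark + 1);
        highWatermark += f.numRecords;
      }
      // Keep an existing commit version; otherwise use the version being committed.
      Optional<Long> version =
          f.defaultRowCommitVersion.isPresent()
              ? f.defaultRowCommitVersion
              : Optional.of(commitVersion);
      result.add(new FileEntry(f.path, f.numRecords, baseRowId, version));
    }
    return result;
  }

  long highWatermark() {
    return highWatermark;
  }
}
```

With an initial high watermark of -1, a 10-record file followed by a 5-record file would receive base row IDs 0 and 10, and the high watermark would end at 14.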

It doesn't include:

  • resolving conflicts with transactions that assigned overlapping Row IDs and Commit Versions
  • assignment of the physical row ID column names at table creation

How was this patch tested?

Added tests in RowTrackingSuite.scala. This includes unit tests and integration tests with Delta-Spark.

Does this PR introduce any user-facing changes?

No.

@qiyuandong-db force-pushed the delta-kernel-row-tracking branch from 54b77cc to 75c0005 on November 21, 2024 15:09

@johanl-db (Collaborator) left a comment

There are a few things missing from the row tracking spec, see my comment

Comment on lines 307 to 300
return new KernelException(
"Cannot assign baseRowId to add action. "
+ "The number of records in this data file is missing.");
}

Collaborator

As we discussed, this can be an issue: if connectors don't populate the numRecords stats in the AddFile actions that are committed, the commit will fail if row tracking is supported. (Note that this is still better than today, where we always fail in that case since we don't support row tracking.)

Question more for Kernel folks: do we have some guarantee or requirement that connectors populate numRecords? Are connectors that implement writes today (if any) populating numRecords?

In any case, I would word the exception so that it puts the burden more on the connector, for example:
"All add actions must have statistics that include the number of records when writing to a Delta table with the RowTracking table feature enabled."

Collaborator

> Question more for Kernel folks: do we have some guarantee or requirement that connectors populate numRecords? Are connectors that implement writes today (if any) populating numRecords?

cc @vkorukanti thoughts?

Collaborator

We expect the connector to populate the numRecords and other stats as it is a heavy operation and we don't want to do that in Kernel. If some protocol feature (e.g., icebergCompatV2) requires stats and they are missing in the DataFileStatus, we throw errors.

Collaborator

So it seems like we should update the Row Tracking protocol to indicate that the AddFile statistics must include numRecords?

Collaborator

+1 on making the error message more explicit about (1) what is unsupported (cannot write to a row tracking table), (2) why it is unsupported (requires numRecords to be populated in stats), and (3) who is responsible or what needs to be updated for support (not supported by this Kernel integration/engine).
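
A message covering those three points could look roughly like the sketch below. The wording is illustrative only and is not the message that ended up in the PR. `MissingNumRecordsException` and `RowTrackingErrors` are hypothetical names standing in for Kernel's own exception type and error factory.

```java
// Hypothetical exception type, standing in for Kernel's KernelException.
final class MissingNumRecordsException extends RuntimeException {
  MissingNumRecordsException(String message) {
    super(message);
  }
}

// Hypothetical error factory; keeps the message in one place.
final class RowTrackingErrors {
  private RowTrackingErrors() {}

  static MissingNumRecordsException missingNumRecords() {
    return new MissingNumRecordsException(
        // (1) What is unsupported.
        "Cannot write to a Delta table with the rowTracking table feature: "
            // (2) Why it is unsupported.
            + "all add actions must have statistics that include the number of "
            + "records (numRecords). "
            // (3) Who needs to change something.
            + "The connector or engine performing this write must populate "
            + "numRecords before committing.");
  }
}
```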

Contributor Author

Thanks for the suggestion! I’ve updated the error message to better explain the situation and our expectations.

> So it seems like we should update the Row Tracking protocol to indicate that the AddFile statistics must include numRecords?

I'm not sure this should go into the protocol. numRecords is used for assigning baseRowId, but we might have alternative ways to obtain this information (e.g. passing it directly from the connector, or potentially computing it within the Kernel ourselves). It seems more like an expectation for connectors to provide this data.

@qiyuandong-db force-pushed the delta-kernel-row-tracking branch from 75c0005 to c584707 on December 7, 2024 21:46
@qiyuandong-db changed the title from "[Kernel] Assign base row ID to AddFile actions" to "[Kernel] Assign base row ID and default row commit version to AddFile" on Dec 7, 2024
@qiyuandong-db force-pushed the delta-kernel-row-tracking branch from c584707 to e94da89 on December 9, 2024 08:28

* @return an {@link CloseableIterable} of data actions with base row IDs and default row commit
* versions assigned
*/
public static CloseableIterable<Row> assignBaseRowIdAndDefaultRowCommitVersion(

Contributor Author

Another possible approach is to assign baseRowId and defaultRowCommitVersion when creating AddFile actions.

However, this would duplicate logic across all places where AddFile actions are created (currently just Transaction.generateAppendActions, but potentially more in the future I guess). Plus, baseRowId assignment is stateful, making it tricky if AddFile actions are created in multiple steps/places.

I chose to handle it here for now to keep things centralized, even though it requires converting AddFile rows back to actions, updating them, and converting back to rows.

Comment on lines +159 to +168
private final String path;
private final MapValue partitionValues;
private final long size;
private final long modificationTime;
private final boolean dataChange;
private final Optional<DeletionVectorDescriptor> deletionVector;
private final Optional<MapValue> tags;
private final Optional<Long> baseRowId;
private final Optional<Long> defaultRowCommitVersion;
private final Optional<DataFileStatistics> stats;

Collaborator

Move this to the top of the class.

Collaborator

Apologies -- what does this comment mean? Having the static methods first, then the static variables, then the member variables and constructor LGTM

Collaborator

I don't know the ordering convention used in Kernel. I would typically put all variables (static and member) before any method definition, but that's personal, and we should follow whatever convention Kernel uses.

What tripped me up initially is that AddFile is essentially a record, and these member fields are arguably the most important part of the class for making sense of the rest, yet they are lost in the middle of the file.

@qiyuandong-db force-pushed the delta-kernel-row-tracking branch from 954641b to 2816342 on December 9, 2024 15:14

@scottsand-db (Collaborator) left a comment

Looks great! Thanks for making this. Left some comments.

* @param addFile the AddFile action
* @return the number of records
*/
private static long getNumRecords(AddFile addFile) {

Collaborator

Why not put this on AddFile itself? This feels a lot like C-style struct code. Putting it in AddFile would make the code more cohesive.

@qiyuandong-db (Contributor Author) commented Dec 12, 2024

My idea was to have it throw rowIDAssignmentWithoutStats when stats are missing, to simplify some of the row tracking code. That behavior is specific to row tracking and would be inappropriate for a getter on AddFile itself.

I've now added the getter public Optional<Long> getNumRecords() to AddFile itself, with a helper method in RowTracking.java:

private static long getNumRecordsOrThrow(AddFile addFile) {
    return addFile.getNumRecords().orElseThrow(DeltaErrors::rowIDAssignmentWithoutStats);
}

Does this look good to you?

* @param protocol the protocol to check
* @return true if the protocol supports row tracking, false otherwise
*/
public static boolean isRowTrackingSupported(Protocol protocol) {

Collaborator

Question: where (probably in future PRs?) will you check that row tracking is enabled (which is strictly stronger than supported)?

https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-tracking

Contributor Author

We will not be checking whether row tracking is enabled in this PR or in planned future ones. The current goal is to add a minimal implementation to ensure row tracking is supported in Delta Kernel. Addressing the enabled requirement is outside the current scope.

I imagine that in the future, we may need to check it is enabled when 1) reconstructing stable row ID / row commit version during reads, and 2) preserving stable row ID / row commit version during writes (e.g., for handling UPDATE and DELETE operations).
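
The difference between "supported" and "enabled" discussed here can be sketched as below, following the definitions in the row tracking section of the Delta protocol: the `rowTracking` feature listed in the protocol's writer features, and the `delta.enableRowTracking` table property. This is only an illustration. The class `RowTrackingChecks` is hypothetical, and a plain `Set` and `Map` stand in for Kernel's `Protocol` and table metadata.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper; Set/Map stand in for Kernel's Protocol and Metadata types.
final class RowTrackingChecks {
  static final String FEATURE_NAME = "rowTracking";
  static final String ENABLED_PROPERTY = "delta.enableRowTracking";

  private RowTrackingChecks() {}

  // Supported: the table feature is listed in the protocol's writer features.
  static boolean isSupported(Set<String> writerFeatures) {
    return writerFeatures.contains(FEATURE_NAME);
  }

  // Enabled: supported AND the table property is set to true.
  static boolean isEnabled(Set<String> writerFeatures, Map<String, String> tableProperties) {
    return isSupported(writerFeatures)
        && Boolean.parseBoolean(tableProperties.getOrDefault(ENABLED_PROPERTY, "false"));
  }
}
```

A table can therefore be supported without being enabled, which is the case this PR targets: writers assign row IDs, but readers cannot yet rely on them being present for every file.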

.orElse(null)));
}

private final String path;

Collaborator

@vkorukanti -- were we trying to avoid materializing all of the AddFile fields in memory into a POJO?

The AddFile is materialized into a POJO inside RowTracking.java, like so:

AddFile addFile = AddFile.fromRow(row.getStruct(ADD_FILE_ORDINAL));

// Assign base row ID if missing
if (!addFile.getBaseRowId().isPresent()) {
  final long numRecords = getNumRecordsOrThrow(addFile);
  addFile = addFile.withNewBaseRowId(currRowIdHighWatermark.get() + 1L);
  currRowIdHighWatermark.addAndGet(numRecords);
}

// Assign default row commit version if missing
if (!addFile.getDefaultRowCommitVersion().isPresent()) {
  addFile = addFile.withNewDefaultRowCommitVersion(commitVersion);
}

// Return a new AddFile row with assigned baseRowId/defaultRowCommitVersion
return SingleAction.createAddFileSingleAction(addFile.toRow());

Collaborator

I think there is a better way here, to implement AddFile without materializing all of the values, and just pointing to the underlying row

Collaborator

I think we can solve this problem like so:

Step 1: Create a DelegateRow class

public abstract class DelegateRow implements Row {
    private final Row delegate;

    public DelegateRow(Row delegate) {
        this.delegate = delegate;
    }

    @Override
    public StructType getSchema() {
        return delegate.getSchema();
    }

    // implement all Row interfaces using the delegate
}

Step 2: Implement AddFile class without materializing everything

class AddFile {
  private final Row row;

  public AddFile(Row row) {
    this.row = row;

    this.parsedPartitionValues = ...
  }

  public String getPath() {
    return row.getString(COL_NAME_TO_ORDINAL.get("path"));
  }

  // ... implement getters by referencing the row ...

  public AddFile withNewBaseRowId(long baseRowId) {
    Row updatedRow = new DelegateRow(row) {
      @Override
      public long getLong(int ordinal) {
        if (ordinal == COL_NAME_TO_ORDINAL.get("baseRowId")) {
          return baseRowId;
        }
        return super.getLong(ordinal);
      }

      @Override
      public boolean isNullAt(int ordinal) {
        if (ordinal == COL_NAME_TO_ORDINAL.get("baseRowId")) {
          return false; // baseRowId is now defined
        }
        return super.isNullAt(ordinal);
      }
    };
    return new AddFile(updatedRow);
  }
}

Collaborator

Another way to implement DelegateRow is to pass in a map of overrides ... I'd have to think more of the tradeoffs between the two designs
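
A rough sketch of the map-of-overrides variant follows, to make the comparison concrete. It is not a proposal for the actual Kernel API: `SimpleRow` is a hypothetical, heavily reduced stand-in for Kernel's `Row` interface, and `OverrideRow` is a made-up name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced stand-in for Kernel's Row interface.
interface SimpleRow {
  boolean isNullAt(int ordinal);

  long getLong(int ordinal);

  String getString(int ordinal);
}

// Serves overridden ordinals from a map and delegates everything else.
final class OverrideRow implements SimpleRow {
  private final SimpleRow delegate;
  private final Map<Integer, Object> overrides;

  OverrideRow(SimpleRow delegate, Map<Integer, Object> overrides) {
    this.delegate = delegate;
    // Defensive copy so later changes to the caller's map do not leak in.
    this.overrides = Collections.unmodifiableMap(new HashMap<>(overrides));
  }

  @Override
  public boolean isNullAt(int ordinal) {
    if (overrides.containsKey(ordinal)) {
      // An explicit null override marks the column as null.
      return overrides.get(ordinal) == null;
    }
    return delegate.isNullAt(ordinal);
  }

  @Override
  public long getLong(int ordinal) {
    if (overrides.containsKey(ordinal)) {
      // Callers are expected to check isNullAt first, as with any row.
      return (Long) overrides.get(ordinal);
    }
    return delegate.getLong(ordinal);
  }

  @Override
  public String getString(int ordinal) {
    if (overrides.containsKey(ordinal)) {
      return (String) overrides.get(ordinal);
    }
    return delegate.getString(ordinal);
  }
}
```

Compared with an anonymous subclass per mutation, the map keeps the override logic in one class, at the cost of boxing values and losing compile-time type checks on the overridden columns.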

Collaborator

Is the issue that we now need to modify an AddFile, instead of just accessing the values and writing them as is, as we did previously?

I think this is important to think about from a generic Rust/Java standpoint as well. If we have some generic chunk of engine data and we need to change, update, or add to it, how do we do so?

Collaborator

Can we unblock this change and go forward with the current approach while we're figuring things out?

I agree it's not ideal to materialize AddFiles, but we only do it when row tracking is used so this PR is still a strict improvement over the current situation since tables with row tracking are not readable by kernel today.

Avoiding materializing actions to mutate them is then an optimization. It may also be useful for other features in the future

@scottsand-db (Collaborator) commented Dec 17, 2024

Hi @johanl-db and @qiyuandong-db -- I think we need to pause work on this PR and take a step back to really plan this out. I think that the Kernel folks need to decide how we want (or further, if we want) Rows to be created and updated.

For example, another way to accomplish this would be to create an expression that transforms the row to create a new one with whatever values we want.

We can chat more on Slack to figure out next steps here. Great work on this PR and I'm glad that this PR has made us think quite hard about our data model and integration with engine connectors.

StringBuilder sb = new StringBuilder();
sb.append("AddFile{");
sb.append("path='").append(path).append('\'');
sb.append(", partitionValues=").append(partitionValuesJavaMap);

Collaborator

I think that partitionValues would be better printed out as the in-order part1=value1, part2=value2 string.

This map has no order guarantees, so it could be confusing if we printed out { part2=value2, part1=value1 }.

The updates to AddFile alone, btw, would warrant their own PR. You could add an AddFileSuite to test these methods.

Contributor Author

> I think that partitionValues would be better printed out as the in-order part1=value1, part2=value2 string

Yes, I’ll use a TreeMap to ensure the order.
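
For illustration, the TreeMap approach could look like the sketch below. `PartitionValuesFormatter` is a hypothetical helper, not code from the PR, and it assumes the partition values are already available as a plain Java map of strings.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper that prints partition values in a deterministic order.
final class PartitionValuesFormatter {
  private PartitionValuesFormatter() {}

  static String format(Map<String, String> partitionValues) {
    // Copying into a TreeMap sorts entries by partition column name.
    return new TreeMap<>(partitionValues)
        .entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(", "));
  }
}
```

Note that this orders entries alphabetically by column name, which makes the output stable but is not necessarily the order in which the partition columns are declared in the table schema.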

> The updates to AddFile alone, btw, would warrant their own PR. You could add an AddFileSuite to test these methods

I agree the updates to AddFile alone do need their own PR. My initial intention was to combine them to better showcase how AddFile is used in row tracking and to justify the changes. I can prepare a separate PR for it.

5 participants