
Update data ingestion overview about batch #55

Merged
merged 16 commits into from
Nov 25, 2024

Conversation

WanYixian
Collaborator

@WanYixian WanYixian commented Nov 15, 2024

Description

Update data ingestion overview

Related Doc issue

Resolve #53


## Batching strategy for file sink

The batching strategy ensures that the contents within a chunk are not further split, which enables the decoupling of the file sink. Currently, the batching strategy is available for Parquet encode.
Collaborator Author

Hi @wcy-fdu, want to double-check: is the batching strategy available only for Parquet, or for both Parquet and JSON? Thanks!

Contributor

For all supported encode types: JSON, CSV, and Parquet.

@WanYixian WanYixian marked this pull request as ready for review November 18, 2024 08:05
@WanYixian WanYixian requested a review from wcy-fdu November 18, 2024 08:06
Contributor

@wcy-fdu wcy-fdu left a comment


Rest LGTM.

We can highlight two things in the document:

  1. The purpose of batching is to prevent the file sink from generating many small files.
  2. The batching conditions are not guaranteed to be exact, because the real goal is point 1.


## Batching strategy for file sink

The batching strategy ensures that the contents within a chunk are not further split, which enables the decoupling of the file sink. Currently, the batching strategy is available for Parquet encode.
Contributor

For all supported encode types: JSON, CSV, and Parquet.

For batching based on row count, RisingWave checks whether the maximum row count threshold has been reached after each chunk is written (`sink_writer.write_batch()`). If the threshold is met, the writing of the file is completed.

- **Batching based on rollover interval**:
For batching based on the time interval, RisingWave checks the threshold each time a chunk is about to be written (`sink_writer.write_batch()`) and when a barrier is encountered (`sink_writer.barrier()`). Note that if a barrier gets stuck, batching may not strictly adhere to the preset rollover interval, leading to possible delays in writing. Future implementations will optimize this process by monitoring the writer itself, rather than relying solely on barriers or chunks.
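The two checks described above can be sketched as follows. This is a minimal Python sketch with invented names (`BatchingFileWriter` and its fields are illustrations, not RisingWave's actual Rust implementation), showing a row-count check after each written chunk and a rollover-interval check on both chunks and barriers:

```python
import time

class BatchingFileWriter:
    """Illustrative sketch of the two batching checks described above.
    Class and method names are invented; the real sink writer is Rust
    (`sink_writer.write_batch()` / `sink_writer.barrier()`)."""

    def __init__(self, max_row_count, rollover_interval_s):
        self.max_row_count = max_row_count
        self.rollover_interval_s = rollover_interval_s
        self.rows_in_file = 0
        self.file_opened_at = time.monotonic()
        self.finished_files = 0

    def _rollover_due(self):
        return time.monotonic() - self.file_opened_at >= self.rollover_interval_s

    def _finish_file(self):
        # Complete the current file and start a new one.
        self.finished_files += 1
        self.rows_in_file = 0
        self.file_opened_at = time.monotonic()

    def write_batch(self, chunk_rows):
        # A chunk is written whole; it is never split across files.
        self.rows_in_file += chunk_rows
        # Thresholds are checked only after the chunk is written, so a
        # file can overshoot max_row_count by up to chunk_size - 1 rows.
        if self.rows_in_file >= self.max_row_count or self._rollover_due():
            self._finish_file()

    def barrier(self):
        # On a barrier, only the rollover interval is checked; if barriers
        # stall, the interval can be overshot.
        if self.rows_in_file > 0 and self._rollover_due():
            self._finish_file()
```

Note how the sketch also captures the caveat above: because the interval is only rechecked on the next chunk or barrier, a stalled barrier delays the rollover.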
Contributor

Note that if a barrier gets stuck, batching may not strictly adhere to the preset rollover interval, leading to possible delays in writing. Future implementations will optimize this process by monitoring the writer itself, rather than relying solely on barriers or chunks.

This is an implementation detail; we don't need to expose it to users.

- **Batching based on rollover interval**:
For batching based on the time interval, RisingWave checks the threshold each time a chunk is about to be written (`sink_writer.write_batch()`) and when a barrier is encountered (`sink_writer.barrier()`). Note that if a barrier gets stuck, batching may not strictly adhere to the preset rollover interval, leading to possible delays in writing. Future implementations will optimize this process by monitoring the writer itself, rather than relying solely on barriers or chunks.

The actual number of rows in a file may slightly exceed the set maximum row count. In extreme cases, it may exceed the threshold by up to `chunk_size - 1` rows (currently 255). However, this slight excess is generally acceptable given the typically large row counts in Parquet files.
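The worst-case overshoot follows directly from the arithmetic above: the open file can already hold one row fewer than the maximum without triggering completion, and the next chunk is then written whole. A small illustrative calculation (the thread's `chunk_size - 1 = 255` implies a chunk size of 256; the row limit here is a made-up example):

```python
def worst_case_rows(max_row_count: int, chunk_size: int) -> int:
    # The threshold is checked only after a whole chunk is written:
    # the file may already hold max_row_count - 1 rows, and the next
    # full chunk of chunk_size rows still goes into the same file.
    return (max_row_count - 1) + chunk_size

# With a hypothetical limit of 10,000 rows and a chunk size of 256,
# the overshoot is at most chunk_size - 1 = 255 rows.
print(worst_case_rows(10_000, 256) - 10_000)  # → 255
```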
Contributor

I am wondering whether it is necessary to expose the concept of a chunk to users. I suggest keeping it vague and just saying that the batching condition is relatively coarse-grained: setting a certain row count for batching does not mean the output file will have exactly that many rows.

Comment on lines 146 to 148
With the batching strategy for file sink, file writing is no longer dependent on the arrival of barriers. `BatchingLogSinkOf` determines when to truncate the log store. Once a file is written, it will be truncated. However, if batching occurs across barriers and no writing has occurred by the time a barrier arrives, the barrier will not trigger truncation.

If no batching strategy is defined, the previous logic will still apply, meaning that a file will be forcefully written upon the arrival of a checkpoint barrier.
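The truncation rule quoted above can be sketched like this. Names are invented for illustration (the real `BatchingLogSinkOf` is Rust and tracks epochs rather than plain integer offsets):

```python
class BatchingLogSink:
    """Sketch of the truncation rule: the log store is truncated up to
    the offset of the last fully written file, never up to a barrier
    that arrives mid-batch."""

    def __init__(self):
        self.truncated_up_to = 0  # log offset already truncated
        self.consumed_up_to = 0   # log offset buffered into the open file

    def write_chunk(self, chunk_end_offset, file_completed):
        self.consumed_up_to = chunk_end_offset
        if file_completed:
            # Once a file is written, the log can be truncated up to it.
            self.truncated_up_to = self.consumed_up_to

    def barrier(self, barrier_offset):
        # If batching spans this barrier (no file completed yet), the
        # barrier does not trigger truncation; truncated_up_to stays put.
        return self.truncated_up_to
```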
Contributor

Implementation detail, remove it.

We just need to tell the user that if no batching conditions are set, a default batching strategy will be applied.

Contributor

@wcy-fdu wcy-fdu left a comment


risingwavelabs/risingwave#18472 also introduces `partition_by`, which is used to partition files by time. Please help add a description of this parameter (perhaps in the file naming part): if it is set, there will be subdirectories in the file path.

delivery/overview.mdx (outdated review thread, resolved)
delivery/overview.mdx (outdated review thread, resolved)

<Note>The condition for batching is relatively coarse-grained. The actual number of rows or exact timing of file completion may vary from the specified thresholds, as this function is intentionally flexible to prioritize efficient file management.</Note>

If no conditions for batch collection are set, RisingWave will apply a default batching strategy to ensure proper file writing and data consistency.
Contributor

Better to indicate what the default policy is (IIRC 10s; please check the original PR).

Contributor

@wcy-fdu wcy-fdu left a comment


Thanks for the efforts!

@WanYixian WanYixian merged commit 4471321 into main Nov 25, 2024
2 checks passed
@WanYixian WanYixian deleted the wyx/resolve_51 branch November 25, 2024 05:11

Successfully merging this pull request may close these issues.

Document: feat(sink): introduce batching strategy for file sink