Update data ingestion overview about batch #55
File sink currently supports only append-only mode, so please change the query to `append-only` and specify this explicitly after the `FORMAT ... ENCODE ...` statement.
</Info>
## Batching strategy for file sink
The batching strategy ensures that the contents of a chunk are not split further, which enables sink decoupling for the file sink. The batching strategy is available for all supported encodes: JSON, CSV, and Parquet.
### Category
There are two primary batching strategies:
- **Batching based on row count**:
  For batching based on row count, RisingWave checks whether the maximum row count threshold has been reached after each chunk is written (`sink_writer.write_batch()`). If the threshold is met, the current file is finalized.
- **Batching based on rollover interval**:
  For batching based on a time interval, RisingWave checks the threshold each time a chunk is about to be written (`sink_writer.write_batch()`) and each time a barrier is encountered (`sink_writer.barrier()`). Note that if a barrier gets stuck, batching may not strictly adhere to the configured rollover interval, leading to possible delays in writing. Future implementations may optimize this by monitoring the writer itself, rather than relying solely on barriers or chunks.
The actual number of rows in a file may slightly exceed the configured maximum row count: because a chunk is never split across files, a file may overshoot the maximum by up to `chunk_size - 1` rows (currently 255). This slight overshoot is generally acceptable given the typically large row counts of Parquet files.
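The interaction of the two strategies above can be sketched as a simple accumulator. This is a hedged illustration only, not RisingWave's actual code; the class and attribute names here are hypothetical:

```python
import time


class BatchingFileWriter:
    """Sketch of row-count plus rollover-interval batching.

    Illustrative only; not RisingWave's implementation.
    """

    def __init__(self, max_row_count, rollover_seconds, clock=time.monotonic):
        self.max_row_count = max_row_count
        self.rollover_seconds = rollover_seconds
        self.clock = clock
        self.rows_in_file = 0
        self.file_opened_at = clock()
        self.finished_files = 0

    def _maybe_finish_file(self):
        # Either threshold finalizes the current file.
        if (self.rows_in_file >= self.max_row_count
                or self.clock() - self.file_opened_at >= self.rollover_seconds):
            if self.rows_in_file > 0:
                self.finished_files += 1
            self.rows_in_file = 0
            self.file_opened_at = self.clock()

    def write_chunk(self, chunk_rows):
        # A chunk is written atomically, so a file can overshoot
        # max_row_count by up to chunk_size - 1 rows.
        self.rows_in_file += chunk_rows
        self._maybe_finish_file()

    def on_barrier(self):
        # The rollover interval is also checked when a barrier arrives.
        self._maybe_finish_file()
```

For example, with `max_row_count=10` and chunks of 4 rows, the third chunk pushes the file to 12 rows, finalizing it with a 2-row overshoot.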
### Sink decoupling with batching strategy
With the batching strategy for file sink, file writing no longer depends on the arrival of barriers. `BatchingLogSinkOf` determines when to truncate the log store: once a file is written, the log store is truncated. However, if batching spans multiple barriers and no file has been written by the time a barrier arrives, that barrier does not trigger truncation.
If no batching strategy is defined, the previous logic still applies: a file is forcefully written upon the arrival of a checkpoint barrier.
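As an illustration only, a batching strategy might be declared in the sink's `WITH` clause. The option names `max_row_count` and `rollover_seconds` below, as well as the connector fields, are assumptions for the sketch and should be verified against your RisingWave version:

```sql
CREATE SINK file_sink FROM mv
WITH (
    connector = 's3',
    s3.bucket_name = 'my-bucket',
    s3.path = 'path/',
    -- Hypothetical batching options; check the exact names in your release.
    max_row_count = '100000',
    rollover_seconds = '10'
) FORMAT PLAIN ENCODE PARQUET (force_append_only = 'true');
```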
### File naming rule
Previously, the naming convention for files was `executor_id + epoch.suffix`. With the decoupling of barriers and file writing, the epoch is no longer needed in the file name. However, the `executor_id` is still required, as RisingWave does not perform file merging between different levels of parallelism.
The current file naming rule is `/Option<partition_by>/executor_id + timestamp.suffix`, where the timestamp differentiates files batched by the rollover interval. For example, the output files look like the following:
```
path/2024-09-20/47244640257_1727072046.parquet
path/2024-09-20/47244640257_1727072055.parquet
```
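Under this rule, a path can be assembled as in the sketch below. The helper name `file_sink_path` is hypothetical, and the date segment is assumed to come from the optional `partition_by` component:

```python
def file_sink_path(base, executor_id, ts_seconds, suffix, partition_by=None):
    """Build a path following the rule
    /Option<partition_by>/executor_id + timestamp.suffix.

    Illustrative only; the helper and its parameters are hypothetical.
    """
    parts = [base]
    if partition_by is not None:
        # Optional partition segment, e.g. a date like "2024-09-20".
        parts.append(partition_by)
    parts.append(f"{executor_id}_{ts_seconds}.{suffix}")
    return "/".join(parts)


# Reproduces the first example path above.
print(file_sink_path("path", 47244640257, 1727072046, "parquet",
                     partition_by="2024-09-20"))
```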