Update data ingestion overview about batch #55

Merged: 16 commits, Nov 25, 2024

delivery/overview.mdx: 33 additions & 0 deletions

@@ -127,3 +127,36 @@ WITH (
The file sink currently supports only append-only mode, so please set the query type to `append-only` and specify it explicitly after the `FORMAT ... ENCODE ...` statement.

</Info>

## Batching strategy for file sink

The batching strategy ensures that the contents of a chunk are not split across files, which enables decoupling of the file sink. Currently, the batching strategy is available for the Parquet encode.
Collaborator (author): Hi @wcy-fdu, want to double-check whether the batching strategy is available only for Parquet, or for both Parquet and JSON. Thanks!

Contributor: For all supported encodes: JSON, CSV, and Parquet.


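To make this concrete, below is a hedged configuration sketch of a file sink that sets both kinds of batching thresholds described in the next subsection. The connector properties (bucket, path, region) and the option names `max_row_count` and `rollover_seconds` are illustrative assumptions, not confirmed syntax; check the sink documentation for the exact parameter names.

```sql
CREATE SINK file_sink AS SELECT * FROM t
WITH (
    connector = 's3',                  -- file sink backed by S3 (illustrative)
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.path = 'path/',
    type = 'append-only',              -- file sink supports append-only mode only
    max_row_count = '10000',           -- assumed option: finalize the file after ~10k rows
    rollover_seconds = '10'            -- assumed option: finalize the file after 10 seconds
) FORMAT PLAIN ENCODE PARQUET(force_append_only = 'true');
```
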
### Category

There are two primary batching strategies:

- **Batching based on row count**:
For batching based on row count, RisingWave checks whether the maximum row-count threshold has been reached after each chunk is written (`sink_writer.write_batch()`). If the threshold is met, the file is finalized.

- **Batching based on rollover interval**:
For batching based on the time interval, RisingWave checks the threshold each time a chunk is about to be written (`sink_writer.write_batch()`) and when a barrier is encountered (`sink_writer.barrier()`). Note that if a barrier gets stuck, batching may not strictly adhere to the preset rollover interval, leading to possible delays in writing. Future implementations will optimize this process by monitoring the writer itself, rather than relying solely on barriers or chunks.
Contributor:

> Note that if a barrier gets stuck, batching may not strictly adhere to the preset rollover interval, leading to possible delays in writing. Future implementations will optimize this process by monitoring the writer itself, rather than relying solely on barriers or chunks.

This is an implementation detail; we don't need to expose it to users.


The actual number of rows in a file may slightly exceed the configured maximum row count: because the threshold is only checked after a whole chunk has been written, the excess can be up to `chunk_size - 1` rows (currently 255). This slight excess is generally acceptable given the typically large row counts in Parquet files.
Contributor: I am wondering whether it is necessary to expose the concept of a chunk to users. I suggest being vague and just saying that the batching condition is relatively coarse-grained: setting a certain number of rows for batching does not mean the output file will contain exactly that many rows.


### Sink decoupling with batching strategy

With the batching strategy for file sink, file writing no longer depends on the arrival of barriers. `BatchingLogSinkOf` determines when to truncate the log store: once a file has been written, the corresponding portion of the log store is truncated. However, if a batch spans barriers and nothing has been written by the time a barrier arrives, that barrier does not trigger truncation.

If no batching strategy is defined, the previous logic still applies: a file is forcefully written when a checkpoint barrier arrives.
Contributor: Implementation detail; remove it. We just need to tell users that if no batching conditions are set, a default batching strategy will be applied.
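For contrast, here is a minimal sketch (same assumed syntax as the earlier example) that sets no batching options; in this case a file is forced out when a checkpoint barrier arrives, as described above.

```sql
CREATE SINK file_sink_default AS SELECT * FROM t
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.path = 'path/',
    type = 'append-only'
    -- no max_row_count / rollover_seconds set: files are
    -- finalized when a checkpoint barrier arrives
) FORMAT PLAIN ENCODE PARQUET(force_append_only = 'true');
```
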


### File naming rule

Previously, the naming convention for files was `executor_id + epoch.suffix`. With barriers and file writing decoupled, the epoch is no longer needed in the file name. However, the `executor_id` is still required, because RisingWave does not merge files across different parallelisms.

The current file naming rule is `/Option<partition_by>/executor_id + timestamp.suffix`, where the timestamp differentiates files batched by the rollover interval. For example, in the output files below, `47244640257` is the executor ID and `1727072046`/`1727072055` are the timestamps:

```
path/2024-09-20/47244640257_1727072046.parquet
path/2024-09-20/47244640257_1727072055.parquet
```
mint.json: 1 addition & 0 deletions

@@ -167,6 +167,7 @@
{"source": "/docs/current/architecture", "destination": "/reference/architecture"},
{"source": "/docs/current/fault-tolerance", "destination": "/reference/fault-tolerance"},
{"source": "/docs/current/limitations", "destination": "/reference/limitations"},
{"source": "/docs/current/sources", "destination": "/integrations/sources/overview"},
{"source": "/docs/current/sql-alter-connection", "destination": "/sql/commands/sql-alter-connection"},
{"source": "/docs/current/sql-alter-database", "destination": "/sql/commands/sql-alter-database"},
{"source": "/docs/current/sql-alter-function", "destination": "/sql/commands/sql-alter-function"},