Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snowflake-sink): change to streaming upload instead of batched bulk load #16269

Merged
merged 31 commits into from
Apr 19, 2024

Conversation

xzhseh
Copy link
Contributor

@xzhseh xzhseh commented Apr 11, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

as titled.

related: #15429 (comment).

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@xzhseh xzhseh requested review from fuyufjh, wenym1 and xxhZs April 11, 2024 21:44
@xzhseh xzhseh self-assigned this Apr 11, 2024
@xzhseh xzhseh requested a review from a team as a code owner April 11, 2024 21:44
@xzhseh xzhseh force-pushed the xzhseh/snowflake-sink-streaming-upload branch from 71922d3 to 7091cd3 Compare April 11, 2024 21:45
@xzhseh
Copy link
Contributor Author

xzhseh commented Apr 11, 2024

note that some difference when changing to streaming upload, e.g.

  • previously data is evenly distributed by parallel writer(s) and sink directly to the intermediate s3.
  • using streaming upload under the context example use-case (i.e., the one specified in integration_tests/snowflake-sink) would lead to intermediate file(s) in s3 being further (evenly) split to multiple smaller files - due to the streaming nature.

as an example, using my personal s3 bucket and the example source table / mv in integration_tests/snowflake-sink/..., 25 KB * 4 files will appear intermediately in previous approach, ~19.1 KB * 4 + ~2.1 KB * 4 files will appear intemediately when changing to streaming upload, just as shown below.

CleanShot 2024-04-11 at 17 45 59@2x

@@ -221,25 +224,49 @@ impl SnowflakeS3Client {
// create the brand new s3 client used to sink files to s3
let s3_client = S3Client::new(&config);

// just use default here
let config = ObjectStoreConfig::default();
Copy link
Contributor Author

@xzhseh xzhseh Apr 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plus, do we need special parameters tuning for the configurations?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily, but it would be better to allow users to pass parameters to the S3 client (OpenDAL) if possible. Just for illustration, any parameters start with opendal.param_name = param_val with be passed to OpenDAL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it would be better to allow users to pass parameters to the S3 client (OpenDAL) if possible.

agree, I could put the support of extra configurations for s3 client in subsequent pr.

src/connector/src/sink/snowflake_connector.rs Show resolved Hide resolved
})?;

uploader.write_bytes(data).await.map_err(|err| SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err)))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the idea of using streaming upload is not to simply change the way to upload a complete Vec<u8> buffer, but to replace the buffer with the streaming uploader. The ideal implementation will be,

  1. at the very beginning, we create a streaming uploader.
  2. when new data comes, we do some conversion and write the data to the streaming uploader
  3. when checkpoint barrier comes, we close the writer and commit the file to snowflake
  4. when the first next data comes, initialize a new streaming uploader

@xzhseh xzhseh requested a review from wenym1 April 15, 2024 17:26
@xzhseh
Copy link
Contributor Author

xzhseh commented Apr 15, 2024

according to #16269 (comment), I refactor the SnowflakeSinkWriter and now we don't need to batch intermediate in-memory data in payload, instead we'll write them to streaming_uploader, which will be initialized / updated per epoch, and whenever a ckpt barrier comes (and there indeed has some data in the current streaming_uploader) we will finalize the entire streaming upload, and notify snowflake pipe about this.

and as a result - we don't need configuration like snowflake.max_batch_row_num anymore, everything is handled internally and transparent to users.

cc @wenym1 @fuyufjh @xxhZs.

@xzhseh xzhseh changed the title feat(snowflake-sink): use streaming upload from opendal instead of aws_sdk_s3 bulk load feat(snowflake-sink): change to streaming upload instead of batched bulk load Apr 15, 2024
@xzhseh xzhseh force-pushed the xzhseh/snowflake-sink-streaming-upload branch from 6516e92 to 5e35d06 Compare April 15, 2024 18:50
Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM for the rest

src/connector/src/sink/snowflake.rs Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
@xzhseh xzhseh force-pushed the xzhseh/snowflake-sink-streaming-upload branch from ec26ee5 to ad09874 Compare April 16, 2024 17:42
@xzhseh xzhseh requested review from fuyufjh and wenym1 April 17, 2024 19:07
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@wenym1 wenym1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM! Thanks for the efforts to update the PR!

src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
src/connector/src/sink/snowflake.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@wenym1 wenym1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@xzhseh xzhseh added this pull request to the merge queue Apr 19, 2024
Merged via the queue into main with commit 66ac135 Apr 19, 2024
28 of 29 checks passed
@xzhseh xzhseh deleted the xzhseh/snowflake-sink-streaming-upload branch April 19, 2024 06:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants