feat(snowflake-sink): change to streaming upload instead of batched bulk load #16269
Conversation
Force-pushed from 71922d3 to 7091cd3
@@ -221,25 +224,49 @@ impl SnowflakeS3Client {
// create the brand new s3 client used to sink files to s3
let s3_client = S3Client::new(&config);

// just use default here
let config = ObjectStoreConfig::default();
Plus, do we need special parameter tuning for the configurations?
Not necessarily, but it would be better to allow users to pass parameters to the S3 client (OpenDAL) if possible. Just for illustration, any parameter starting with opendal. (e.g. opendal.param_name = param_val) would be passed through to OpenDAL.
but it would be better to allow users to pass parameters to the S3 client (OpenDAL) if possible.
Agreed, I could add support for extra configurations for the S3 client in a subsequent PR.
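For illustration, here is a minimal sketch of how such pass-through options might be collected and forwarded to OpenDAL. The helper name and option layout are hypothetical, not part of this PR:

```rust
use std::collections::HashMap;

/// Hypothetical helper: collect every sink option prefixed with `opendal.`
/// and keep the remainder of the key as the OpenDAL parameter name, so it
/// can be handed to the S3 backend verbatim.
fn extract_opendal_params(options: &HashMap<String, String>) -> HashMap<String, String> {
    options
        .iter()
        .filter_map(|(key, value)| {
            key.strip_prefix("opendal.")
                .map(|stripped| (stripped.to_string(), value.clone()))
        })
        .collect()
}
```

This mirrors the opendal.param_name = param_val convention suggested above: anything under the opendal. prefix is passed through untouched rather than being enumerated as individual sink options.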
})?;

uploader.write_bytes(data).await.map_err(|err| SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err)))?;
Actually, the idea of using streaming upload is not simply to change how a complete Vec<u8>
buffer is uploaded, but to replace the buffer with the streaming uploader. The ideal implementation would be (sketched below):
- at the very beginning, we create a streaming uploader
- when new data comes, we do some conversion and write the data to the streaming uploader
- when a checkpoint barrier comes, we close the writer and commit the file to Snowflake
- when the next piece of data comes after that, we initialize a new streaming uploader
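A minimal sketch of that lifecycle, assuming a hypothetical StreamingUploader stand-in (the real uploader type and its method names come from the object store crate, not from this sketch):

```rust
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Hypothetical stand-in for the object store's streaming uploader; the real
// type and method names live in the object store crate.
struct StreamingUploader;

impl StreamingUploader {
    async fn new() -> Result<Self> {
        Ok(Self)
    }
    async fn write_bytes(&mut self, _data: Vec<u8>) -> Result<()> {
        Ok(())
    }
    async fn finish(self) -> Result<()> {
        Ok(())
    }
}

struct SnowflakeSinkWriter {
    uploader: Option<StreamingUploader>,
}

impl SnowflakeSinkWriter {
    async fn write(&mut self, data: Vec<u8>) -> Result<()> {
        // lazily create the uploader when the first data after a checkpoint arrives
        if self.uploader.is_none() {
            self.uploader = Some(StreamingUploader::new().await?);
        }
        // stream the converted rows out instead of buffering a complete Vec<u8>
        self.uploader
            .as_mut()
            .expect("just initialized")
            .write_bytes(data)
            .await
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint {
            if let Some(uploader) = self.uploader.take() {
                // close the writer, then tell Snowflake to load the finished file
                uploader.finish().await?;
                self.commit_to_snowflake().await?;
            }
        }
        Ok(())
    }

    async fn commit_to_snowflake(&self) -> Result<()> {
        // placeholder for the call that triggers the Snowflake load of the file
        Ok(())
    }
}
```

The key point is that data is flushed out incrementally as it arrives, so the writer never has to hold a complete file in memory between checkpoints.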
According to #16269 (comment), I refactored this accordingly, and as a result we don't need configuration like …, since we now use opendal instead of aws_sdk_s3.
Force-pushed from 6516e92 to 5e35d06
LGTM for the rest
Force-pushed from ec26ee5 to ad09874
Rest LGTM! Thanks for the efforts to update the PR!
LGTM!
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
as titled.
related: #15429 (comment).
Checklist
- ./risedev check (or alias, ./risedev c)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.