From 08f8ec633fa459f20c73a5108eb964e8c8374ee4 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 11 Apr 2024 17:41:57 -0400 Subject: [PATCH 01/30] use streaming upload instead of bulk load of aws_sdk_s3 --- Cargo.lock | 1 + src/connector/Cargo.toml | 1 + src/connector/src/sink/snowflake_connector.rs | 42 +++++++++------ .../opendal_engine/opendal_object_store.rs | 2 + .../src/object/opendal_engine/opendal_s3.rs | 53 +++++++++++++++++++ 5 files changed, 84 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58a157173e81e..dcb6f63097273 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9618,6 +9618,7 @@ dependencies = [ "risingwave_common", "risingwave_common_estimate_size", "risingwave_jni_core", + "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "rumqttc", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 51437f82e3ec5..bb97fe608127c 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -110,6 +110,7 @@ risingwave_common_estimate_size = { workspace = true } risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } +risingwave_object_store = { workspace = true } rumqttc = { version = "0.24.0", features = ["url"] } rust_decimal = "1" rustls-native-certs = "0.7" diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 30c74045441a2..6ea732b846e08 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -18,12 +18,13 @@ use std::time::{SystemTime, UNIX_EPOCH}; use aws_config; use aws_config::meta::region::RegionProviderChain; use aws_sdk_s3::config::Credentials; -use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::Client as S3Client; use aws_types::region::Region; use bytes::Bytes; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; use reqwest::{header, Client, RequestBuilder, StatusCode}; +use risingwave_common::config::ObjectStoreConfig; +use risingwave_object_store::object::*; use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; @@ -191,6 +192,7 @@ pub struct SnowflakeS3Client { s3_bucket: String, s3_path: Option, s3_client: S3Client, + opendal_s3_engine: OpendalObjectStore, } impl SnowflakeS3Client { @@ -202,15 +204,15 @@ impl SnowflakeS3Client { aws_region: String, ) -> Self { let credentials = Credentials::new( - aws_access_key_id, - aws_secret_access_key, + aws_access_key_id.clone(), + aws_secret_access_key.clone(), // we don't allow temporary credentials None, None, "rw_sink_to_s3_credentials", ); - let region = RegionProviderChain::first_try(Region::new(aws_region)).or_default_provider(); + let region = RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider(); let config = aws_config::from_env() .credentials_provider(credentials) @@ -221,25 +223,35 @@ impl SnowflakeS3Client { // create the brand new s3 client used to sink files to s3 let s3_client = S3Client::new(&config); + // just use default here + let config = ObjectStoreConfig::default(); + + // create the s3 engine for streaming upload to the intermediate s3 bucket + // note: this will lead to a complete panic if any credential / intermediate creation + // process has error, which may not be acceptable... + // but it's hard to gracefully handle the error without modifying downstream return type(s)... 
+ let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials( + &s3_bucket, + config, + &aws_access_key_id, + &aws_secret_access_key, + &aws_region, + ).unwrap(); + Self { s3_bucket, s3_path, s3_client, + opendal_s3_engine, } } pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> { - self.s3_client - .put_object() - .bucket(self.s3_bucket.clone()) - .key(generate_s3_file_name(self.s3_path.clone(), file_suffix)) - .body(ByteStream::from(data)) - .send() - .await - .map_err(|err| { - SinkError::Snowflake(format!("failed to sink data to S3, error: {}", err)) - })?; - + let path = generate_s3_file_name(self.s3_path.clone(), file_suffix); + let mut uploader = self.opendal_s3_engine.streaming_upload(&path).await. + map_err(|err| SinkError::Snowflake(format!("failed to create the streaming uploader of opendal s3 engine, error: {}", err)))?; + uploader.write_bytes(data).await.map_err(|err| SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err)))?; + uploader.finish().await.map_err(|err| SinkError::Snowflake(format!("failed to finish streaming upload to s3 for snowflake sink, error: {}", err)))?; Ok(()) } } diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index d50422f015c7a..5060808fd04be 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -33,6 +33,7 @@ pub struct OpendalObjectStore { pub(crate) op: Operator, pub(crate) engine_type: EngineType, } + #[derive(Clone)] pub enum EngineType { Memory, @@ -216,6 +217,7 @@ impl ObjectStore for OpendalObjectStore { pub struct OpendalStreamingUploader { writer: Writer, } + impl OpendalStreamingUploader { pub async fn new(op: Operator, path: String) -> ObjectResult { let writer = op diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index db2c7732d8fbf..06d1d092804c1 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -59,6 +59,7 @@ impl OpendalObjectStore { .with_jitter(), ) .finish(); + Ok(Self { op, engine_type: EngineType::S3, @@ -78,4 +79,56 @@ impl OpendalObjectStore { Ok(HttpClient::build(client_builder)?) } + + /// currently used by snowflake sink, + /// especially when sinking to the intermediate s3 bucket. + pub fn new_s3_engine_with_credentials( + bucket: &str, + object_store_config: ObjectStoreConfig, + aws_access_key_id: &str, + aws_secret_access_key: &str, + aws_region: &str, + ) -> ObjectResult { + // Create s3 builder with credentials. + let mut builder = S3::default(); + + // set credentials for s3 sink + builder.bucket(bucket); + builder.access_key_id(aws_access_key_id); + builder.secret_access_key(aws_secret_access_key); + builder.region(aws_region); + + // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field. + if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { + builder.endpoint(&endpoint_url); + } + + if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() { + builder.enable_virtual_host_style(); + } + + let http_client = Self::new_http_client(&object_store_config)?; + builder.http_client(http_client); + + let op: Operator = Operator::new(builder)? 
+ .layer(LoggingLayer::default()) + .layer( + RetryLayer::new() + .with_min_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_interval_ms, + )) + .with_max_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_max_delay_ms, + )) + .with_max_times(object_store_config.s3.object_store_req_retry_max_attempts) + .with_factor(1.1) + .with_jitter(), + ) + .finish(); + + Ok(Self { + op, + engine_type: EngineType::S3, + }) + } } From 7091cd3ec6f9f5127f071d01c3c7edd0738c6edf Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 11 Apr 2024 17:44:51 -0400 Subject: [PATCH 02/30] update fmt --- src/connector/Cargo.toml | 2 +- src/connector/src/sink/snowflake_connector.rs | 25 +++++++++++++++---- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index bb97fe608127c..5f759ff1612cf 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -108,9 +108,9 @@ reqwest = { version = "0.12.2", features = ["json", "stream"] } risingwave_common = { workspace = true } risingwave_common_estimate_size = { workspace = true } risingwave_jni_core = { workspace = true } +risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } -risingwave_object_store = { workspace = true } rumqttc = { version = "0.24.0", features = ["url"] } rust_decimal = "1" rustls-native-certs = "0.7" diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 6ea732b846e08..3466633757c81 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -212,7 +212,8 @@ impl SnowflakeS3Client { "rw_sink_to_s3_credentials", ); - let region = RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider(); + let region = + RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider(); let config = aws_config::from_env() .credentials_provider(credentials) @@ -236,7 +237,8 @@ impl SnowflakeS3Client { &aws_access_key_id, &aws_secret_access_key, &aws_region, - ).unwrap(); + ) + .unwrap(); Self { s3_bucket, @@ -248,10 +250,23 @@ impl SnowflakeS3Client { pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> { let path = generate_s3_file_name(self.s3_path.clone(), file_suffix); - let mut uploader = self.opendal_s3_engine.streaming_upload(&path).await. 
- map_err(|err| SinkError::Snowflake(format!("failed to create the streaming uploader of opendal s3 engine, error: {}", err)))?; + let mut uploader = self + .opendal_s3_engine + .streaming_upload(&path) + .await + .map_err(|err| { + SinkError::Snowflake(format!( + "failed to create the streaming uploader of opendal s3 engine, error: {}", + err + )) + })?; uploader.write_bytes(data).await.map_err(|err| SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err)))?; - uploader.finish().await.map_err(|err| SinkError::Snowflake(format!("failed to finish streaming upload to s3 for snowflake sink, error: {}", err)))?; + uploader.finish().await.map_err(|err| { + SinkError::Snowflake(format!( + "failed to finish streaming upload to s3 for snowflake sink, error: {}", + err + )) + })?; Ok(()) } } From f9a354dbf3b16eef5f92ebd8beb630b9a1a7b360 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 11 Apr 2024 17:53:11 -0400 Subject: [PATCH 03/30] update comments --- src/connector/src/sink/snowflake_connector.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 3466633757c81..d7b80aba9a261 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -228,7 +228,7 @@ impl SnowflakeS3Client { let config = ObjectStoreConfig::default(); // create the s3 engine for streaming upload to the intermediate s3 bucket - // note: this will lead to a complete panic if any credential / intermediate creation + // note: this will lead to an internal panic if any credential / intermediate creation // process has error, which may not be acceptable... // but it's hard to gracefully handle the error without modifying downstream return type(s)... 
let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials( From a6e3392c2e92f128dfdd2608e525d58f47fdfbb2 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Fri, 12 Apr 2024 14:04:29 -0400 Subject: [PATCH 04/30] remove S3Client from aws_s3_sdk --- src/connector/src/sink/snowflake_connector.rs | 29 +------------------ 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index d7b80aba9a261..f45f655082520 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -16,10 +16,6 @@ use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; use aws_config; -use aws_config::meta::region::RegionProviderChain; -use aws_sdk_s3::config::Credentials; -use aws_sdk_s3::Client as S3Client; -use aws_types::region::Region; use bytes::Bytes; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; use reqwest::{header, Client, RequestBuilder, StatusCode}; @@ -191,7 +187,6 @@ impl SnowflakeHttpClient { pub struct SnowflakeS3Client { s3_bucket: String, s3_path: Option, - s3_client: S3Client, opendal_s3_engine: OpendalObjectStore, } @@ -203,28 +198,7 @@ impl SnowflakeS3Client { aws_secret_access_key: String, aws_region: String, ) -> Self { - let credentials = Credentials::new( - aws_access_key_id.clone(), - aws_secret_access_key.clone(), - // we don't allow temporary credentials - None, - None, - "rw_sink_to_s3_credentials", - ); - - let region = - RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider(); - - let config = aws_config::from_env() - .credentials_provider(credentials) - .region(region) - .load() - .await; - - // create the brand new s3 client used to sink files to s3 - let s3_client = S3Client::new(&config); - - // just use default here + // just use default configuration here for opendal s3 engine let config = ObjectStoreConfig::default(); // create the s3 engine for streaming upload to the intermediate s3 bucket @@ -243,7 +217,6 @@ impl SnowflakeS3Client { Self { s3_bucket, s3_path, - s3_client, opendal_s3_engine, } } From 463131f9fe4eb490e2b0c5ca64fb0a42d8b07ad5 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Fri, 12 Apr 2024 14:08:24 -0400 Subject: [PATCH 05/30] remove useless env var reading --- src/object_store/src/object/opendal_engine/opendal_s3.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 06d1d092804c1..a52c1e2fc9edc 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -98,15 +98,6 @@ impl OpendalObjectStore { builder.secret_access_key(aws_secret_access_key); builder.region(aws_region); - // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field. 
- if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { - builder.endpoint(&endpoint_url); - } - - if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() { - builder.enable_virtual_host_style(); - } - let http_client = Self::new_http_client(&object_store_config)?; builder.http_client(http_client); From a097dd891da4e6b5c80c7137a224164ba341e9b0 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Fri, 12 Apr 2024 16:03:43 -0400 Subject: [PATCH 06/30] remove async for SnowflakeS3Client::new --- src/connector/src/sink/snowflake.rs | 3 +-- src/connector/src/sink/snowflake_connector.rs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index f4901b025effc..c525aa501797f 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -211,8 +211,7 @@ impl SnowflakeSinkWriter { config.common.aws_access_key_id.clone(), config.common.aws_secret_access_key.clone(), config.common.aws_region.clone(), - ) - .await; + ); let max_batch_row_num = config .common diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index f45f655082520..658cf65b5b69a 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; -use aws_config; use bytes::Bytes; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; use reqwest::{header, Client, RequestBuilder, StatusCode}; @@ -191,7 +190,7 @@ pub struct SnowflakeS3Client { } impl SnowflakeS3Client { - pub async fn new( + pub fn new( s3_bucket: String, s3_path: Option, aws_access_key_id: String, From db709fd74e0bcb3d15c4a7bc0c0d93d27cee4b36 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Fri, 12 Apr 2024 20:33:45 -0400 Subject: [PATCH 07/30] fix check --- src/connector/src/sink/snowflake.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index c525aa501797f..1b0a670ff1371 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -138,7 +138,6 @@ impl Sink for SnowflakeSink { self.pk_indices.clone(), self.is_append_only, ) - .await .into_log_sinker(writer_param.sink_metrics)) } @@ -187,7 +186,7 @@ pub struct SnowflakeSinkWriter { } impl SnowflakeSinkWriter { - pub async fn new( + pub fn new( config: SnowflakeConfig, schema: Schema, pk_indices: Vec, From 6e4541160e4bf26cbd1e486b50794c0a5d7fd5e1 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Fri, 12 Apr 2024 20:35:05 -0400 Subject: [PATCH 08/30] update comment --- src/connector/src/sink/snowflake.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 1b0a670ff1371..4c6cf47e414d9 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -287,8 +287,8 @@ impl SnowflakeSinkWriter { return Ok(()); } let file_suffix = self.file_suffix(); - // todo: change this to streaming upload - // first sink to the external stage provided by user (i.e., s3) + // first sink to the external stage provided by user. (i.e., s3 bucket) + // note: this upload is streaming. 
self.s3_client .sink_to_s3(self.payload.clone().into(), file_suffix.clone()) .await?; From f0840703c3d141951766b1caffff62523103a696 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 15 Apr 2024 09:49:55 -0400 Subject: [PATCH 09/30] graceful error handling for opendal s3 engine --- src/connector/src/sink/snowflake.rs | 10 +++++----- src/connector/src/sink/snowflake_connector.rs | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 4c6cf47e414d9..6e3d1c70a57f7 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -137,7 +137,7 @@ impl Sink for SnowflakeSink { self.schema.clone(), self.pk_indices.clone(), self.is_append_only, - ) + )? .into_log_sinker(writer_param.sink_metrics)) } @@ -191,7 +191,7 @@ impl SnowflakeSinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, - ) -> Self { + ) -> Result { let http_client = SnowflakeHttpClient::new( config.common.account_identifier.clone(), config.common.user.clone(), @@ -210,7 +210,7 @@ impl SnowflakeSinkWriter { config.common.aws_access_key_id.clone(), config.common.aws_secret_access_key.clone(), config.common.aws_region.clone(), - ); + )?; let max_batch_row_num = config .common @@ -219,7 +219,7 @@ impl SnowflakeSinkWriter { .parse::() .expect("failed to parse `snowflake.max_batch_row_num` as a `u32`"); - Self { + Ok(Self { config, schema: schema.clone(), pk_indices, @@ -239,7 +239,7 @@ impl SnowflakeSinkWriter { max_batch_row_num, // initial value of `epoch` will start from 0 epoch: 0, - } + }) } /// reset the `payload` and `row_counter`. diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 658cf65b5b69a..c1a2c47ebcf8f 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -196,7 +196,7 @@ impl SnowflakeS3Client { aws_access_key_id: String, aws_secret_access_key: String, aws_region: String, - ) -> Self { + ) -> Result { // just use default configuration here for opendal s3 engine let config = ObjectStoreConfig::default(); @@ -211,13 +211,13 @@ impl SnowflakeS3Client { &aws_secret_access_key, &aws_region, ) - .unwrap(); + .map_err(|e| SinkError::Snowflake(format!("failed to create opendal s3 engine, error: {}", e.to_string())))?; - Self { + Ok(Self { s3_bucket, s3_path, opendal_s3_engine, - } + }) } pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> { From ca4a86b2e99ca8489aa5ecfa32034be474a86d4a Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 15 Apr 2024 09:50:31 -0400 Subject: [PATCH 10/30] update fmt --- src/connector/src/sink/snowflake_connector.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index c1a2c47ebcf8f..0c21f4ffff504 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -211,7 +211,12 @@ impl SnowflakeS3Client { &aws_secret_access_key, &aws_region, ) - .map_err(|e| SinkError::Snowflake(format!("failed to create opendal s3 engine, error: {}", e.to_string())))?; + .map_err(|e| { + SinkError::Snowflake(format!( + "failed to create opendal s3 engine, error: {}", + e.to_string() + )) + })?; Ok(Self { s3_bucket, From e9ab2277357ec4f303b9bd4c662efd2be4427fa2 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 15 Apr 2024 13:25:28 -0400 Subject: [PATCH 11/30] 
refactor SnowflakeSinkWriter; introduce streaming_uploader & remove payload --- src/connector/src/sink/snowflake.rs | 137 +++++++++++------- src/connector/src/sink/snowflake_connector.rs | 31 +--- 2 files changed, 92 insertions(+), 76 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 6e3d1c70a57f7..5436e978c79cd 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -17,9 +17,11 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; +use bytes::Bytes; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; +use risingwave_object_store::object::{ObjectStore, StreamingUploader}; use serde::Deserialize; use serde_json::Value; use serde_with::serde_as; @@ -29,7 +31,7 @@ use with_options::WithOptions; use super::encoder::{ JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, }; -use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client}; +use super::snowflake_connector::{generate_s3_file_name, SnowflakeHttpClient, SnowflakeS3Client}; use super::writer::LogSinkerOf; use super::{SinkError, SinkParam}; use crate::sink::writer::SinkWriterExt; @@ -94,11 +96,6 @@ pub struct SnowflakeCommon { /// The s3 region, e.g., us-east-2 #[serde(rename = "snowflake.aws_region")] pub aws_region: String, - - /// The configurable max row(s) to batch, - /// which should be *explicitly* specified by user(s) - #[serde(rename = "snowflake.max_batch_row_num")] - pub max_batch_row_num: String, } #[serde_as] @@ -176,13 +173,16 @@ pub struct SnowflakeSinkWriter { /// the client to insert file to external storage (i.e., s3) s3_client: SnowflakeS3Client, row_encoder: JsonEncoder, - row_counter: u32, - payload: String, - /// the threshold for sinking to s3 - max_batch_row_num: u32, /// The current epoch, used in naming the sink files /// mainly used for debugging purpose epoch: u64, + /// streaming uploader, i.e., opendal s3 engine + streaming_uploader: Option>, + /// the *unique* file suffix for intermediate s3 files + file_suffix: Option, + /// the flag that indicates whether we have at least call `streaming_upload` + /// once during this epoch, this is used to prevent uploading empty data. + has_data: bool, } impl SnowflakeSinkWriter { @@ -212,13 +212,6 @@ impl SnowflakeSinkWriter { config.common.aws_region.clone(), )?; - let max_batch_row_num = config - .common - .max_batch_row_num - .clone() - .parse::() - .expect("failed to parse `snowflake.max_batch_row_num` as a `u32`"); - Ok(Self { config, schema: schema.clone(), @@ -234,31 +227,83 @@ impl SnowflakeSinkWriter { TimestamptzHandlingMode::UtcString, TimeHandlingMode::String, ), - row_counter: 0, - payload: String::new(), - max_batch_row_num, // initial value of `epoch` will start from 0 epoch: 0, + // will be initialized after the begin of each epoch + streaming_uploader: None, + file_suffix: None, + has_data: false, }) } - /// reset the `payload` and `row_counter`. - /// shall *only* be called after a successful sink to s3. - fn reset(&mut self) { - self.payload.clear(); - self.row_counter = 0; + /// update the streaming uploader as well as the file suffix. + /// note: should *only* be called when a new epoch begins. 
+ async fn update_streaming_uploader(&mut self) -> Result<()> { + self.file_suffix = Some(self.file_suffix()); + let path = generate_s3_file_name(self.s3_client.s3_path(), self.file_suffix.as_ref().unwrap()); + let uploader = self + .s3_client + .opendal_s3_engine + .streaming_upload(&path) + .await + .map_err(|err| { + SinkError::Snowflake(format!( + "failed to create the streaming uploader of opendal s3 engine for epoch {}, error: {}", + self.epoch, + err + )) + })?; + self.streaming_uploader = Some(uploader); + // we don't have data at the beginning of each epoch + self.has_data = false; + Ok(()) } - fn at_sink_threshold(&self) -> bool { - self.row_counter >= self.max_batch_row_num + /// write data to the current streaming uploader for this epoch. + async fn streaming_upload(&mut self, data: Bytes) -> Result<()> { + debug_assert!(self.streaming_uploader.is_some(), "expect streaming uploader to be properly initialized"); + self + .streaming_uploader + .as_mut() + .unwrap() + .write_bytes(data) + .await + .map_err(|err| { + SinkError::Snowflake(format!( + "failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", + err + )) + })?; + // well, at least there are some data to be uploaded + self.has_data = true; + Ok(()) } - fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + /// finalize streaming upload for this epoch. + /// ensure all the data has been properly uploaded to intermediate s3. + async fn finish_streaming_upload(&mut self) -> Result<()> { + let uploader = std::mem::take(&mut self.streaming_uploader); + let Some(uploader) = uploader else { + return Err(SinkError::Snowflake(format!( + "streaming uploader is not valid when trying to finish streaming upload for epoch {}", + self.epoch + ) + )); + }; + uploader.finish().await.map_err(|err| { + SinkError::Snowflake(format!( + "failed to finish streaming upload to s3 for snowflake sink, error: {}", + err + )) + })?; + Ok(()) + } + + async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { for (op, row) in chunk.rows() { assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`"); let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); - self.payload.push_str(&row_json_string); - self.row_counter += 1; + self.streaming_upload(row_json_string.into()).await?; } Ok(()) } @@ -282,20 +327,14 @@ impl SnowflakeSinkWriter { /// sink `payload` to s3, then trigger corresponding `insertFiles` post request /// to snowflake, to finish the overall sinking pipeline. - async fn sink_payload(&mut self) -> Result<()> { - if self.payload.is_empty() { + async fn commit(&mut self) -> Result<()> { + if !self.has_data { + // no data needs to be committed return Ok(()); } - let file_suffix = self.file_suffix(); - // first sink to the external stage provided by user. (i.e., s3 bucket) - // note: this upload is streaming. 
- self.s3_client - .sink_to_s3(self.payload.clone().into(), file_suffix.clone()) - .await?; - // then trigger `insertFiles` post request to snowflake - self.http_client.send_request(file_suffix).await?; - // reset `payload` & `row_counter` - self.reset(); + self.finish_streaming_upload().await?; + // trigger `insertFiles` post request to snowflake + self.http_client.send_request(self.file_suffix.as_ref().unwrap().as_str()).await?; Ok(()) } } @@ -304,6 +343,7 @@ impl SnowflakeSinkWriter { impl SinkWriter for SnowflakeSinkWriter { async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { self.update_epoch(epoch); + self.update_streaming_uploader().await?; Ok(()) } @@ -317,20 +357,15 @@ impl SinkWriter for SnowflakeSinkWriter { async fn barrier(&mut self, is_checkpoint: bool) -> Result { if is_checkpoint { - // sink all the row(s) currently batched in `self.payload` - self.sink_payload().await?; + // finalize current streaming upload, plus notify snowflake to sink + // the corresponding data to snowflake pipe. + self.commit().await?; } Ok(()) } async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - self.append_only(chunk)?; - - // When the number of row exceeds `MAX_BATCH_ROW_NUM` - if self.at_sink_threshold() { - self.sink_payload().await?; - } - + self.append_only(chunk).await?; Ok(()) } } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 0c21f4ffff504..3373771b5c5d3 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; -use bytes::Bytes; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; use reqwest::{header, Client, RequestBuilder, StatusCode}; use risingwave_common::config::ObjectStoreConfig; @@ -31,7 +30,7 @@ const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK"; const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE"; /// The helper function to generate the *global unique* s3 file name. 
-fn generate_s3_file_name(s3_path: Option, suffix: String) -> String { +pub(crate) fn generate_s3_file_name(s3_path: Option<&str>, suffix: &str) -> String { match s3_path { Some(path) => format!("{}/{}_{}", path, S3_INTERMEDIATE_FILE_NAME, suffix), None => format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, suffix), @@ -151,7 +150,7 @@ impl SnowflakeHttpClient { /// NOTE: this function should ONLY be called *after* /// uploading files to remote external staged storage, i.e., AWS S3 - pub async fn send_request(&self, file_suffix: String) -> Result<()> { + pub async fn send_request(&self, file_suffix: &str) -> Result<()> { let builder = self.build_request_and_client(); // Generate the jwt_token @@ -163,7 +162,7 @@ impl SnowflakeHttpClient { "X-Snowflake-Authorization-Token-Type".to_string(), "KEYPAIR_JWT", ) - .body(generate_s3_file_name(self.s3_path.clone(), file_suffix)); + .body(generate_s3_file_name(self.s3_path.as_ref().map(|s| s.as_str()), file_suffix)); let response = builder .send() @@ -186,7 +185,7 @@ impl SnowflakeHttpClient { pub struct SnowflakeS3Client { s3_bucket: String, s3_path: Option, - opendal_s3_engine: OpendalObjectStore, + pub opendal_s3_engine: OpendalObjectStore, } impl SnowflakeS3Client { @@ -225,25 +224,7 @@ impl SnowflakeS3Client { }) } - pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> { - let path = generate_s3_file_name(self.s3_path.clone(), file_suffix); - let mut uploader = self - .opendal_s3_engine - .streaming_upload(&path) - .await - .map_err(|err| { - SinkError::Snowflake(format!( - "failed to create the streaming uploader of opendal s3 engine, error: {}", - err - )) - })?; - uploader.write_bytes(data).await.map_err(|err| SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err)))?; - uploader.finish().await.map_err(|err| { - SinkError::Snowflake(format!( - "failed to finish streaming upload to s3 for snowflake sink, error: {}", - err - )) - })?; - Ok(()) + pub fn s3_path(&self) -> Option<&str> { + self.s3_path.as_ref().map(|s| s.as_str()) } } From c63e9300a59ce7f43bd6d82c79fe741e9b068b53 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 15 Apr 2024 13:26:02 -0400 Subject: [PATCH 12/30] update fmt --- src/connector/src/sink/snowflake.rs | 12 +++++++++--- src/connector/src/sink/snowflake_connector.rs | 5 ++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 5436e978c79cd..ba4cd2d015ac9 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -240,7 +240,8 @@ impl SnowflakeSinkWriter { /// note: should *only* be called when a new epoch begins. async fn update_streaming_uploader(&mut self) -> Result<()> { self.file_suffix = Some(self.file_suffix()); - let path = generate_s3_file_name(self.s3_client.s3_path(), self.file_suffix.as_ref().unwrap()); + let path = + generate_s3_file_name(self.s3_client.s3_path(), self.file_suffix.as_ref().unwrap()); let uploader = self .s3_client .opendal_s3_engine @@ -261,7 +262,10 @@ impl SnowflakeSinkWriter { /// write data to the current streaming uploader for this epoch. 
async fn streaming_upload(&mut self, data: Bytes) -> Result<()> { - debug_assert!(self.streaming_uploader.is_some(), "expect streaming uploader to be properly initialized"); + debug_assert!( + self.streaming_uploader.is_some(), + "expect streaming uploader to be properly initialized" + ); self .streaming_uploader .as_mut() @@ -334,7 +338,9 @@ impl SnowflakeSinkWriter { } self.finish_streaming_upload().await?; // trigger `insertFiles` post request to snowflake - self.http_client.send_request(self.file_suffix.as_ref().unwrap().as_str()).await?; + self.http_client + .send_request(self.file_suffix.as_ref().unwrap().as_str()) + .await?; Ok(()) } } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 3373771b5c5d3..b22832ffe9b02 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -162,7 +162,10 @@ impl SnowflakeHttpClient { "X-Snowflake-Authorization-Token-Type".to_string(), "KEYPAIR_JWT", ) - .body(generate_s3_file_name(self.s3_path.as_ref().map(|s| s.as_str()), file_suffix)); + .body(generate_s3_file_name( + self.s3_path.as_ref().map(|s| s.as_str()), + file_suffix, + )); let response = builder .send() From 5e35d06481818fc559243e28a555324b5a66b8f6 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 15 Apr 2024 14:49:29 -0400 Subject: [PATCH 13/30] remove redundant comment regarding error handling; update comment for barrier commit --- src/connector/src/sink/snowflake.rs | 1 + src/connector/src/sink/snowflake_connector.rs | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index ba4cd2d015ac9..b1968ae39e78f 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -365,6 +365,7 @@ impl SinkWriter for SnowflakeSinkWriter { if is_checkpoint { // finalize current streaming upload, plus notify snowflake to sink // the corresponding data to snowflake pipe. + // note: if no data needs to be committed, then `commit` is simply a no-op. self.commit().await?; } Ok(()) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index b22832ffe9b02..8ff70aeef18bc 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -203,9 +203,6 @@ impl SnowflakeS3Client { let config = ObjectStoreConfig::default(); // create the s3 engine for streaming upload to the intermediate s3 bucket - // note: this will lead to an internal panic if any credential / intermediate creation - // process has error, which may not be acceptable... - // but it's hard to gracefully handle the error without modifying downstream return type(s)... 
let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials( &s3_bucket, config, From f55a3119640271ae106bb7cfe4a7a6a3cc4f4fd5 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 15 Apr 2024 14:59:20 -0400 Subject: [PATCH 14/30] remove assertion & gracefully handle the case if streaming uploader has not been properly initialized --- src/connector/src/sink/snowflake.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index b1968ae39e78f..5c2e68d6f264b 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -262,14 +262,14 @@ impl SnowflakeSinkWriter { /// write data to the current streaming uploader for this epoch. async fn streaming_upload(&mut self, data: Bytes) -> Result<()> { - debug_assert!( - self.streaming_uploader.is_some(), - "expect streaming uploader to be properly initialized" - ); - self - .streaming_uploader - .as_mut() - .unwrap() + let Some(uploader) = self.streaming_uploader.as_mut() else { + return Err(SinkError::Snowflake(format!( + "expect streaming uploader to be properly initialized when performing streaming upload for epoch {}", + self.epoch + )) + ); + }; + uploader .write_bytes(data) .await .map_err(|err| { From a04e29ea55b53179e2d6a185b22339061b7078a9 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 16 Apr 2024 12:49:11 -0400 Subject: [PATCH 15/30] update comment for intermediate s3 file path --- src/connector/src/sink/snowflake.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 5c2e68d6f264b..6e307de646e6b 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -79,9 +79,9 @@ pub struct SnowflakeCommon { pub s3_bucket: String, /// The optional s3 path to be specified - /// the actual file location would be `:///` + /// the actual file location would be `s3:////` /// if this field is specified by user(s) - /// otherwise it would be `://` + /// otherwise it would be `s3:///` #[serde(rename = "snowflake.s3_path")] pub s3_path: Option, From a2657da1e81066511afa2be17b7b75dfa22134b8 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 16 Apr 2024 13:25:57 -0400 Subject: [PATCH 16/30] lazily initialize when actual data is fed in --- src/connector/src/sink/snowflake.rs | 52 +++++++++++++++++------------ 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 6e307de646e6b..4fe28dc9c7ad4 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -176,13 +176,13 @@ pub struct SnowflakeSinkWriter { /// The current epoch, used in naming the sink files /// mainly used for debugging purpose epoch: u64, - /// streaming uploader, i.e., opendal s3 engine + /// streaming uploader to upload data to the intermediate (s3) storage. + /// i.e., opendal s3 engine. + /// note: the option here *implicitly* indicates whether we have at + /// least call `streaming_upload` once during this epoch, + /// which is mainly used to prevent uploading empty data. streaming_uploader: Option>, - /// the *unique* file suffix for intermediate s3 files file_suffix: Option, - /// the flag that indicates whether we have at least call `streaming_upload` - /// once during this epoch, this is used to prevent uploading empty data. 
- has_data: bool, } impl SnowflakeSinkWriter { @@ -227,21 +227,23 @@ impl SnowflakeSinkWriter { TimestamptzHandlingMode::UtcString, TimeHandlingMode::String, ), - // initial value of `epoch` will start from 0 + // initial value of `epoch` will be set to 0 epoch: 0, - // will be initialized after the begin of each epoch + // will be (lazily) initialized after the begin of each epoch + // when some data is ready to be upload streaming_uploader: None, file_suffix: None, - has_data: false, }) } /// update the streaming uploader as well as the file suffix. - /// note: should *only* be called when a new epoch begins. - async fn update_streaming_uploader(&mut self) -> Result<()> { - self.file_suffix = Some(self.file_suffix()); + /// note: should *only* be called iff after a new epoch begins, + /// and `streaming_upload` being called the first time. + /// i.e., lazily initialization of the internal `streaming_uploader`. + async fn new_streaming_uploader(&mut self) -> Result<()> { + let file_suffix = self.file_suffix(); let path = - generate_s3_file_name(self.s3_client.s3_path(), self.file_suffix.as_ref().unwrap()); + generate_s3_file_name(self.s3_client.s3_path(), &file_suffix); let uploader = self .s3_client .opendal_s3_engine @@ -255,13 +257,27 @@ impl SnowflakeSinkWriter { )) })?; self.streaming_uploader = Some(uploader); - // we don't have data at the beginning of each epoch - self.has_data = false; + self.file_suffix = Some(file_suffix); Ok(()) } + /// should be *only* called after `commit` per epoch. + fn reset_streaming_uploader(&mut self) { + self.streaming_uploader = None; + self.file_suffix = None; + } + + /// the `Option` is the flag to determine if there is data to upload. + fn has_data(&self) -> bool { + self.streaming_uploader.is_some() + } + /// write data to the current streaming uploader for this epoch. async fn streaming_upload(&mut self, data: Bytes) -> Result<()> { + if !self.has_data() { + // lazily initialization + self.new_streaming_uploader().await?; + } let Some(uploader) = self.streaming_uploader.as_mut() else { return Err(SinkError::Snowflake(format!( "expect streaming uploader to be properly initialized when performing streaming upload for epoch {}", @@ -278,8 +294,6 @@ impl SnowflakeSinkWriter { err )) })?; - // well, at least there are some data to be uploaded - self.has_data = true; Ok(()) } @@ -332,15 +346,12 @@ impl SnowflakeSinkWriter { /// sink `payload` to s3, then trigger corresponding `insertFiles` post request /// to snowflake, to finish the overall sinking pipeline. 
async fn commit(&mut self) -> Result<()> { - if !self.has_data { - // no data needs to be committed - return Ok(()); - } self.finish_streaming_upload().await?; // trigger `insertFiles` post request to snowflake self.http_client .send_request(self.file_suffix.as_ref().unwrap().as_str()) .await?; + self.reset_streaming_uploader(); Ok(()) } } @@ -349,7 +360,6 @@ impl SnowflakeSinkWriter { impl SinkWriter for SnowflakeSinkWriter { async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { self.update_epoch(epoch); - self.update_streaming_uploader().await?; Ok(()) } From ad09874f00cd9213ff807e9e6339dccc6fd385db Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 16 Apr 2024 13:37:13 -0400 Subject: [PATCH 17/30] fix check --- src/connector/src/sink/snowflake_connector.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 8ff70aeef18bc..adf8b7634d24e 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -163,7 +163,7 @@ impl SnowflakeHttpClient { "KEYPAIR_JWT", ) .body(generate_s3_file_name( - self.s3_path.as_ref().map(|s| s.as_str()), + self.s3_path.as_deref(), file_suffix, )); @@ -210,10 +210,10 @@ impl SnowflakeS3Client { &aws_secret_access_key, &aws_region, ) - .map_err(|e| { + .map_err(|err| { SinkError::Snowflake(format!( "failed to create opendal s3 engine, error: {}", - e.to_string() + err )) })?; @@ -225,6 +225,6 @@ impl SnowflakeS3Client { } pub fn s3_path(&self) -> Option<&str> { - self.s3_path.as_ref().map(|s| s.as_str()) + self.s3_path.as_deref() } } From 8b3f25ede39e853174302c625b7162c11848758b Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 16 Apr 2024 13:46:17 -0400 Subject: [PATCH 18/30] update fmt --- src/connector/src/sink/snowflake.rs | 3 +-- src/connector/src/sink/snowflake_connector.rs | 5 +---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 4fe28dc9c7ad4..565c558d80872 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -242,8 +242,7 @@ impl SnowflakeSinkWriter { /// i.e., lazily initialization of the internal `streaming_uploader`. 
async fn new_streaming_uploader(&mut self) -> Result<()> { let file_suffix = self.file_suffix(); - let path = - generate_s3_file_name(self.s3_client.s3_path(), &file_suffix); + let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix); let uploader = self .s3_client .opendal_s3_engine diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index adf8b7634d24e..432d222a8b426 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -162,10 +162,7 @@ impl SnowflakeHttpClient { "X-Snowflake-Authorization-Token-Type".to_string(), "KEYPAIR_JWT", ) - .body(generate_s3_file_name( - self.s3_path.as_deref(), - file_suffix, - )); + .body(generate_s3_file_name(self.s3_path.as_deref(), file_suffix)); let response = builder .send() From 98f4eb717661452cedbee142d1c43c11c236a8a4 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 16 Apr 2024 14:37:31 -0400 Subject: [PATCH 19/30] update with_options_sink.yaml accordingly --- src/connector/with_options_sink.yaml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 07da6a36a0e3a..e982999a93759 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -564,7 +564,7 @@ SnowflakeConfig: required: true - name: snowflake.s3_path field_type: String - comments: The optional s3 path to be specified the actual file location would be `:///` if this field is specified by user(s) otherwise it would be `://` + comments: The optional s3 path to be specified the actual file location would be `s3://:///` if this field is specified by user(s) otherwise it would be `s3://://` required: false - name: snowflake.aws_access_key_id field_type: String @@ -578,10 +578,6 @@ SnowflakeConfig: field_type: String comments: The s3 region, e.g., us-east-2 required: true - - name: snowflake.max_batch_row_num - field_type: String - comments: The configurable max row(s) to batch, which should be *explicitly* specified by user(s) - required: true StarrocksConfig: fields: - name: starrocks.host From 12391353ead51d953ce56e1f61fb0d720cf2f0cc Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 17 Apr 2024 12:44:42 -0400 Subject: [PATCH 20/30] update with_options_sink.yaml; incorporate file_suffix in streaming_uploader --- src/connector/src/sink/snowflake.rs | 22 ++++++++++------------ src/connector/with_options_sink.yaml | 2 +- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 565c558d80872..ec9b79adc22fd 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -177,12 +177,12 @@ pub struct SnowflakeSinkWriter { /// mainly used for debugging purpose epoch: u64, /// streaming uploader to upload data to the intermediate (s3) storage. - /// i.e., opendal s3 engine. + /// this also contains the file suffix *unique* to the *local* sink writer per epoch. + /// i.e., opendal s3 engine and the file suffix for intermediate s3 file. /// note: the option here *implicitly* indicates whether we have at /// least call `streaming_upload` once during this epoch, /// which is mainly used to prevent uploading empty data. 
- streaming_uploader: Option>, - file_suffix: Option, + streaming_uploader: Option<(Box, String)>, } impl SnowflakeSinkWriter { @@ -232,7 +232,6 @@ impl SnowflakeSinkWriter { // will be (lazily) initialized after the begin of each epoch // when some data is ready to be upload streaming_uploader: None, - file_suffix: None, }) } @@ -255,15 +254,13 @@ impl SnowflakeSinkWriter { err )) })?; - self.streaming_uploader = Some(uploader); - self.file_suffix = Some(file_suffix); + self.streaming_uploader = Some((uploader, file_suffix)); Ok(()) } /// should be *only* called after `commit` per epoch. fn reset_streaming_uploader(&mut self) { self.streaming_uploader = None; - self.file_suffix = None; } /// the `Option` is the flag to determine if there is data to upload. @@ -285,6 +282,7 @@ impl SnowflakeSinkWriter { ); }; uploader + .0 .write_bytes(data) .await .map_err(|err| { @@ -298,7 +296,7 @@ impl SnowflakeSinkWriter { /// finalize streaming upload for this epoch. /// ensure all the data has been properly uploaded to intermediate s3. - async fn finish_streaming_upload(&mut self) -> Result<()> { + async fn finish_streaming_upload(&mut self) -> Result { let uploader = std::mem::take(&mut self.streaming_uploader); let Some(uploader) = uploader else { return Err(SinkError::Snowflake(format!( @@ -307,13 +305,13 @@ impl SnowflakeSinkWriter { ) )); }; - uploader.finish().await.map_err(|err| { + uploader.0.finish().await.map_err(|err| { SinkError::Snowflake(format!( "failed to finish streaming upload to s3 for snowflake sink, error: {}", err )) })?; - Ok(()) + Ok(uploader.1) } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { @@ -345,10 +343,10 @@ impl SnowflakeSinkWriter { /// sink `payload` to s3, then trigger corresponding `insertFiles` post request /// to snowflake, to finish the overall sinking pipeline. 
async fn commit(&mut self) -> Result<()> { - self.finish_streaming_upload().await?; + let file_suffix = self.finish_streaming_upload().await?; // trigger `insertFiles` post request to snowflake self.http_client - .send_request(self.file_suffix.as_ref().unwrap().as_str()) + .send_request(&file_suffix) .await?; self.reset_streaming_uploader(); Ok(()) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index e982999a93759..27daa718b64f9 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -564,7 +564,7 @@ SnowflakeConfig: required: true - name: snowflake.s3_path field_type: String - comments: The optional s3 path to be specified the actual file location would be `s3://:///` if this field is specified by user(s) otherwise it would be `s3://://` + comments: The optional s3 path to be specified the actual file location would be `s3:////` if this field is specified by user(s) otherwise it would be `s3:///` required: false - name: snowflake.aws_access_key_id field_type: String From fc4182a773cc045c3e38fa45d59a375587b0627e Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 17 Apr 2024 14:34:31 -0400 Subject: [PATCH 21/30] use BytesMut to perform chunk-by-chunk upload --- src/connector/src/sink/snowflake.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index ec9b79adc22fd..9a70d0020a30b 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; use bytes::Bytes; +use bytes::BytesMut; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; @@ -38,6 +39,7 @@ use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const SNOWFLAKE_SINK: &str = "snowflake"; +const INITIAL_CHUNK_CAPACITY: usize = 1024; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct SnowflakeCommon { @@ -315,11 +317,13 @@ impl SnowflakeSinkWriter { } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + let mut chunk_buf = BytesMut::with_capacity(INITIAL_CHUNK_CAPACITY); for (op, row) in chunk.rows() { assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`"); - let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); - self.streaming_upload(row_json_string.into()).await?; + chunk_buf.extend_from_slice(Value::Object(self.row_encoder.encode(row)?).to_string().as_bytes()); } + // streaming upload in a chunk-by-chunk manner + self.streaming_upload(chunk_buf.freeze()).await?; Ok(()) } From da56015b54cdebbb67c710a0fdfe21c8da76987e Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 17 Apr 2024 14:34:59 -0400 Subject: [PATCH 22/30] update fmt --- src/connector/src/sink/snowflake.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 9a70d0020a30b..6faa98f553e54 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -17,8 +17,7 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; -use bytes::Bytes; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use 
risingwave_common::catalog::Schema; @@ -320,7 +319,11 @@ impl SnowflakeSinkWriter { let mut chunk_buf = BytesMut::with_capacity(INITIAL_CHUNK_CAPACITY); for (op, row) in chunk.rows() { assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`"); - chunk_buf.extend_from_slice(Value::Object(self.row_encoder.encode(row)?).to_string().as_bytes()); + chunk_buf.extend_from_slice( + Value::Object(self.row_encoder.encode(row)?) + .to_string() + .as_bytes(), + ); } // streaming upload in a chunk-by-chunk manner self.streaming_upload(chunk_buf.freeze()).await?; @@ -349,9 +352,7 @@ impl SnowflakeSinkWriter { async fn commit(&mut self) -> Result<()> { let file_suffix = self.finish_streaming_upload().await?; // trigger `insertFiles` post request to snowflake - self.http_client - .send_request(&file_suffix) - .await?; + self.http_client.send_request(&file_suffix).await?; self.reset_streaming_uploader(); Ok(()) } From 44e2ed7feb2d79443b98f7c1ea25268f4b5f4154 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 18 Apr 2024 12:04:47 -0400 Subject: [PATCH 23/30] use row_buf as intermediate buffer to prevent temporary string allocation --- src/connector/src/sink/snowflake.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 6faa98f553e54..0e581b043763c 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::sync::Arc; +use std::io::Write; use anyhow::anyhow; use async_trait::async_trait; @@ -38,7 +39,7 @@ use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const SNOWFLAKE_SINK: &str = "snowflake"; -const INITIAL_CHUNK_CAPACITY: usize = 1024; +const INITIAL_ROW_CAPACITY: usize = 1024; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct SnowflakeCommon { @@ -316,15 +317,23 @@ impl SnowflakeSinkWriter { } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { - let mut chunk_buf = BytesMut::with_capacity(INITIAL_CHUNK_CAPACITY); + let mut chunk_buf = BytesMut::new(); + let mut row_buf: Vec = Vec::with_capacity(INITIAL_ROW_CAPACITY); + + // write the json representations of the row(s) in current chunk to `chunk_buf` for (op, row) in chunk.rows() { assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`"); - chunk_buf.extend_from_slice( - Value::Object(self.row_encoder.encode(row)?) - .to_string() - .as_bytes(), - ); + // to prevent temporary string allocation, + // so we use an intermediate vector buffer instead. 
+ write!(row_buf, "{}", Value::Object(self.row_encoder.encode(row)?)) + .map_err(|err| SinkError::Snowflake(format!( + "failed to write json object to `row_buf`, error: {}", + err + )))?; + chunk_buf.extend_from_slice(&row_buf); + row_buf.clear(); } + // streaming upload in a chunk-by-chunk manner self.streaming_upload(chunk_buf.freeze()).await?; Ok(()) From 3ca9b493e01f386bab1f2f5073c3814673ef5f38 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 18 Apr 2024 12:11:11 -0400 Subject: [PATCH 24/30] refactor finish_streaming_upload --- src/connector/src/sink/snowflake.rs | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 0e581b043763c..46681fcee4ff4 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -260,11 +260,6 @@ impl SnowflakeSinkWriter { Ok(()) } - /// should be *only* called after `commit` per epoch. - fn reset_streaming_uploader(&mut self) { - self.streaming_uploader = None; - } - /// the `Option` is the flag to determine if there is data to upload. fn has_data(&self) -> bool { self.streaming_uploader.is_some() @@ -298,14 +293,11 @@ impl SnowflakeSinkWriter { /// finalize streaming upload for this epoch. /// ensure all the data has been properly uploaded to intermediate s3. - async fn finish_streaming_upload(&mut self) -> Result { + async fn finish_streaming_upload(&mut self) -> Result> { let uploader = std::mem::take(&mut self.streaming_uploader); let Some(uploader) = uploader else { - return Err(SinkError::Snowflake(format!( - "streaming uploader is not valid when trying to finish streaming upload for epoch {}", - self.epoch - ) - )); + // there is no data to be uploaded for this epoch + return Ok(None); }; uploader.0.finish().await.map_err(|err| { SinkError::Snowflake(format!( @@ -313,7 +305,7 @@ impl SnowflakeSinkWriter { err )) })?; - Ok(uploader.1) + Ok(Some(uploader.1)) } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { @@ -359,10 +351,15 @@ impl SnowflakeSinkWriter { /// sink `payload` to s3, then trigger corresponding `insertFiles` post request /// to snowflake, to finish the overall sinking pipeline. async fn commit(&mut self) -> Result<()> { - let file_suffix = self.finish_streaming_upload().await?; + // note that after `finish_streaming_upload`, do *not* interact with + // `streaming_uploader` until new data comes in at next epoch, + // since the ownership has been taken in this method, and `None` will be left. + let Some(file_suffix) = self.finish_streaming_upload().await? else { + // represents there is no data to be uploaded for this epoch + return Ok(()); + }; // trigger `insertFiles` post request to snowflake self.http_client.send_request(&file_suffix).await?; - self.reset_streaming_uploader(); Ok(()) } } From 6ae3261a5f816d19afc0acb7f4c33a257c32930f Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 18 Apr 2024 12:11:37 -0400 Subject: [PATCH 25/30] update fmt --- src/connector/src/sink/snowflake.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 46681fcee4ff4..aa16d8183f87b 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -13,8 +13,8 @@ // limitations under the License. 
 use std::collections::HashMap;
-use std::sync::Arc;
 use std::io::Write;
+use std::sync::Arc;
 use anyhow::anyhow;
 use async_trait::async_trait;
@@ -317,11 +317,12 @@ impl SnowflakeSinkWriter {
 assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
 // to prevent temporary string allocation,
 // so we use an intermediate vector buffer instead.
- write!(row_buf, "{}", Value::Object(self.row_encoder.encode(row)?))
- .map_err(|err| SinkError::Snowflake(format!(
+ write!(row_buf, "{}", Value::Object(self.row_encoder.encode(row)?)).map_err(|err| {
+ SinkError::Snowflake(format!(
 "failed to write json object to `row_buf`, error: {}",
 err
- )))?;
+ ))
+ })?;
 chunk_buf.extend_from_slice(&row_buf);
 row_buf.clear();
 }

From f9025c5a603fddbbb7a04267b751c115e8fcc847 Mon Sep 17 00:00:00 2001
From: Michael Xu
Date: Thu, 18 Apr 2024 15:34:02 -0400
Subject: [PATCH 26/30] directly write to chunk_buf

---
 src/connector/src/sink/snowflake.rs | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index aa16d8183f87b..7a9782f9cb8ef 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 use std::collections::HashMap;
-use std::io::Write;
+use std::fmt::Write;
 use std::sync::Arc;
 use anyhow::anyhow;
@@ -310,21 +310,18 @@ impl SnowflakeSinkWriter {
 async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
 let mut chunk_buf = BytesMut::new();
- let mut row_buf: Vec<u8> = Vec::with_capacity(INITIAL_ROW_CAPACITY);
 // write the json representations of the row(s) in current chunk to `chunk_buf`
 for (op, row) in chunk.rows() {
 assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
 // to prevent temporary string allocation,
- // so we use an intermediate vector buffer instead.
- write!(row_buf, "{}", Value::Object(self.row_encoder.encode(row)?)).map_err(|err| {
+ // so we directly write to `chunk_buf` implicitly via `write_fmt`.
+ write!(chunk_buf, "{}", Value::Object(self.row_encoder.encode(row)?)).map_err(|err| {
 SinkError::Snowflake(format!(
 "failed to write json object to `row_buf`, error: {}",
 err
 ))
 })?;
- chunk_buf.extend_from_slice(&row_buf);
- row_buf.clear();
 }

From 249874634b76f5d208d03f035a80f6833ab63e0f Mon Sep 17 00:00:00 2001
From: Michael Xu
Date: Thu, 18 Apr 2024 15:59:31 -0400
Subject: [PATCH 27/30] avoid unwrap; make new_streaming_uploader pure

---
 src/connector/src/sink/snowflake.rs | 37 ++++++++++++++-----------------------
 1 file changed, 14 insertions(+), 23 deletions(-)

diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index 7a9782f9cb8ef..14977d35a3422 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -237,11 +237,12 @@ impl SnowflakeSinkWriter {
 })
 }
- /// update the streaming uploader as well as the file suffix.
+ /// return a brand-new streaming uploader as well as the file suffix.
 /// note: should *only* be called iff after a new epoch begins,
 /// and `streaming_upload` being called the first time.
 /// i.e., lazily initialization of the internal `streaming_uploader`.
- async fn new_streaming_uploader(&mut self) -> Result<()> {
+ /// plus, this function is *pure*, the `&mut self` is to make rustc (and tokio) happy.
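For context on the `std::io::Write` to `std::fmt::Write` swap in patch 26: `Vec<u8>` is an `io::Write` sink, while `BytesMut` implements `fmt::Write`, which is what lets `write!` format the JSON value straight into the chunk buffer. A minimal sketch, with an illustrative value:

```rust
use std::fmt::Write;

use bytes::BytesMut;
use serde_json::json;

fn main() -> std::fmt::Result {
    let mut chunk_buf = BytesMut::new();
    // `BytesMut` implements `std::fmt::Write`, so the value's `Display`
    // output is appended to the buffer without an intermediate `String`.
    write!(chunk_buf, "{}", json!({"id": 1}))?;
    assert_eq!(&chunk_buf[..], &br#"{"id":1}"#[..]);
    Ok(())
}
```

This is also why the fallible part of the call is a `std::fmt::Error` rather than an `io::Error` in the `map_err` above.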
+ async fn new_streaming_uploader(&mut self) -> Result<(Box, String)> {
 let file_suffix = self.file_suffix();
 let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);
 let uploader = self
@@ -256,30 +257,20 @@ impl SnowflakeSinkWriter {
 err
 ))
 })?;
- self.streaming_uploader = Some((uploader, file_suffix));
- Ok(())
- }
-
- /// the `Option` is the flag to determine if there is data to upload.
- fn has_data(&self) -> bool {
- self.streaming_uploader.is_some()
+ Ok((uploader, file_suffix))
 }
 /// write data to the current streaming uploader for this epoch.
 async fn streaming_upload(&mut self, data: Bytes) -> Result<()> {
- if !self.has_data() {
- // lazily initialization
- self.new_streaming_uploader().await?;
- }
- let Some(uploader) = self.streaming_uploader.as_mut() else {
- return Err(SinkError::Snowflake(format!(
- "expect streaming uploader to be properly initialized when performing streaming upload for epoch {}",
- self.epoch
- ))
- );
+ let (uploader, _) = match self.streaming_uploader.as_mut() {
+ Some(s) => s,
+ None => {
+ assert_eq!(self.streaming_uploader.is_none(), true, "expect `streaming_uploader` to be None");
+ let uploader = self.new_streaming_uploader().await?;
+ self.streaming_uploader.insert(uploader)
+ }
 };
 uploader
- .0
 .write_bytes(data)
 .await
 .map_err(|err| {
@@ -295,17 +286,17 @@ impl SnowflakeSinkWriter {
 /// ensure all the data has been properly uploaded to intermediate s3.
 async fn finish_streaming_upload(&mut self) -> Result<Option<String>> {
 let uploader = std::mem::take(&mut self.streaming_uploader);
- let Some(uploader) = uploader else {
+ let Some((uploader, file_suffix)) = uploader else {
 // there is no data to be uploaded for this epoch
 return Ok(None);
 };
- uploader.0.finish().await.map_err(|err| {
+ uploader.finish().await.map_err(|err| {
 SinkError::Snowflake(format!(
 "failed to finish streaming upload to s3 for snowflake sink, error: {}",
 err
 ))
 })?;
- Ok(Some(uploader.1))
+ Ok(Some(file_suffix))
 }

From 1fc8996fa13e5f08647882e5b9c181dfca0062db Mon Sep 17 00:00:00 2001
From: Michael Xu
Date: Thu, 18 Apr 2024 15:59:57 -0400
Subject: [PATCH 28/30] fix check

---
 src/connector/src/sink/snowflake.rs | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index 14977d35a3422..ac75e3f583644 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -265,7 +265,11 @@ impl SnowflakeSinkWriter {
 let (uploader, _) = match self.streaming_uploader.as_mut() {
 Some(s) => s,
 None => {
- assert_eq!(self.streaming_uploader.is_none(), true, "expect `streaming_uploader` to be None");
+ assert_eq!(
+ self.streaming_uploader.is_none(),
+ true,
+ "expect `streaming_uploader` to be None"
+ );
 let uploader = self.new_streaming_uploader().await?;
 self.streaming_uploader.insert(uploader)
 }
@@ -311,7 +315,12 @@ impl SnowflakeSinkWriter {
 assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
 // to prevent temporary string allocation,
 // so we directly write to `chunk_buf` implicitly via `write_fmt`.
- write!(chunk_buf, "{}", Value::Object(self.row_encoder.encode(row)?)).map_err(|err| {
+ write!(
+ chunk_buf,
+ "{}",
+ Value::Object(self.row_encoder.encode(row)?)
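A small aside on the `None` arm above: `Option::insert` stores a value and returns a mutable reference to it, which is what lets the code hand back the freshly created uploader without an `unwrap`. A standalone example with an illustrative `Vec<u8>` payload:

```rust
fn main() {
    let mut slot: Option<Vec<u8>> = None;

    // `insert` writes the value into the `Option` and returns `&mut Vec<u8>`,
    // so the freshly created value can be used immediately without `unwrap`.
    let buf = slot.insert(Vec::with_capacity(16));
    buf.extend_from_slice(b"hello");

    assert_eq!(slot.as_deref(), Some(&b"hello"[..]));
}
```

`get_or_insert_with` would be the usual shorthand, but it does not fit here because creating the uploader is async and fallible.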
+ )
+ .map_err(|err| {
 SinkError::Snowflake(format!(
 "failed to write json object to `row_buf`, error: {}",
 err
 ))
 })?;
 }

From bd71443e8ea5710fb9570ebc28535bdffd73720f Mon Sep 17 00:00:00 2001
From: Michael Xu
Date: Thu, 18 Apr 2024 16:00:18 -0400
Subject: [PATCH 29/30] update comment

---
 src/connector/src/sink/snowflake.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index ac75e3f583644..d61273a8636a4 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -241,7 +241,7 @@ impl SnowflakeSinkWriter {
 /// note: should *only* be called iff after a new epoch begins,
 /// and `streaming_upload` being called the first time.
 /// i.e., lazily initialization of the internal `streaming_uploader`.
- /// plus, this function is *pure*, the `&mut self` is to make rustc (and tokio) happy.
+ /// plus, this function is *pure*, the `&mut self` here is to make rustc (and tokio) happy.
 async fn new_streaming_uploader(&mut self) -> Result<(Box, String)> {
 let file_suffix = self.file_suffix();
 let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);

From 342d1a4525b6c7fbb96b3019cf1d17f1de2f1f26 Mon Sep 17 00:00:00 2001
From: Michael Xu
Date: Thu, 18 Apr 2024 22:22:10 -0400
Subject: [PATCH 30/30] fix check

---
 src/connector/src/sink/snowflake.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index d61273a8636a4..e4dbbfa59f17b 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -265,9 +265,8 @@ impl SnowflakeSinkWriter {
 let (uploader, _) = match self.streaming_uploader.as_mut() {
 Some(s) => s,
 None => {
- assert_eq!(
+ assert!(
 self.streaming_uploader.is_none(),
- true,
 "expect `streaming_uploader` to be None"
 );
 let uploader = self.new_streaming_uploader().await?;
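The closing `assert_eq!(.., true)` to `assert!(..)` change is the form clippy's `bool_assert_comparison` lint prefers; attributing the "fix check" subject to that lint is an inference, but the rewrite itself can be shown on an illustrative local:

```rust
fn main() {
    let streaming_uploader: Option<String> = None;

    // clippy's `bool_assert_comparison` lint flags this form:
    // assert_eq!(streaming_uploader.is_none(), true, "expect `streaming_uploader` to be None");

    // and prefers the direct boolean assertion:
    assert!(
        streaming_uploader.is_none(),
        "expect `streaming_uploader` to be None"
    );
}
```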