feat(snowflake-sink): change to streaming upload instead of batched bulk load #16269

Merged · 31 commits · Apr 19, 2024
Commits (31)
08f8ec6
use streaming upload instead of bulk load of aws_sdk_s3
xzhseh Apr 11, 2024
7091cd3
update fmt
xzhseh Apr 11, 2024
f9a354d
update comments
xzhseh Apr 11, 2024
a6e3392
remove S3Client from aws_s3_sdk
xzhseh Apr 12, 2024
463131f
remove useless env var reading
xzhseh Apr 12, 2024
a097dd8
remove async for SnowflakeS3Client::new
xzhseh Apr 12, 2024
db709fd
fix check
xzhseh Apr 13, 2024
6e45411
update comment
xzhseh Apr 13, 2024
f084070
graceful error handling for opendal s3 engine
xzhseh Apr 15, 2024
ca4a86b
update fmt
xzhseh Apr 15, 2024
e9ab227
refactor SnowflakeSinkWriter; introduce streaming_uploader & remove p…
xzhseh Apr 15, 2024
c63e930
update fmt
xzhseh Apr 15, 2024
5e35d06
remove redundant comment regarding error handling; update comment for…
xzhseh Apr 15, 2024
f55a311
remove assertion & gracefully handle the case if streaming uploader h…
xzhseh Apr 15, 2024
a04e29e
update comment for intermediate s3 file path
xzhseh Apr 16, 2024
a2657da
lazily initialize when actual data is fed in
xzhseh Apr 16, 2024
ad09874
fix check
xzhseh Apr 16, 2024
8b3f25e
update fmt
xzhseh Apr 16, 2024
98f4eb7
update with_options_sink.yaml accordingly
xzhseh Apr 16, 2024
1239135
update with_options_sink.yaml; incorporate file_suffix in streaming_u…
xzhseh Apr 17, 2024
c8d1735
Merge branch 'main' into xzhseh/snowflake-sink-streaming-upload
xzhseh Apr 17, 2024
fc4182a
use BytesMut to perform chunk-by-chunk upload
xzhseh Apr 17, 2024
da56015
update fmt
xzhseh Apr 17, 2024
44e2ed7
use row_buf as intermediate buffer to prevent temporary string alloca…
xzhseh Apr 18, 2024
3ca9b49
refactor finish_streaming_upload
xzhseh Apr 18, 2024
6ae3261
update fmt
xzhseh Apr 18, 2024
f9025c5
directly write to chunk_buf
xzhseh Apr 18, 2024
2498746
avoid unwrap; make new_streaming_uploader pure
xzhseh Apr 18, 2024
1fc8996
fix check
xzhseh Apr 18, 2024
bd71443
update comment
xzhseh Apr 18, 2024
342d1a4
fix check
xzhseh Apr 19, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -108,6 +108,7 @@ reqwest = { version = "0.12.2", features = ["json", "stream"] }
risingwave_common = { workspace = true }
risingwave_common_estimate_size = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rumqttc = { version = "0.24.0", features = ["url"] }
183 changes: 119 additions & 64 deletions src/connector/src/sink/snowflake.rs
@@ -13,13 +13,16 @@
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_object_store::object::{ObjectStore, StreamingUploader};
use serde::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
@@ -29,13 +32,14 @@ use with_options::WithOptions;
use super::encoder::{
JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
};
use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client};
use super::snowflake_connector::{generate_s3_file_name, SnowflakeHttpClient, SnowflakeS3Client};
use super::writer::LogSinkerOf;
use super::{SinkError, SinkParam};
use crate::sink::writer::SinkWriterExt;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};

pub const SNOWFLAKE_SINK: &str = "snowflake";
const INITIAL_ROW_CAPACITY: usize = 1024;

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct SnowflakeCommon {
@@ -77,9 +81,9 @@ pub struct SnowflakeCommon {
pub s3_bucket: String,

/// The optional s3 path to be specified
/// the actual file location would be `<s3_bucket>://<s3_path>/<rw_auto_gen_file_name>`
/// the actual file location would be `s3://<s3_bucket>/<s3_path>/<rw_auto_gen_intermediate_file_name>`
/// if this field is specified by user(s)
/// otherwise it would be `<s3_bucket>://<rw_auto_gen_file_name>`
/// otherwise it would be `s3://<s3_bucket>/<rw_auto_gen_intermediate_file_name>`
#[serde(rename = "snowflake.s3_path")]
pub s3_path: Option<String>,

@@ -94,11 +98,6 @@ pub struct SnowflakeCommon {
/// The s3 region, e.g., us-east-2
#[serde(rename = "snowflake.aws_region")]
pub aws_region: String,

/// The configurable max row(s) to batch,
/// which should be *explicitly* specified by user(s)
#[serde(rename = "snowflake.max_batch_row_num")]
pub max_batch_row_num: String,
}

#[serde_as]
@@ -137,8 +136,7 @@ impl Sink for SnowflakeSink {
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await
)?
.into_log_sinker(writer_param.sink_metrics))
}

@@ -177,22 +175,25 @@ pub struct SnowflakeSinkWriter {
/// the client to insert file to external storage (i.e., s3)
s3_client: SnowflakeS3Client,
row_encoder: JsonEncoder,
row_counter: u32,
payload: String,
/// the threshold for sinking to s3
max_batch_row_num: u32,
/// The current epoch, used in naming the sink files
/// mainly used for debugging purpose
epoch: u64,
/// streaming uploader to upload data to the intermediate (s3) storage.
/// this also contains the file suffix *unique* to the *local* sink writer per epoch.
/// i.e., opendal s3 engine and the file suffix for intermediate s3 file.
/// note: the option here *implicitly* indicates whether we have called
/// `streaming_upload` at least once during this epoch,
/// which is mainly used to prevent uploading empty data.
streaming_uploader: Option<(Box<dyn StreamingUploader>, String)>,
}

impl SnowflakeSinkWriter {
pub async fn new(
pub fn new(
config: SnowflakeConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Self {
) -> Result<Self> {
let http_client = SnowflakeHttpClient::new(
config.common.account_identifier.clone(),
config.common.user.clone(),
@@ -211,17 +212,9 @@ impl SnowflakeSinkWriter {
config.common.aws_access_key_id.clone(),
config.common.aws_secret_access_key.clone(),
config.common.aws_region.clone(),
)
.await;

let max_batch_row_num = config
.common
.max_batch_row_num
.clone()
.parse::<u32>()
.expect("failed to parse `snowflake.max_batch_row_num` as a `u32`");
)?;

Self {
Ok(Self {
config,
schema: schema.clone(),
pk_indices,
@@ -236,32 +229,102 @@ impl SnowflakeSinkWriter {
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::String,
),
row_counter: 0,
payload: String::new(),
max_batch_row_num,
// initial value of `epoch` will start from 0
// initial value of `epoch` will be set to 0
epoch: 0,
}
// will be (lazily) initialized at the beginning of each epoch
// when some data is ready to be uploaded
streaming_uploader: None,
})
}

/// reset the `payload` and `row_counter`.
/// shall *only* be called after a successful sink to s3.
fn reset(&mut self) {
self.payload.clear();
self.row_counter = 0;
/// return a brand new streaming uploader as well as the file suffix.
/// note: should *only* be called after a new epoch begins,
/// and `streaming_upload` is called for the first time.
/// i.e., lazy initialization of the internal `streaming_uploader`.
/// plus, this function is *pure*; the `&mut self` here is only to make rustc (and tokio) happy.
async fn new_streaming_uploader(&mut self) -> Result<(Box<dyn StreamingUploader>, String)> {
let file_suffix = self.file_suffix();
let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);
let uploader = self
.s3_client
.opendal_s3_engine
.streaming_upload(&path)
.await
.map_err(|err| {
SinkError::Snowflake(format!(
"failed to create the streaming uploader of opendal s3 engine for epoch {}, error: {}",
self.epoch,
err
))
})?;
Ok((uploader, file_suffix))
}

/// write data to the current streaming uploader for this epoch.
async fn streaming_upload(&mut self, data: Bytes) -> Result<()> {
let (uploader, _) = match self.streaming_uploader.as_mut() {
Some(s) => s,
None => {
assert!(
self.streaming_uploader.is_none(),
"expect `streaming_uploader` to be None"
);
let uploader = self.new_streaming_uploader().await?;
self.streaming_uploader.insert(uploader)
}
};
uploader
.write_bytes(data)
.await
.map_err(|err| {
SinkError::Snowflake(format!(
"failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}",
err
))
})?;
Ok(())
}

fn at_sink_threshold(&self) -> bool {
self.row_counter >= self.max_batch_row_num
/// finalize streaming upload for this epoch.
/// ensure all the data has been properly uploaded to intermediate s3.
async fn finish_streaming_upload(&mut self) -> Result<Option<String>> {
let uploader = std::mem::take(&mut self.streaming_uploader);
let Some((uploader, file_suffix)) = uploader else {
// there is no data to be uploaded for this epoch
return Ok(None);
};
uploader.finish().await.map_err(|err| {
SinkError::Snowflake(format!(
"failed to finish streaming upload to s3 for snowflake sink, error: {}",
err
))
})?;
Ok(Some(file_suffix))
}

fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
let mut chunk_buf = BytesMut::new();

// write the json representations of the row(s) in current chunk to `chunk_buf`
for (op, row) in chunk.rows() {
assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
self.payload.push_str(&row_json_string);
self.row_counter += 1;
// to prevent temporary string allocation,
// we directly write to `chunk_buf` via `write_fmt`.
write!(
chunk_buf,
"{}",
Value::Object(self.row_encoder.encode(row)?)
)
.map_err(|err| {
SinkError::Snowflake(format!(
"failed to write json object to `row_buf`, error: {}",
err
))
})?;
}

// streaming upload in a chunk-by-chunk manner
self.streaming_upload(chunk_buf.freeze()).await?;
Ok(())
}

@@ -284,20 +347,16 @@ impl SnowflakeSinkWriter {

/// sink `payload` to s3, then trigger corresponding `insertFiles` post request
/// to snowflake, to finish the overall sinking pipeline.
async fn sink_payload(&mut self) -> Result<()> {
if self.payload.is_empty() {
async fn commit(&mut self) -> Result<()> {
// note that after `finish_streaming_upload`, do *not* interact with
// `streaming_uploader` until new data comes in at next epoch,
// since the ownership has been taken in this method, and `None` will be left.
let Some(file_suffix) = self.finish_streaming_upload().await? else {
// represents there is no data to be uploaded for this epoch
return Ok(());
}
let file_suffix = self.file_suffix();
// todo: change this to streaming upload
// first sink to the external stage provided by user (i.e., s3)
self.s3_client
.sink_to_s3(self.payload.clone().into(), file_suffix.clone())
.await?;
// then trigger `insertFiles` post request to snowflake
self.http_client.send_request(file_suffix).await?;
// reset `payload` & `row_counter`
self.reset();
};
// trigger `insertFiles` post request to snowflake
self.http_client.send_request(&file_suffix).await?;
Ok(())
}
}
@@ -319,20 +378,16 @@ impl SinkWriter for SnowflakeSinkWriter {

async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
if is_checkpoint {
// sink all the row(s) currently batched in `self.payload`
self.sink_payload().await?;
// finalize current streaming upload, plus notify snowflake to sink
// the corresponding data to snowflake pipe.
// note: if no data needs to be committed, then `commit` is simply a no-op.
self.commit().await?;
}
Ok(())
}

async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
self.append_only(chunk)?;

// When the number of row exceeds `MAX_BATCH_ROW_NUM`
if self.at_sink_threshold() {
self.sink_payload().await?;
}

self.append_only(chunk).await?;
Ok(())
}
}
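Stepping back from the line-by-line diff, the new writer's control flow is: lazily create one streaming uploader per epoch when the first chunk arrives, stream each chunk's JSON rows into it, and only finish the upload (followed by Snowflake's `insertFiles` request) at a checkpoint barrier, so an epoch without data uploads nothing. The sketch below illustrates that lifecycle only; the `Uploader` trait, `MockUploader`, `Writer`, and the `epoch-{n}` suffix are hypothetical stand-ins, and the real code is async and goes through `risingwave_object_store`'s `StreamingUploader` (`streaming_upload`, `write_bytes`, `finish`) over the opendal S3 engine.

use std::fmt::Write as _;

// Hypothetical stand-in for the object store's streaming uploader
// (the real trait in `risingwave_object_store` is async and returns errors).
trait Uploader {
    fn write_bytes(&mut self, data: &[u8]);
    fn finish(self: Box<Self>) -> usize; // bytes uploaded, for illustration only
}

// In-memory mock; the real writer streams to S3 via the opendal engine.
struct MockUploader {
    buf: Vec<u8>,
}

impl Uploader for MockUploader {
    fn write_bytes(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
    }

    fn finish(self: Box<Self>) -> usize {
        self.buf.len()
    }
}

// Simplified writer mirroring the per-epoch lifecycle in the diff above.
struct Writer {
    epoch: u64,
    // `None` until the first chunk of the epoch arrives (lazy initialization);
    // the `String` is the per-epoch file suffix.
    uploader: Option<(Box<dyn Uploader>, String)>,
}

impl Writer {
    fn begin_epoch(&mut self, epoch: u64) {
        self.epoch = epoch;
    }

    // Encode all rows of a chunk into one buffer, then stream it out.
    fn write_chunk(&mut self, rows: &[&str]) {
        let epoch = self.epoch;
        let (uploader, _suffix) = self.uploader.get_or_insert_with(|| {
            // created lazily so that an epoch without data uploads nothing
            let suffix = format!("epoch-{epoch}"); // hypothetical naming scheme
            (
                Box::new(MockUploader { buf: Vec::new() }) as Box<dyn Uploader>,
                suffix,
            )
        });
        let mut chunk_buf = String::new();
        for row in rows {
            // a single buffer per chunk avoids one temporary String per row
            write!(chunk_buf, "{row}").unwrap();
        }
        uploader.write_bytes(chunk_buf.as_bytes());
    }

    // Called on a checkpoint barrier: finalize the upload, then (in the real
    // sink) fire the `insertFiles` request to Snowflake. No-op without data.
    fn commit(&mut self) -> Option<usize> {
        let (uploader, _suffix) = self.uploader.take()?;
        Some(uploader.finish())
    }
}

fn main() {
    let mut w = Writer { epoch: 0, uploader: None };
    w.begin_epoch(1);
    w.write_chunk(&[r#"{"v":1}"#, r#"{"v":2}"#]);
    assert_eq!(w.commit(), Some(14)); // two 7-byte JSON objects were streamed
    w.begin_epoch(2);
    assert_eq!(w.commit(), None); // empty epoch: nothing uploaded, nothing notified
}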