diff --git a/integration_tests/snowflake-sink/README.md b/integration_tests/snowflake-sink/README.md new file mode 100644 index 0000000000000..98bcf73bcb265 --- /dev/null +++ b/integration_tests/snowflake-sink/README.md @@ -0,0 +1,45 @@ +# Example Use Case: Sinking to Snowflake + +this tutorial (and the corresponding examples) aims at showcasing how to sink data to Snowflake in RisingWave. + +## 1. Preparation + +due to the SaaS nature of snowflake, sinking data to snowflake via risingWave typically has some prerequisites. + +for detailed data-pipelining and sinking logic, please refer to the official documentation(s), e.g., [data load with snowpipe overview](https://docs.snowflake.com/user-guide/data-load-snowpipe-rest-overview). + +### 1.1 S3 setup + +users will need to setup an **external** S3 bucket first, and please make sure you have the corresponding credentials, which will be required **both** in your snowflake stage and risingwave sink creation time. + +note: the required credentials including the following, i.e., +- `snowflake.s3_bucket` (a.k.a. the `URL` in snowflake stage) +- `snowflake.aws_access_key_id` (a.k.a., the `AWS_KEY_ID` in snowflake stage) +- `snowflake.aws_secret_access_key` (a.k.a. the `AWS_SECRET_KEY` in snowflake stage) + +### 1.2 Snowflake setup + +users will then need to setup the snowflake, which includes, i.e., +- generate the key-value pair for later authentication +- create a `role` and grant the appropriate permission +- setup the credential for the user (e.g., `RSA_PUBLIC_KEY`), and retrieve the `snowflake.rsa_public_key_fp` which will later be used in risingwave +- create a `table` to store the sink data from risingwave +- create a `stage` to refer the external s3 bucket, which will be used internally by snowflake to load the corresponding data +- create a `pipe` to actual receive loaded data from the pre-defined stage and copy the data to the snowflake table. + +ps. +1. this assumes the users have already created their accounts and the corresponding databases in snowflake. +2. for detailed authentication process, refer to [official authentication guide](https://docs.snowflake.com/en/developer-guide/sql-api/authenticating). +3. for detailed commands, refer to [official reference](https://docs.snowflake.com/en/reference) + +an example for snowflake setup commands could be checked at `snowflake_prep.sql`, this also corresponds to the following example sinking use case. + +## 2. Begin to sink data + +launch your risingwave cluster, and execute the following sql commands respectively. + +- `create_source.sql` +- `create_mv.sql` +- `create_sink.sql` + +note: the column name(s) in your materialized view should be exactly the same as the ones in your pre-defined snowflake table, due to what we specified for snowflake pipe previously in `snowflake_prep.sql`. diff --git a/integration_tests/snowflake-sink/create_mv.sql b/integration_tests/snowflake-sink/create_mv.sql new file mode 100644 index 0000000000000..515f6076e31a9 --- /dev/null +++ b/integration_tests/snowflake-sink/create_mv.sql @@ -0,0 +1,10 @@ +-- please note that the column name(s) for your mv should be *exactly* +-- the same as the column name(s) in your snowflake table, since we are matching column by name. + +CREATE MATERIALIZED VIEW ss_mv AS +SELECT + user_id, + target_id, + event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp +FROM + user_behaviors; diff --git a/integration_tests/snowflake-sink/create_sink.sql b/integration_tests/snowflake-sink/create_sink.sql new file mode 100644 index 0000000000000..457def4f82e2d --- /dev/null +++ b/integration_tests/snowflake-sink/create_sink.sql @@ -0,0 +1,20 @@ +CREATE SINK snowflake_sink FROM ss_mv WITH ( + connector = 'snowflake', + type = 'append-only', + snowflake.database = 'EXAMPLE_DB', + snowflake.schema = 'EXAMPLE_SCHEMA', + snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE', + snowflake.account_identifier = '-', + snowflake.user = 'XZHSEH', + snowflake.rsa_public_key_fp = 'EXAMPLE_FP', + snowflake.private_key = 'EXAMPLE_PK', + snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET', + snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID', + snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY', + snowflake.aws_region = 'EXAMPLE_REGION', + snowflake.max_batch_row_num = '1030', + snowflake.s3_path = 'EXAMPLE_S3_PATH', + -- depends on your mv setup, note that snowflake sink only supports + -- append-only at present. + force_append_only = 'true' +); \ No newline at end of file diff --git a/integration_tests/snowflake-sink/create_source.sql b/integration_tests/snowflake-sink/create_source.sql new file mode 100644 index 0000000000000..ed7c02341638a --- /dev/null +++ b/integration_tests/snowflake-sink/create_source.sql @@ -0,0 +1,16 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMPTZ, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +) WITH ( + connector = 'datagen', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', + datagen.rows.per.second = '100' +) FORMAT PLAIN ENCODE JSON; diff --git a/integration_tests/snowflake-sink/snowflake_prep.sql b/integration_tests/snowflake-sink/snowflake_prep.sql new file mode 100644 index 0000000000000..96026d0884415 --- /dev/null +++ b/integration_tests/snowflake-sink/snowflake_prep.sql @@ -0,0 +1,29 @@ +USE EXAMPLE_DB; + +ALTER USER xzhseh SET RSA_PUBLIC_KEY='your local rsa public key'; + +-- set user permission to account admin level +GRANT ROLE ACCOUNTADMIN TO USER xzhseh; + +-- you could either retrieve the fp from desc user's info panel, +-- or from the following select stmt. +DESC USER xzhseh; +-- also fine, see the documentation for details. +SELECT TRIM( + (SELECT "value" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) + WHERE "property" = 'RSA_PUBLIC_KEY_FP'), + 'SHA256:' +); + +-- snowflake table, note to keep the same column name(s). +CREATE OR REPLACE TABLE example_snowflake_sink_table (user_id INT, target_id VARCHAR, event_timestamp TIMESTAMP_TZ); + +-- snowflake stage, we only supports json as sink format at present +CREATE OR REPLACE STAGE example_snowflake_stage URL = '' + credentials = ( AWS_KEY_ID = '' AWS_SECRET_KEY = '' ) file_format = ( type = JSON ); + +-- snowflake pipe +CREATE OR REPLACE PIPE example_snowflake_pipe AS COPY INTO example_snowflake_sink_table FROM @example_snowflake_stage MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE; + +-- select from table after sinking from rw +SELECT * FROM example_snowflake_sink_table WHERE event_timestamp IS NOT NULL; diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index ba0973a0b0145..f4901b025effc 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -245,7 +245,7 @@ impl SnowflakeSinkWriter { } /// reset the `payload` and `row_counter`. - /// shall *only* be called after a successful sink. + /// shall *only* be called after a successful sink to s3. fn reset(&mut self) { self.payload.clear(); self.row_counter = 0; @@ -288,13 +288,14 @@ impl SnowflakeSinkWriter { if self.payload.is_empty() { return Ok(()); } + let file_suffix = self.file_suffix(); // todo: change this to streaming upload // first sink to the external stage provided by user (i.e., s3) self.s3_client - .sink_to_s3(self.payload.clone().into(), self.file_suffix()) + .sink_to_s3(self.payload.clone().into(), file_suffix.clone()) .await?; // then trigger `insertFiles` post request to snowflake - self.http_client.send_request(self.file_suffix()).await?; + self.http_client.send_request(file_suffix).await?; // reset `payload` & `row_counter` self.reset(); Ok(())