-
Notifications
You must be signed in to change notification settings - Fork 596
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(snowflake-sink): add example use case & detailed spec; fix a sub…
…tle problem regarding `file_suffix` (#16241)
- Loading branch information
Showing
6 changed files
with
124 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# Example Use Case: Sinking to Snowflake | ||
|
||
this tutorial (and the corresponding examples) aims at showcasing how to sink data to Snowflake in RisingWave. | ||
|
||
## 1. Preparation | ||
|
||
due to the SaaS nature of snowflake, sinking data to snowflake via risingWave typically has some prerequisites. | ||
|
||
for detailed data-pipelining and sinking logic, please refer to the official documentation(s), e.g., [data load with snowpipe overview](https://docs.snowflake.com/user-guide/data-load-snowpipe-rest-overview). | ||
|
||
### 1.1 S3 setup | ||
|
||
users will need to setup an **external** S3 bucket first, and please make sure you have the corresponding credentials, which will be required **both** in your snowflake stage and risingwave sink creation time. | ||
|
||
note: the required credentials including the following, i.e., | ||
- `snowflake.s3_bucket` (a.k.a. the `URL` in snowflake stage) | ||
- `snowflake.aws_access_key_id` (a.k.a., the `AWS_KEY_ID` in snowflake stage) | ||
- `snowflake.aws_secret_access_key` (a.k.a. the `AWS_SECRET_KEY` in snowflake stage) | ||
|
||
### 1.2 Snowflake setup | ||
|
||
users will then need to setup the snowflake, which includes, i.e., | ||
- generate the key-value pair for later authentication | ||
- create a `role` and grant the appropriate permission | ||
- setup the credential for the user (e.g., `RSA_PUBLIC_KEY`), and retrieve the `snowflake.rsa_public_key_fp` which will later be used in risingwave | ||
- create a `table` to store the sink data from risingwave | ||
- create a `stage` to refer the external s3 bucket, which will be used internally by snowflake to load the corresponding data | ||
- create a `pipe` to actual receive loaded data from the pre-defined stage and copy the data to the snowflake table. | ||
|
||
ps. | ||
1. this assumes the users have already created their accounts and the corresponding databases in snowflake. | ||
2. for detailed authentication process, refer to [official authentication guide](https://docs.snowflake.com/en/developer-guide/sql-api/authenticating). | ||
3. for detailed commands, refer to [official reference](https://docs.snowflake.com/en/reference) | ||
|
||
an example for snowflake setup commands could be checked at `snowflake_prep.sql`, this also corresponds to the following example sinking use case. | ||
|
||
## 2. Begin to sink data | ||
|
||
launch your risingwave cluster, and execute the following sql commands respectively. | ||
|
||
- `create_source.sql` | ||
- `create_mv.sql` | ||
- `create_sink.sql` | ||
|
||
note: the column name(s) in your materialized view should be exactly the same as the ones in your pre-defined snowflake table, due to what we specified for snowflake pipe previously in `snowflake_prep.sql`. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
-- please note that the column name(s) for your mv should be *exactly* | ||
-- the same as the column name(s) in your snowflake table, since we are matching column by name. | ||
|
||
CREATE MATERIALIZED VIEW ss_mv AS | ||
SELECT | ||
user_id, | ||
target_id, | ||
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp | ||
FROM | ||
user_behaviors; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
CREATE SINK snowflake_sink FROM ss_mv WITH ( | ||
connector = 'snowflake', | ||
type = 'append-only', | ||
snowflake.database = 'EXAMPLE_DB', | ||
snowflake.schema = 'EXAMPLE_SCHEMA', | ||
snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE', | ||
snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>', | ||
snowflake.user = 'XZHSEH', | ||
snowflake.rsa_public_key_fp = 'EXAMPLE_FP', | ||
snowflake.private_key = 'EXAMPLE_PK', | ||
snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET', | ||
snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID', | ||
snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY', | ||
snowflake.aws_region = 'EXAMPLE_REGION', | ||
snowflake.max_batch_row_num = '1030', | ||
snowflake.s3_path = 'EXAMPLE_S3_PATH', | ||
-- depends on your mv setup, note that snowflake sink only supports | ||
-- append-only at present. | ||
force_append_only = 'true' | ||
); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
CREATE table user_behaviors ( | ||
user_id int, | ||
target_id VARCHAR, | ||
target_type VARCHAR, | ||
event_timestamp TIMESTAMPTZ, | ||
behavior_type VARCHAR, | ||
parent_target_type VARCHAR, | ||
parent_target_id VARCHAR, | ||
PRIMARY KEY(user_id) | ||
) WITH ( | ||
connector = 'datagen', | ||
fields.user_id.kind = 'sequence', | ||
fields.user_id.start = '1', | ||
fields.user_id.end = '1000', | ||
datagen.rows.per.second = '100' | ||
) FORMAT PLAIN ENCODE JSON; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
USE EXAMPLE_DB; | ||
|
||
ALTER USER xzhseh SET RSA_PUBLIC_KEY='your local rsa public key'; | ||
|
||
-- set user permission to account admin level | ||
GRANT ROLE ACCOUNTADMIN TO USER xzhseh; | ||
|
||
-- you could either retrieve the fp from desc user's info panel, | ||
-- or from the following select stmt. | ||
DESC USER xzhseh; | ||
-- also fine, see the documentation for details. | ||
SELECT TRIM( | ||
(SELECT "value" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) | ||
WHERE "property" = 'RSA_PUBLIC_KEY_FP'), | ||
'SHA256:' | ||
); | ||
|
||
-- snowflake table, note to keep the same column name(s). | ||
CREATE OR REPLACE TABLE example_snowflake_sink_table (user_id INT, target_id VARCHAR, event_timestamp TIMESTAMP_TZ); | ||
|
||
-- snowflake stage, we only supports json as sink format at present | ||
CREATE OR REPLACE STAGE example_snowflake_stage URL = '<S3_PATH>' | ||
credentials = ( AWS_KEY_ID = '<S3_CREDENTIALS>' AWS_SECRET_KEY = '<S3_CREDENTIALS>' ) file_format = ( type = JSON ); | ||
|
||
-- snowflake pipe | ||
CREATE OR REPLACE PIPE example_snowflake_pipe AS COPY INTO example_snowflake_sink_table FROM @example_snowflake_stage MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE; | ||
|
||
-- select from table after sinking from rw | ||
SELECT * FROM example_snowflake_sink_table WHERE event_timestamp IS NOT NULL; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters