feat(Sink): support redis sink (#11999)

xxhZs authored Oct 23, 2023
1 parent a510421 commit 848bdda
Showing 16 changed files with 731 additions and 86 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock


30 changes: 30 additions & 0 deletions integration_tests/redis-sink/README.md
@@ -0,0 +1,30 @@
# Demo: Sinking to Redis

In this demo, we showcase how RisingWave sinks data to Redis.

1. Launch the cluster:

```sh
docker compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen service that generates the data, and a Redis instance that serves as the sink.


2. Execute the SQL queries in sequence:

- create_source.sql
- create_mv.sql
- create_sink.sql

3. Execute a simple query:

```sh
docker compose exec redis redis-cli keys '*'
```

We can also use `GET` to fetch the value of a single key, for example one written by `bhv_redis_sink_2`, whose keys follow the `user_id:{user_id}` format:

```sh
docker compose exec redis redis-cli get 'user_id:10'
```
7 changes: 7 additions & 0 deletions integration_tests/redis-sink/create_mv.sql
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp
FROM
    user_behaviors;
21 changes: 21 additions & 0 deletions integration_tests/redis-sink/create_sink.sql
@@ -0,0 +1,21 @@
CREATE SINK bhv_redis_sink_1
FROM
    bhv_mv WITH (
    primary_key = 'user_id',
    connector = 'redis',
    type = 'append-only',
    force_append_only = 'true',
    redis.url = 'redis://redis:6379/'
);

CREATE SINK bhv_redis_sink_2
FROM
    bhv_mv WITH (
    primary_key = 'user_id',
    connector = 'redis',
    type = 'append-only',
    force_append_only = 'true',
    redis.url = 'redis://redis:6379/',
    redis.keyformat = 'user_id:{user_id}',
    redis.valueformat = 'target_id:{target_id},event_timestamp:{event_timestamp}'
);
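The `redis.keyformat` and `redis.valueformat` options are `{column}` templates expanded against each row of the upstream view. A rough sketch of that substitution, where `render_template` and the hard-coded row are hypothetical stand-ins for illustration, not the connector's actual API:

```rust
use std::collections::HashMap;

/// Replace each `{column}` placeholder in `template` with the row's value.
/// Hypothetical helper for illustration; the real connector's implementation
/// and error handling may differ.
fn render_template(template: &str, row: &HashMap<&str, String>) -> String {
    let mut out = template.to_string();
    for (col, val) in row {
        out = out.replace(&format!("{{{}}}", col), val);
    }
    out
}

fn main() {
    let row = HashMap::from([
        ("user_id", "10".to_string()),
        ("event_timestamp", "2023-10-23 12:00:00".to_string()),
    ]);
    // "user_id:{user_id}" expands to "user_id:10"
    assert_eq!(render_template("user_id:{user_id}", &row), "user_id:10");
}
```

With the formats above, a row with `user_id = 10` yields the Redis key `user_id:10`.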
16 changes: 16 additions & 0 deletions integration_tests/redis-sink/create_source.sql
@@ -0,0 +1,16 @@
CREATE TABLE user_behaviors (
    user_id INT,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
) WITH (
    connector = 'datagen',
    fields.user_id.kind = 'sequence',
    fields.user_id.start = 1,
    fields.user_id.end = 100,
    datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;
71 changes: 71 additions & 0 deletions integration_tests/redis-sink/docker-compose.yml
@@ -0,0 +1,71 @@
---
version: "3"
services:
  redis:
    image: 'redis:latest'
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
  compactor-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compactor-0
  compute-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compute-node-0
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  frontend-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: frontend-node-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  meta-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: meta-node-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
  message_queue:
    extends:
      file: ../../docker/docker-compose.yml
      service: message_queue
  datagen:
    build: ../datagen
    depends_on: [message_queue]
    command:
      - /bin/sh
      - -c
      - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
    restart: always
    container_name: datagen
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  message_queue:
    external: false
name: risingwave-compose
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
@@ -96,6 +96,8 @@ rdkafka = { workspace = true, features = [
    "gssapi",
    "zstd",
] }
redis = { version = "0.23.3", features = ["aio","tokio-comp","async-std-comp"] }
regex = "1.4"
reqwest = { version = "0.11", features = ["json"] }
risingwave_common = { workspace = true }
risingwave_jni_core = { workspace = true }
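The new `redis` dependency enables the `aio` and `tokio-comp` features, i.e. the crate's async API. A minimal sketch of writing and reading one key through that API, assuming a Redis server reachable at `redis://127.0.0.1:6379/` (this illustrates the crate, not the sink's actual code path):

```rust
use redis::AsyncCommands;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    // Same URL format as the sink's `redis.url` option.
    let client = redis::Client::open("redis://127.0.0.1:6379/")?;
    let mut conn = client.get_async_connection().await?;
    // SET then GET one key, roughly what an append-only sink does per row.
    let _: () = conn.set("user_id:10", "target_id:abc").await?;
    let value: String = conn.get("user_id:10").await?;
    println!("{value}");
    Ok(())
}
```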
62 changes: 0 additions & 62 deletions src/connector/src/common.rs
@@ -21,7 +21,6 @@ use anyhow::{anyhow, Ok};
use async_nats::jetstream::consumer::DeliverPolicy;
use async_nats::jetstream::{self};
use aws_sdk_kinesis::Client as KinesisClient;
use clickhouse::Client;
use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
use pulsar::{Authentication, Pulsar, TokioExecutor};
use rdkafka::ClientConfig;
@@ -37,7 +36,6 @@ use url::Url;
use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::deserialize_duration_from_string;
use crate::sink::doris_connector::DorisGet;
use crate::sink::SinkError;
use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
@@ -406,66 +404,6 @@ impl KinesisCommon {
    }
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
}

const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5);

impl ClickHouseCommon {
    pub(crate) fn build_client(&self) -> anyhow::Result<Client> {
        use hyper_tls::HttpsConnector;

        let https = HttpsConnector::new();
        let client = hyper::Client::builder()
            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
            .build::<_, hyper::Body>(https);

        let client = Client::with_http_client(client)
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database);
        Ok(client)
    }
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct DorisCommon {
    #[serde(rename = "doris.url")]
    pub url: String,
    #[serde(rename = "doris.user")]
    pub user: String,
    #[serde(rename = "doris.password")]
    pub password: String,
    #[serde(rename = "doris.database")]
    pub database: String,
    #[serde(rename = "doris.table")]
    pub table: String,
}

impl DorisCommon {
    pub(crate) fn build_get_client(&self) -> DorisGet {
        DorisGet::new(
            self.url.clone(),
            self.table.clone(),
            self.database.clone(),
            self.user.clone(),
            self.password.clone(),
        )
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
    #[serde(borrow)]
38 changes: 36 additions & 2 deletions src/connector/src/sink/clickhouse.rs
@@ -14,9 +14,10 @@

use core::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::time::Duration;

use anyhow::anyhow;
use clickhouse::{Client, Row as ClickHouseRow};
use clickhouse::{Client, Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::catalog::Schema;
@@ -28,7 +29,6 @@ use serde_derive::Deserialize;
use serde_with::serde_as;

use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::common::ClickHouseCommon;
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{
    Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
@@ -37,6 +37,40 @@ use crate::sink::{
pub const CLICKHOUSE_SINK: &str = "clickhouse";
const BUFFER_SIZE: usize = 1024;

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
}

const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5);

impl ClickHouseCommon {
    pub(crate) fn build_client(&self) -> anyhow::Result<ClickHouseClient> {
        use hyper_tls::HttpsConnector;

        let https = HttpsConnector::new();
        let client = hyper::Client::builder()
            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
            .build::<_, hyper::Body>(https);

        let client = ClickHouseClient::with_http_client(client)
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database);
        Ok(client)
    }
}
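The `#[serde(rename = "clickhouse.url")]` attributes are what map the flat `WITH (...)` option names onto the struct's fields. A hedged sketch of populating such a struct from an option map by round-tripping through `serde_json` (`parse_common` is a hypothetical helper; the connector may parse options differently):

```rust
use std::collections::HashMap;

// Hypothetical helper: deserialize "clickhouse.*" options into the struct.
fn parse_common(options: HashMap<String, String>) -> anyhow::Result<ClickHouseCommon> {
    // Each "clickhouse.*" key lands on the field whose #[serde(rename)]
    // attribute matches it; missing keys surface as a deserialization error.
    let value = serde_json::to_value(options)?;
    Ok(serde_json::from_value(value)?)
}
```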

#[serde_as]
#[derive(Clone, Debug, Deserialize)]
pub struct ClickHouseConfig {