feat(common): support switching from pg jdbc to pg native sinks #19703
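For context, a condensed, self-contained Rust sketch of the behavior being added: when the new `switch_jdbc_pg_to_native` config is enabled, a sink declared with connector='jdbc' and a PostgreSQL jdbc.url is rewritten into the properties of the Rust-native connector='postgres' sink. The property keys mirror the diff in src/stream/src/from_proto/sink.rs below; the helper name `rewrite_to_native_pg` and the connection values are illustrative only, not part of this change.

use std::collections::BTreeMap;

// Sketch only: rewrite JDBC PostgreSQL sink properties into native postgres sink properties.
fn rewrite_to_native_pg(props: &mut BTreeMap<String, String>) {
    let is_pg_jdbc = props.get("connector").is_some_and(|c| c == "jdbc")
        && props
            .get("jdbc.url")
            .is_some_and(|u| u.starts_with("jdbc:postgresql:"));
    if !is_pg_jdbc {
        return;
    }
    // The real code derives these by parsing `jdbc.url` (see `parse_jdbc_url` further down);
    // they are hard-coded here to keep the sketch short.
    props.insert("host".to_owned(), "db".to_owned());
    props.insert("port".to_owned(), "5432".to_owned());
    props.insert("database".to_owned(), "test".to_owned());
    props.insert("user".to_owned(), "test".to_owned());
    props.insert("password".to_owned(), "connector".to_owned());
    // `table.name` / `schema.name` carry over to the native sink's `table` / `schema`.
    if let Some(table) = props.get("table.name").cloned() {
        props.insert("table".to_owned(), table);
    }
    if let Some(schema) = props.get("schema.name").cloned() {
        props.insert("schema".to_owned(), schema);
    }
    // The sink is then handled as the native connector='postgres' sink
    // (per the new doc comment in src/common/src/config.rs).
    props.insert("connector".to_owned(), "postgres".to_owned());
}

fn main() {
    let mut props = BTreeMap::from([
        ("connector".to_owned(), "jdbc".to_owned()),
        ("jdbc.url".to_owned(), "jdbc:postgresql://db:5432/test?user=test&password=connector".to_owned()),
        ("table.name".to_owned(), "t4".to_owned()),
    ]);
    rewrite_to_native_pg(&mut props);
    assert_eq!(props.get("connector").map(String::as_str), Some("postgres"));
    assert_eq!(props.get("table").map(String::as_str), Some("t4"));
}

In the actual executor the connection fields are derived by parse_jdbc_url (added further down in this PR) rather than hard-coded.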

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

41 changes: 32 additions & 9 deletions ci/scripts/e2e-sink-test.sh
@@ -25,15 +25,30 @@ shift $((OPTIND -1))

download_and_prepare_rw "$profile" source

prepare_pg() {
# set up PG sink destination
export PGPASSWORD=postgres
psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" || true
dropdb -h db -U postgres test || true
createdb -h db -U postgres test
psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);"
psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);"
psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql
}

# Change process number limit
echo "--- os limits"
ulimit -a

echo "--- Download connector node package"
echo "--- download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- download pg dependencies"
apt-get -y install postgresql-client jq

echo "--- prepare mysql"
# prepare environment mysql sink
mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXISTS test;"
# grant access to `test` for ci test user
@@ -42,15 +57,20 @@ mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test
mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql

echo "--- preparing postgresql"
prepare_pg

# set up PG sink destination
apt-get -y install postgresql-client jq
export PGPASSWORD=postgres
psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';"
createdb -h db -U postgres test
psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);"
psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);"
psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql
echo "--- starting risingwave cluster: ci-1cn-1fe-switch-to-pg-native"
risedev ci-start ci-1cn-1fe-jdbc-to-native

echo "--- test sink: jdbc:postgres switch to postgres native"
# check sink destination postgres
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
sleep 1
sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
sleep 1

echo "--- killing risingwave cluster: ci-1cn-1fe-switch-to-pg-native"
risedev ci-kill

echo "--- starting risingwave cluster"
risedev ci-start ci-1cn-1fe
@@ -64,6 +84,9 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/file_sink.slt'
sleep 1

echo "--- preparing postgresql"
prepare_pg

echo "--- testing remote sinks"

# check sink destination postgres
12 changes: 12 additions & 0 deletions risedev.yml
@@ -580,6 +580,18 @@ profile:
- use: frontend
- use: compactor

ci-1cn-1fe-jdbc-to-native:
config-path: src/config/ci-jdbc-to-native.toml
steps:
- use: minio
- use: sqlite
- use: meta-node
meta-backend: sqlite
- use: compute-node
enable-tiered-cache: true
- use: frontend
- use: compactor

ci-3cn-1fe:
config-path: src/config/ci.toml
steps:
9 changes: 9 additions & 0 deletions src/common/src/config.rs
@@ -1145,6 +1145,11 @@ pub struct StreamingDeveloperConfig {
/// even if session variable set.
/// If true, it's decided by session variable `streaming_use_shared_source` (default true)
pub enable_shared_source: bool,

#[serde(default = "default::developer::switch_jdbc_pg_to_native")]
/// When true, all JDBC sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
/// will be switched from the JDBC PostgreSQL sink to the Rust-native (connector='postgres') sink.
pub switch_jdbc_pg_to_native: bool,
}

/// The subsections `[batch.developer]`.
@@ -2104,6 +2109,10 @@ pub mod default {
pub fn stream_enable_auto_schema_change() -> bool {
true
}

pub fn switch_jdbc_pg_to_native() -> bool {
false
}
}

pub use crate::system_param::default as system;
30 changes: 30 additions & 0 deletions src/config/ci-jdbc-to-native.toml
@@ -0,0 +1,30 @@
[meta]
disable_recovery = true
max_heartbeat_interval_secs = 60

[meta.compaction_config]
level0_tier_compact_file_number = 6
level0_overlapping_sub_level_compact_level_count = 3
level0_max_compact_file_number = 96

[streaming]
in_flight_barrier_nums = 10

[streaming.developer]
stream_exchange_concurrent_barriers = 10
switch_jdbc_pg_to_native = true

[storage]
imm_merge_threshold = 2
max_cached_recent_versions_number = 200

[storage.object_store.retry]
streaming_upload_attempt_timeout_ms = 10000
upload_retry_attempts = 5
read_attempt_timeout_ms = 16000
read_retry_attempts = 6

[system]
barrier_interval_ms = 250
checkpoint_frequency = 5
max_concurrent_creating_streaming_jobs = 0
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -146,6 +146,7 @@ stream_enable_actor_tokio_metrics = false
stream_exchange_connection_pool_size = 1
stream_enable_auto_schema_change = true
stream_enable_shared_source = true
stream_switch_jdbc_pg_to_native = false

[storage]
share_buffers_sync_parallelism = 1
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
@@ -83,6 +83,7 @@ tokio-retry = "0.3"
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"
url = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
112 changes: 102 additions & 10 deletions src/stream/src/from_proto/sink.rs
@@ -15,6 +15,7 @@
use std::sync::Arc;

use anyhow::anyhow;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, Schema};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
@@ -28,6 +29,7 @@ use risingwave_pb::catalog::Table;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use url::Url;

use super::*;
use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
@@ -158,13 +160,40 @@ impl ExecutorBuilder for SinkExecutorBuilder {
.map(ColumnCatalog::from)
.collect_vec();

let mut properties_with_secret =
LocalSecretManager::global().fill_secrets(properties, secret_refs)?;

if params.env.config().developer.switch_jdbc_pg_to_native
&& let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
&& connector_type == "jdbc"
&& let Some(url) = properties_with_secret.get("jdbc.url")
&& url.starts_with("jdbc:postgresql:")
{
let jdbc_url = parse_jdbc_url(url)
.map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?;
properties_with_secret.insert("host".to_owned(), jdbc_url.host);
properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
properties_with_secret.insert("user".to_owned(), jdbc_url.username);
properties_with_secret.insert("password".to_owned(), jdbc_url.password);
if let Some(table_name) = properties_with_secret.get("table.name") {
properties_with_secret.insert("table".to_owned(), table_name.clone());
}
if let Some(schema_name) = properties_with_secret.get("schema.name") {
properties_with_secret.insert("schema".to_owned(), schema_name.clone());
}
// TODO(kwannoel): Do we need to handle jdbc.query.timeout?
}

let connector = {
let sink_type = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| {
StreamExecutorError::from((
SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
sink_id.sink_id,
))
})?;
let sink_type = properties_with_secret
.get(CONNECTOR_TYPE_KEY)
.ok_or_else(|| {
StreamExecutorError::from((
SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
sink_id.sink_id,
))
})?;

match_sink_name_str!(
sink_type.to_lowercase().as_str(),
@@ -185,7 +214,7 @@ impl ExecutorBuilder for SinkExecutorBuilder {
.try_into()
.map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
),
None => match sink_desc.properties.get(SINK_TYPE_OPTION) {
None => match properties_with_secret.get(SINK_TYPE_OPTION) {
// Case B: old syntax `type = '...'`
Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
.map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
@@ -194,9 +223,6 @@ },
},
};

let properties_with_secret =
LocalSecretManager::global().fill_secrets(properties, secret_refs)?;

let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc)
.map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?;

@@ -296,3 +322,69 @@ impl ExecutorBuilder for SinkExecutorBuilder {
Ok((params.info, exec).into())
}
}

struct JdbcUrl {
host: String,
port: u16,
db_name: String,
username: String,
password: String,
}

fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
if !url.starts_with("jdbc:postgresql") {
bail!("invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://...")
}

// trim the "jdbc:" prefix to make it a valid url
let url = url.replace("jdbc:", "");

// parse the url
let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;

let scheme = url.scheme();
assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
let host = url
.host_str()
.ok_or_else(|| anyhow!("missing host in jdbc url"))?;
let port = url
.port()
.ok_or_else(|| anyhow!("missing port in jdbc url"))?;
let db_name = url.path();
let mut username = None;
let mut password = None;
for (key, value) in url.query_pairs() {
if key == "user" {
username = Some(value.to_string());
}
if key == "password" {
password = Some(value.to_string());
}
}
let username = username.ok_or_else(|| anyhow!("missing username in jdbc url"))?;
let password = password.ok_or_else(|| anyhow!("missing password in jdbc url"))?;

Ok(JdbcUrl {
host: host.to_owned(),
port,
db_name: db_name.to_owned(),
username: username.to_owned(),
password: password.to_owned(),
})
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_jdbc_url() {
let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
let jdbc_url = parse_jdbc_url(url).unwrap();
assert_eq!(jdbc_url.host, "localhost");
assert_eq!(jdbc_url.port, 5432);
assert_eq!(jdbc_url.db_name, "/test");
assert_eq!(jdbc_url.username, "postgres");
assert_eq!(jdbc_url.password, "postgres");
}
}