From 0cc6d8ba34bc045838714a47e74b4a8a55b6bb30 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 6 Dec 2024 15:00:31 +0800 Subject: [PATCH 01/15] feat(common): support switching from pg jdbc to pg native sinks --- src/common/src/config.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 1da7efdf008a0..c332cb76bfba1 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1145,6 +1145,10 @@ pub struct StreamingDeveloperConfig { /// even if session variable set. /// If true, it's decided by session variable `streaming_use_shared_source` (default true) pub enable_shared_source: bool, + + #[serde(default = "default::developer::switch_jdbc_pg_to_native")] + /// When true, all jdbc sinks for pg will be switched to native pg sinks. + pub switch_jdbc_pg_to_native: bool, } /// The subsections `[batch.developer]`. @@ -2104,6 +2108,10 @@ pub mod default { pub fn stream_enable_auto_schema_change() -> bool { true } + + pub fn switch_jdbc_pg_to_native() -> bool { + false + } } pub use crate::system_param::default as system; From 9488c4dbd332e524c348fa48031a7d77963023f1 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 6 Dec 2024 20:49:57 +0800 Subject: [PATCH 02/15] do switching inside the from_proto constructor --- Cargo.lock | 1 + src/stream/Cargo.toml | 1 + src/stream/src/from_proto/sink.rs | 107 +++++++++++++++++++++++++++--- 3 files changed, 99 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b9fb3994f247..5219a09983d5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12273,6 +12273,7 @@ dependencies = [ "tokio-stream 0.1.15", "tracing", "tracing-test", + "url", "workspace-hack", ] diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 3c30e0fccf6d7..8a0061ab99ea9 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -83,6 +83,7 @@ tokio-retry = "0.3" tokio-stream = { workspace = true } tonic = { workspace = true } tracing = "0.1" +url = "2" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 48d58b74f908a..27d268e83d47e 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use anyhow::anyhow; +use risingwave_common::bail; use risingwave_common::catalog::{ColumnCatalog, Schema}; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; @@ -28,6 +29,7 @@ use risingwave_pb::catalog::Table; use risingwave_pb::plan_common::PbColumnCatalog; use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode}; use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; +use url::Url; use super::*; use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; @@ -158,13 +160,40 @@ impl ExecutorBuilder for SinkExecutorBuilder { .map(ColumnCatalog::from) .collect_vec(); + let mut properties_with_secret = + LocalSecretManager::global().fill_secrets(properties, secret_refs)?; + + if params.env.config().developer.switch_jdbc_pg_to_native + && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY) + && connector_type == "jdbc" + && let Some(url) = properties_with_secret.get("jdbc.url") + && url.starts_with("jdbc:postgresql:") + { + let jdbc_url = parse_jdbc_url(url) + .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?; + properties_with_secret.insert("host".to_string(), jdbc_url.host); + properties_with_secret.insert("port".to_string(), jdbc_url.port.to_string()); + properties_with_secret.insert("database".to_string(), jdbc_url.db_name); + properties_with_secret.insert("user".to_string(), jdbc_url.username); + properties_with_secret.insert("password".to_string(), jdbc_url.password); + if let Some(table_name) = properties_with_secret.get("table.name") { + properties_with_secret.insert("table".to_string(), table_name.clone()); + } + if let Some(schema_name) = properties_with_secret.get("schema.name") { + properties_with_secret.insert("schema".to_string(), schema_name.clone()); + } + // TODO(kwannoel): Do we need to handle jdbc.query.timeout? + } + let connector = { - let sink_type = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { - StreamExecutorError::from(( - SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)), - sink_id.sink_id, - )) - })?; + let sink_type = properties_with_secret + .get(CONNECTOR_TYPE_KEY) + .ok_or_else(|| { + StreamExecutorError::from(( + SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)), + sink_id.sink_id, + )) + })?; match_sink_name_str!( sink_type.to_lowercase().as_str(), @@ -185,7 +214,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { .try_into() .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?, ), - None => match sink_desc.properties.get(SINK_TYPE_OPTION) { + None => match properties_with_secret.get(SINK_TYPE_OPTION) { // Case B: old syntax `type = '...'` Some(t) => SinkFormatDesc::from_legacy_type(connector, t) .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?, @@ -194,9 +223,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { }, }; - let properties_with_secret = - LocalSecretManager::global().fill_secrets(properties, secret_refs)?; - let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc) .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?; @@ -296,3 +322,64 @@ impl ExecutorBuilder for SinkExecutorBuilder { Ok((params.info, exec).into()) } } + +struct JdbcUrl { + host: String, + port: u16, + db_name: String, + username: String, + password: String, +} + +fn parse_jdbc_url(url: &str) -> anyhow::Result { + if !url.contains("jdbc:postgres") { + bail!("invalid jdbc url") + } + + let url = url.replace("jdbc:", ""); + let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?; + let scheme = url.scheme(); + assert_eq!("postgresql", scheme); + let host = url + .host_str() + .ok_or_else(|| anyhow!("missing host in jdbc url"))?; + let port = url + .port() + .ok_or_else(|| anyhow!("missing port in jdbc url"))?; + let db_name = url.path(); + let mut username = None; + let mut password = None; + for (key, value) in url.query_pairs() { + if key == "user" { + username = Some(value.to_string()); + } + if key == "password" { + password = Some(value.to_string()); + } + } + let username = username.ok_or_else(|| anyhow!("missing username in jdbc url"))?; + let password = password.ok_or_else(|| anyhow!("missing password in jdbc url"))?; + Ok(JdbcUrl { + host: host.to_string(), + port, + db_name: db_name.to_string(), + username: username.to_string(), + password: password.to_string(), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_jdbc_url() { + let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres"; + let jdbc_url = parse_jdbc_url(url).unwrap(); + assert_eq!(jdbc_url.host, "localhost"); + assert_eq!(jdbc_url.port, 5432); + assert_eq!(jdbc_url.db_name, "/test"); + assert_eq!(jdbc_url.username, "postgres"); + assert_eq!(jdbc_url.password, "postgres"); + } +} From 006a025273dfa57b4bb3a0fb6d08fca3b89cb80d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 6 Dec 2024 21:06:19 +0800 Subject: [PATCH 03/15] add tests --- ci/scripts/e2e-sink-test.sh | 14 ++++++++++++++ risedev.yml | 12 ++++++++++++ src/config/ci-jdbc-to-native.toml | 30 ++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+) create mode 100644 src/config/ci-jdbc-to-native.toml diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index ad58d2b96d1fb..5bd01f21afb24 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -52,6 +52,20 @@ psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int); psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);" psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql + +echo "--- starting risingwave cluster: ci-1cn-1fe-switch-to-pg-native" +risedev ci-start ci-1cn-1fe-switch-to-pg-native + +echo "--- test sink: jdbc:postgres switch to postgres native" +# check sink destination postgres +sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt' +sleep 1 +sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' +sleep 1 + +echo "--- killing risingwave cluster: ci-1cn-1fe-switch-to-pg-native" +risedev ci-kill + echo "--- starting risingwave cluster" risedev ci-start ci-1cn-1fe diff --git a/risedev.yml b/risedev.yml index e2f3db84ada6a..eeae0d0024b87 100644 --- a/risedev.yml +++ b/risedev.yml @@ -580,6 +580,18 @@ profile: - use: frontend - use: compactor + ci-1cn-1fe-jdbc-to-native-pg: + config-path: src/config/ci-jdbc-to-native.toml + steps: + - use: minio + - use: sqlite + - use: meta-node + meta-backend: sqlite + - use: compute-node + enable-tiered-cache: true + - use: frontend + - use: compactor + ci-3cn-1fe: config-path: src/config/ci.toml steps: diff --git a/src/config/ci-jdbc-to-native.toml b/src/config/ci-jdbc-to-native.toml new file mode 100644 index 0000000000000..368d2f0760b4c --- /dev/null +++ b/src/config/ci-jdbc-to-native.toml @@ -0,0 +1,30 @@ +[meta] +disable_recovery = true +max_heartbeat_interval_secs = 60 + +[meta.compaction_config] +level0_tier_compact_file_number = 6 +level0_overlapping_sub_level_compact_level_count = 3 +level0_max_compact_file_number = 96 + +[streaming] +in_flight_barrier_nums = 10 + +[streaming.developer] +stream_exchange_concurrent_barriers = 10 +switch_jdbc_pg_to_native = true + +[storage] +imm_merge_threshold = 2 +max_cached_recent_versions_number = 200 + +[storage.object_store.retry] +streaming_upload_attempt_timeout_ms = 10000 +upload_retry_attempts = 5 +read_attempt_timeout_ms = 16000 +read_retry_attempts = 6 + +[system] +barrier_interval_ms = 250 +checkpoint_frequency = 5 +max_concurrent_creating_streaming_jobs = 0 \ No newline at end of file From 6e1837e4267f9808866efe2722924fef2b2b45cc Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 6 Dec 2024 21:43:17 +0800 Subject: [PATCH 04/15] fix name --- ci/scripts/e2e-sink-test.sh | 2 +- risedev.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 5bd01f21afb24..5139c0e1ac8d1 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -54,7 +54,7 @@ psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql echo "--- starting risingwave cluster: ci-1cn-1fe-switch-to-pg-native" -risedev ci-start ci-1cn-1fe-switch-to-pg-native +risedev ci-start ci-1cn-1fe-jdbc-to-native echo "--- test sink: jdbc:postgres switch to postgres native" # check sink destination postgres diff --git a/risedev.yml b/risedev.yml index eeae0d0024b87..b8fa94c1faaa6 100644 --- a/risedev.yml +++ b/risedev.yml @@ -580,7 +580,7 @@ profile: - use: frontend - use: compactor - ci-1cn-1fe-jdbc-to-native-pg: + ci-1cn-1fe-jdbc-to-native: config-path: src/config/ci-jdbc-to-native.toml steps: - use: minio From 76c079bd2d4d12e17bd2ed5621ca2aa0cc857787 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 6 Dec 2024 22:26:41 +0800 Subject: [PATCH 05/15] fix example --- src/config/example.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/config/example.toml b/src/config/example.toml index 2227f68bb136b..18eeca662113c 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -146,6 +146,7 @@ stream_enable_actor_tokio_metrics = false stream_exchange_connection_pool_size = 1 stream_enable_auto_schema_change = true stream_enable_shared_source = true +stream_switch_jdbc_pg_to_native = false [storage] share_buffers_sync_parallelism = 1 From c12061c593cb703e20651f7ef3ebdb09d78d8a9c Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 6 Dec 2024 22:57:42 +0800 Subject: [PATCH 06/15] prepare pg --- ci/scripts/e2e-sink-test.sh | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 5139c0e1ac8d1..2f57d2d8146aa 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -25,15 +25,30 @@ shift $((OPTIND -1)) download_and_prepare_rw "$profile" source +prepare_pg() { + # set up PG sink destination + dropdb -h db -U postgres test || true + export PGPASSWORD=postgres + psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" + createdb -h db -U postgres test + psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);" + psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);" + psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql +} + # Change process number limit echo "--- os limits" ulimit -a -echo "--- Download connector node package" +echo "--- download connector node package" buildkite-agent artifact download risingwave-connector.tar.gz ./ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node +echo "--- download pg dependencies" +apt-get -y install postgresql-client jq + +echo "--- prepare mysql" # prepare environment mysql sink mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXISTS test;" # grant access to `test` for ci test user @@ -42,16 +57,7 @@ mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql echo "--- preparing postgresql" - -# set up PG sink destination -apt-get -y install postgresql-client jq -export PGPASSWORD=postgres -psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" -createdb -h db -U postgres test -psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);" -psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);" -psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql - +prepare_pg echo "--- starting risingwave cluster: ci-1cn-1fe-switch-to-pg-native" risedev ci-start ci-1cn-1fe-jdbc-to-native @@ -78,6 +84,9 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table/*.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/file_sink.slt' sleep 1 +echo "--- preparing postgresql" +prepare_pg + echo "--- testing remote sinks" # check sink destination postgres From 5e41e08aea77c1173112001c2c423f0f49bc5bc6 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sat, 7 Dec 2024 16:10:45 +0800 Subject: [PATCH 07/15] pw --- ci/scripts/e2e-sink-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 2f57d2d8146aa..e62688e13705b 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -27,8 +27,8 @@ download_and_prepare_rw "$profile" source prepare_pg() { # set up PG sink destination - dropdb -h db -U postgres test || true export PGPASSWORD=postgres + dropdb -h db -U postgres test || true psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" createdb -h db -U postgres test psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);" From 9763426e7f97e28a37b5543c74ecdebcd195bdfe Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sat, 7 Dec 2024 16:53:02 +0800 Subject: [PATCH 08/15] create role if not exists --- ci/scripts/e2e-sink-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index e62688e13705b..5a43834361761 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -28,8 +28,8 @@ download_and_prepare_rw "$profile" source prepare_pg() { # set up PG sink destination export PGPASSWORD=postgres + psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" || true dropdb -h db -U postgres test || true - psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" createdb -h db -U postgres test psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);" psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);" From 34b95558570e06f7541e503fef7df0a599b123d9 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 11 Dec 2024 08:04:30 +0800 Subject: [PATCH 09/15] doc --- src/stream/src/from_proto/sink.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 27d268e83d47e..ad1dff2225dba 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -332,14 +332,14 @@ struct JdbcUrl { } fn parse_jdbc_url(url: &str) -> anyhow::Result { - if !url.contains("jdbc:postgres") { - bail!("invalid jdbc url") + if url.starts_with("jdbc:postgres") { + bail!("invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgres://...") } let url = url.replace("jdbc:", ""); let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?; let scheme = url.scheme(); - assert_eq!("postgresql", scheme); + assert_eq!("postgresql", scheme, "jdbc target should be postgres"); let host = url .host_str() .ok_or_else(|| anyhow!("missing host in jdbc url"))?; From 386ef6b3b2142fa00d1e0bb2d1629b47ff06385c Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 11 Dec 2024 08:08:44 +0800 Subject: [PATCH 10/15] docs --- src/common/src/config.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index c332cb76bfba1..154ac0c260fb6 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1147,7 +1147,8 @@ pub struct StreamingDeveloperConfig { pub enable_shared_source: bool, #[serde(default = "default::developer::switch_jdbc_pg_to_native")] - /// When true, all jdbc sinks for pg will be switched to native pg sinks. + /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..." + /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks. pub switch_jdbc_pg_to_native: bool, } From e1383f60e1a12cdbd46f064b0b91e8fe6a53de9a Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 11 Dec 2024 08:09:44 +0800 Subject: [PATCH 11/15] fix --- src/stream/src/from_proto/sink.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index ad1dff2225dba..6f409e46277a1 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -332,8 +332,8 @@ struct JdbcUrl { } fn parse_jdbc_url(url: &str) -> anyhow::Result { - if url.starts_with("jdbc:postgres") { - bail!("invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgres://...") + if url.starts_with("jdbc:postgresql") { + bail!("invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://...") } let url = url.replace("jdbc:", ""); From b93482ca65e26f8ff743bb0997692494d3bad5bc Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 11 Dec 2024 08:10:29 +0800 Subject: [PATCH 12/15] fix --- src/stream/src/from_proto/sink.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 6f409e46277a1..bc19c316cb09e 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -339,7 +339,7 @@ fn parse_jdbc_url(url: &str) -> anyhow::Result { let url = url.replace("jdbc:", ""); let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?; let scheme = url.scheme(); - assert_eq!("postgresql", scheme, "jdbc target should be postgres"); + assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql"); let host = url .host_str() .ok_or_else(|| anyhow!("missing host in jdbc url"))?; From f172fb42cc305d4c93d924374110f796401ba49f Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 11 Dec 2024 08:56:09 +0800 Subject: [PATCH 13/15] fix --- src/stream/src/from_proto/sink.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index bc19c316cb09e..2a5da8d88df73 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -332,7 +332,7 @@ struct JdbcUrl { } fn parse_jdbc_url(url: &str) -> anyhow::Result { - if url.starts_with("jdbc:postgresql") { + if !url.starts_with("jdbc:postgresql") { bail!("invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://...") } From 9aca6cc5170533ac9f312b97c8d1a3680eaec45e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 11 Dec 2024 08:58:09 +0800 Subject: [PATCH 14/15] docs --- src/stream/src/from_proto/sink.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 2a5da8d88df73..3888008f3a002 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -336,8 +336,12 @@ fn parse_jdbc_url(url: &str) -> anyhow::Result { bail!("invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://...") } + // trim the "jdbc:" prefix to make it a valid url let url = url.replace("jdbc:", ""); + + // parse the url let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?; + let scheme = url.scheme(); assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql"); let host = url @@ -359,6 +363,7 @@ fn parse_jdbc_url(url: &str) -> anyhow::Result { } let username = username.ok_or_else(|| anyhow!("missing username in jdbc url"))?; let password = password.ok_or_else(|| anyhow!("missing password in jdbc url"))?; + Ok(JdbcUrl { host: host.to_string(), port, From 2e302e9ebe93d547d6df3680527db84c351c7679 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 12 Dec 2024 20:58:43 +0800 Subject: [PATCH 15/15] to_owned --- src/stream/src/from_proto/sink.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 3888008f3a002..74cedfaee83dc 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -171,16 +171,16 @@ impl ExecutorBuilder for SinkExecutorBuilder { { let jdbc_url = parse_jdbc_url(url) .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?; - properties_with_secret.insert("host".to_string(), jdbc_url.host); - properties_with_secret.insert("port".to_string(), jdbc_url.port.to_string()); - properties_with_secret.insert("database".to_string(), jdbc_url.db_name); - properties_with_secret.insert("user".to_string(), jdbc_url.username); - properties_with_secret.insert("password".to_string(), jdbc_url.password); + properties_with_secret.insert("host".to_owned(), jdbc_url.host); + properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string()); + properties_with_secret.insert("database".to_owned(), jdbc_url.db_name); + properties_with_secret.insert("user".to_owned(), jdbc_url.username); + properties_with_secret.insert("password".to_owned(), jdbc_url.password); if let Some(table_name) = properties_with_secret.get("table.name") { - properties_with_secret.insert("table".to_string(), table_name.clone()); + properties_with_secret.insert("table".to_owned(), table_name.clone()); } if let Some(schema_name) = properties_with_secret.get("schema.name") { - properties_with_secret.insert("schema".to_string(), schema_name.clone()); + properties_with_secret.insert("schema".to_owned(), schema_name.clone()); } // TODO(kwannoel): Do we need to handle jdbc.query.timeout? } @@ -365,11 +365,11 @@ fn parse_jdbc_url(url: &str) -> anyhow::Result { let password = password.ok_or_else(|| anyhow!("missing password in jdbc url"))?; Ok(JdbcUrl { - host: host.to_string(), + host: host.to_owned(), port, - db_name: db_name.to_string(), - username: username.to_string(), - password: password.to_string(), + db_name: db_name.to_owned(), + username: username.to_owned(), + password: password.to_owned(), }) }