diff --git a/Cargo.lock b/Cargo.lock index 9b9fb3994f247..5219a09983d5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12273,6 +12273,7 @@ dependencies = [ "tokio-stream 0.1.15", "tracing", "tracing-test", + "url", "workspace-hack", ] diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index ad58d2b96d1fb..5a43834361761 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -25,15 +25,30 @@ shift $((OPTIND -1)) download_and_prepare_rw "$profile" source +prepare_pg() { + # set up PG sink destination + export PGPASSWORD=postgres + psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" || true + dropdb -h db -U postgres test || true + createdb -h db -U postgres test + psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);" + psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);" + psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql +} + # Change process number limit echo "--- os limits" ulimit -a -echo "--- Download connector node package" +echo "--- download connector node package" buildkite-agent artifact download risingwave-connector.tar.gz ./ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node +echo "--- download pg dependencies" +apt-get -y install postgresql-client jq + +echo "--- prepare mysql" # prepare environment mysql sink mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXISTS test;" # grant access to `test` for ci test user @@ -42,15 +57,20 @@ mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql echo "--- preparing postgresql" +prepare_pg -# set up PG sink destination -apt-get -y install postgresql-client jq -export PGPASSWORD=postgres -psql -h db 
-U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" -createdb -h db -U postgres test -psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);" -psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);" -psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql +echo "--- starting risingwave cluster: ci-1cn-1fe-switch-to-pg-native" +risedev ci-start ci-1cn-1fe-jdbc-to-native + +echo "--- test sink: jdbc:postgres switch to postgres native" +# check sink destination postgres +sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt' +sleep 1 +sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' +sleep 1 + +echo "--- killing risingwave cluster: ci-1cn-1fe-switch-to-pg-native" +risedev ci-kill echo "--- starting risingwave cluster" risedev ci-start ci-1cn-1fe @@ -64,6 +84,9 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table/*.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/file_sink.slt' sleep 1 +echo "--- preparing postgresql" +prepare_pg + echo "--- testing remote sinks" # check sink destination postgres diff --git a/risedev.yml b/risedev.yml index e2f3db84ada6a..b8fa94c1faaa6 100644 --- a/risedev.yml +++ b/risedev.yml @@ -580,6 +580,18 @@ profile: - use: frontend - use: compactor + ci-1cn-1fe-jdbc-to-native: + config-path: src/config/ci-jdbc-to-native.toml + steps: + - use: minio + - use: sqlite + - use: meta-node + meta-backend: sqlite + - use: compute-node + enable-tiered-cache: true + - use: frontend + - use: compactor + ci-3cn-1fe: config-path: src/config/ci.toml steps: diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 1da7efdf008a0..154ac0c260fb6 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1145,6 +1145,11 @@ pub struct StreamingDeveloperConfig { /// even if session variable set. 
     /// If true, it's decided by session variable `streaming_use_shared_source` (default true)
     pub enable_shared_source: bool,
+
+    #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
+    /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
+    /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
+    pub switch_jdbc_pg_to_native: bool,
 }
 
 /// The subsections `[batch.developer]`.
@@ -2104,6 +2109,10 @@ pub mod default {
         pub fn stream_enable_auto_schema_change() -> bool {
             true
         }
+
+        pub fn switch_jdbc_pg_to_native() -> bool {
+            false
+        }
     }
 
     pub use crate::system_param::default as system;
diff --git a/src/config/ci-jdbc-to-native.toml b/src/config/ci-jdbc-to-native.toml
new file mode 100644
index 0000000000000..368d2f0760b4c
--- /dev/null
+++ b/src/config/ci-jdbc-to-native.toml
@@ -0,0 +1,30 @@
+[meta]
+disable_recovery = true
+max_heartbeat_interval_secs = 60
+
+[meta.compaction_config]
+level0_tier_compact_file_number = 6
+level0_overlapping_sub_level_compact_level_count = 3
+level0_max_compact_file_number = 96
+
+[streaming]
+in_flight_barrier_nums = 10
+
+[streaming.developer]
+stream_exchange_concurrent_barriers = 10
+stream_switch_jdbc_pg_to_native = true
+
+[storage]
+imm_merge_threshold = 2
+max_cached_recent_versions_number = 200
+
+[storage.object_store.retry]
+streaming_upload_attempt_timeout_ms = 10000
+upload_retry_attempts = 5
+read_attempt_timeout_ms = 16000
+read_retry_attempts = 6
+
+[system]
+barrier_interval_ms = 250
+checkpoint_frequency = 5
+max_concurrent_creating_streaming_jobs = 0
\ No newline at end of file
diff --git a/src/config/example.toml b/src/config/example.toml
index 2227f68bb136b..18eeca662113c 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -146,6 +146,7 @@ stream_enable_actor_tokio_metrics = false
 stream_exchange_connection_pool_size = 1
 stream_enable_auto_schema_change = true
 stream_enable_shared_source = true
+stream_switch_jdbc_pg_to_native = false [storage] share_buffers_sync_parallelism = 1 diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 3c30e0fccf6d7..8a0061ab99ea9 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -83,6 +83,7 @@ tokio-retry = "0.3" tokio-stream = { workspace = true } tonic = { workspace = true } tracing = "0.1" +url = "2" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 48d58b74f908a..74cedfaee83dc 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use anyhow::anyhow; +use risingwave_common::bail; use risingwave_common::catalog::{ColumnCatalog, Schema}; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; @@ -28,6 +29,7 @@ use risingwave_pb::catalog::Table; use risingwave_pb::plan_common::PbColumnCatalog; use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode}; use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; +use url::Url; use super::*; use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; @@ -158,13 +160,40 @@ impl ExecutorBuilder for SinkExecutorBuilder { .map(ColumnCatalog::from) .collect_vec(); + let mut properties_with_secret = + LocalSecretManager::global().fill_secrets(properties, secret_refs)?; + + if params.env.config().developer.switch_jdbc_pg_to_native + && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY) + && connector_type == "jdbc" + && let Some(url) = properties_with_secret.get("jdbc.url") + && url.starts_with("jdbc:postgresql:") + { + let jdbc_url = parse_jdbc_url(url) + .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?; + properties_with_secret.insert("host".to_owned(), jdbc_url.host); + properties_with_secret.insert("port".to_owned(), 
jdbc_url.port.to_string()); + properties_with_secret.insert("database".to_owned(), jdbc_url.db_name); + properties_with_secret.insert("user".to_owned(), jdbc_url.username); + properties_with_secret.insert("password".to_owned(), jdbc_url.password); + if let Some(table_name) = properties_with_secret.get("table.name") { + properties_with_secret.insert("table".to_owned(), table_name.clone()); + } + if let Some(schema_name) = properties_with_secret.get("schema.name") { + properties_with_secret.insert("schema".to_owned(), schema_name.clone()); + } + // TODO(kwannoel): Do we need to handle jdbc.query.timeout? + } + let connector = { - let sink_type = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { - StreamExecutorError::from(( - SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)), - sink_id.sink_id, - )) - })?; + let sink_type = properties_with_secret + .get(CONNECTOR_TYPE_KEY) + .ok_or_else(|| { + StreamExecutorError::from(( + SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)), + sink_id.sink_id, + )) + })?; match_sink_name_str!( sink_type.to_lowercase().as_str(), @@ -185,7 +214,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { .try_into() .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?, ), - None => match sink_desc.properties.get(SINK_TYPE_OPTION) { + None => match properties_with_secret.get(SINK_TYPE_OPTION) { // Case B: old syntax `type = '...'` Some(t) => SinkFormatDesc::from_legacy_type(connector, t) .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?, @@ -194,9 +223,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { }, }; - let properties_with_secret = - LocalSecretManager::global().fill_secrets(properties, secret_refs)?; - let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc) .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?; @@ -296,3 +322,69 @@ impl ExecutorBuilder for SinkExecutorBuilder { Ok((params.info, exec).into()) } } + +struct JdbcUrl { + host: 
String,
+    port: u16,
+    db_name: String,
+    username: String,
+    password: String,
+}
+
+fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
+    if !url.starts_with("jdbc:postgresql") {
+        bail!("invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://...")
+    }
+
+    // trim the "jdbc:" prefix to make it a valid url
+    let url = url.strip_prefix("jdbc:").expect("checked by starts_with above");
+
+    // parse the url
+    let url = Url::parse(url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;
+
+    let scheme = url.scheme();
+    assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
+    let host = url
+        .host_str()
+        .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
+    let port = url
+        .port()
+        .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
+    let db_name = url.path().trim_start_matches('/');
+    let mut username = None;
+    let mut password = None;
+    for (key, value) in url.query_pairs() {
+        if key == "user" {
+            username = Some(value.to_string());
+        }
+        if key == "password" {
+            password = Some(value.to_string());
+        }
+    }
+    let username = username.ok_or_else(|| anyhow!("missing username in jdbc url"))?;
+    let password = password.ok_or_else(|| anyhow!("missing password in jdbc url"))?;
+
+    Ok(JdbcUrl {
+        host: host.to_owned(),
+        port,
+        db_name: db_name.to_owned(),
+        username: username.to_owned(),
+        password: password.to_owned(),
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_jdbc_url() {
+        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
+        let jdbc_url = parse_jdbc_url(url).unwrap();
+        assert_eq!(jdbc_url.host, "localhost");
+        assert_eq!(jdbc_url.port, 5432);
+        assert_eq!(jdbc_url.db_name, "test");
+        assert_eq!(jdbc_url.username, "postgres");
+        assert_eq!(jdbc_url.password, "postgres");
+    }
+}