Skip to content

Commit

Permalink
perf(stream): optimize native pg sink (#19688) (#19925)
Browse files Browse the repository at this point in the history
Co-authored-by: Noel Kwan <[email protected]>
  • Loading branch information
github-actions[bot] and kwannoel authored Dec 25, 2024
1 parent 5b50b9a commit 194ee27
Show file tree
Hide file tree
Showing 3 changed files with 341 additions and 224 deletions.
12 changes: 11 additions & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -649,13 +649,23 @@ echo " $(tput setaf 4)source ${PREFIX_CONFIG}/psql-env$(tput sgr0)"

[tasks.psql]
category = "RiseDev - Start/Stop"
description = "Run local psql client with default connection parameters. You can pass extra arguments to psql."
description = "Run local psql client for RisingWave with default connection parameters. You can pass extra arguments to psql."
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev "$@"
'''

[tasks.pgpsql]
category = "RiseDev - Start/Stop"
description = "Run local psql client for postgres with default connection parameters. You can pass extra arguments to psql."
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
source "${PREFIX_CONFIG}/risedev-env"
PGHOST=$PGHOST PGPORT=$PGPORT PGUSER=$PGUSER PGDATABASE=$PGDATABASE psql "$@"
'''

[tasks.ctl]
category = "RiseDev - Start/Stop"
description = "Start RiseCtl"
Expand Down
84 changes: 84 additions & 0 deletions e2e_test/sink/pg_native_vs_jdbc.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
control substitution on

statement ok
DROP SINK IF EXISTS pg_sink;

statement ok
DROP SINK IF EXISTS pg_sink_jdbc;

statement ok
DROP TABLE IF EXISTS datagen_source;

system ok
psql -c 'DROP TABLE IF EXISTS datagen_source;'

system ok
psql -c 'DROP TABLE IF EXISTS datagen_source_jdbc;'

system ok
psql -c 'CREATE TABLE datagen_source (id INT PRIMARY KEY, v1 varchar);'

system ok
psql -c 'CREATE TABLE datagen_source_jdbc (id INT PRIMARY KEY, v1 varchar);'

statement ok
CREATE TABLE datagen_source (id INT PRIMARY KEY, v1 varchar);

statement ok
INSERT INTO datagen_source SELECT key, 'asjdkk2kbdk2uk2ubek2' FROM generate_series(1, 2000000) t(key);

statement ok
flush;

statement ok
CREATE SINK pg_sink FROM datagen_source WITH (
connector='postgres',
host='$PGHOST',
port='$PGPORT',
user='$PGUSER',
password='$PGPASSWORD',
database='$PGDATABASE',
table='datagen_source',
type='upsert',
primary_key='id',
);

sleep 240s

system ok
psql --tuples-only -c 'select count(*) from datagen_source;'
----
2000000


statement ok
CREATE SINK pg_sink_jdbc FROM datagen_source WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://${PGHOST}:${PGPORT}/${PGDATABASE}?user=${PGUSER}&password=${PGPASSWORD}',
table.name='datagen_source_jdbc',
primary_key='id',
type='upsert'
);

sleep 240s

system ok
psql --tuples-only -c 'select count(*) from datagen_source_jdbc;'
----
2000000


statement ok
DROP SINK IF EXISTS pg_sink;

statement ok
DROP SINK IF EXISTS pg_sink_jdbc;

statement ok
DROP TABLE IF EXISTS datagen_source;

system ok
psql -c 'DROP TABLE IF EXISTS datagen_source;'

system ok
psql -c 'DROP TABLE IF EXISTS datagen_source_jdbc;'
Loading

0 comments on commit 194ee27

Please sign in to comment.