Skip to content

Commit

Permalink
feat(sink): support sink rate limit for external sink (#19660)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
Co-authored-by: MrCroxx <[email protected]>
  • Loading branch information
hzxa21 and MrCroxx authored Dec 23, 2024
1 parent eabb2d0 commit 4aab017
Show file tree
Hide file tree
Showing 27 changed files with 677 additions and 108 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ ${EXTRA_ARGS:-} \
2> $LOGDIR/recovery-background-ddl-{}.log && rm $LOGDIR/recovery-background-ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, ddl"
seq "$TEST_NUM" | parallel 'MADSIM_TEST_SEED={} ./risingwave_simulation \
seq "$TEST_NUM" | parallel --tmpdir .risingwave 'MADSIM_TEST_SEED={} ./risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
Expand Down
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ user server_encoding
user server_version
user server_version_num
user sink_decouple
user sink_rate_limit
user source_rate_limit
user standard_conforming_strings
user statement_timeout
Expand Down
49 changes: 49 additions & 0 deletions e2e_test/sink/rate_limit.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# End-to-end test for SINK_RATE_LIMIT: covers the session variable,
# ALTER SINK on an external sink, and the error path for sink-into-table.

statement ok
CREATE TABLE t1(v1 int, v2 int);

# External sink (blackhole connector) — rate limiting is supported here.
statement ok
CREATE SINK s1 AS select * from t1 WITH (
connector = 'blackhole'
);

# Session-level setting: positive limit (rows/sec per parallelism).
statement ok
SET SINK_RATE_LIMIT TO 100;

# 0 pauses the sink.
statement ok
SET SINK_RATE_LIMIT TO 0;

# default (-1) disables the rate limit.
statement ok
SET SINK_RATE_LIMIT TO default;

# Per-sink overrides via ALTER SINK on the external sink.
statement ok
ALTER SINK s1 SET SINK_RATE_LIMIT = 1000;

statement ok
ALTER SINK s1 SET SINK_RATE_LIMIT = 0;

statement ok
ALTER SINK s1 SET SINK_RATE_LIMIT = default;

statement ok
create table t2 (v1 int primary key, v2 int);

# Sink-into-table (no external connector).
statement ok
create sink s2 into t2 as select v1, v2 from t1;

# Rate limit is only supported for external sinks, so altering a
# sink-into-table must fail.
statement error
ALTER SINK s2 SET SINK_RATE_LIMIT = 0;

# Cleanup: drop sinks before the tables they depend on.
statement ok
DROP SINK s2;

statement ok
DROP SINK s1;

statement ok
DROP TABLE t2;

statement ok
DROP TABLE t1;

statement ok
FLUSH;
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ enum ThrottleTarget {
TABLE_WITH_SOURCE = 3;
CDC_TABLE = 4;
TABLE_DML = 5;
SINK = 6;
}

message ApplyThrottleRequest {
Expand Down
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ message SinkNode {
// A sink with a kv log store should have a table.
catalog.Table table = 2;
SinkLogStoreType log_store_type = 3;
optional uint32 rate_limit = 4;
}

message ProjectNode {
Expand Down
19 changes: 19 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,25 @@ impl StreamChunk {
data: self.data.with_visibility(vis),
}
}

// Derive the chunk permits based on the provided rate limit
/// Derive the number of rate-limiter permits to charge for this chunk,
/// given the configured `limit` (max burst size).
///
/// Normally a chunk costs its full capacity in permits. The one exception:
/// a chunk that overshoots `limit` by exactly one row because its trailing
/// `U-`/`U+` pair could not be split is charged `limit` instead, so the
/// permits consumed never exceed the configured max burst.
pub fn compute_rate_limit_chunk_permits(&self, limit: usize) -> usize {
    let chunk_size = self.capacity();
    // Check whether the second-to-last op is `U-`, to stay consistent with
    // `StreamChunkBuilder`. If something inconsistent happens in the stream,
    // we may not have a `U+` after this `U-`.
    let ends_with_update =
        chunk_size >= 2 && self.ops()[chunk_size - 2].is_update_delete();
    if ends_with_update && chunk_size == limit + 1 {
        // The overshoot is caused solely by the unsplittable `Update` pair;
        // charge one permit fewer to keep consumption within the limit.
        chunk_size - 1
    } else {
        chunk_size
    }
}
}

impl Deref for StreamChunk {
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default to bypass cluster limits iff in debug mode.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
Expand Down Expand Up @@ -289,6 +290,12 @@ pub struct SessionConfig {
#[parameter(default = DISABLE_DML_RATE_LIMIT)]
dml_rate_limit: i32,

/// Set sink rate limit (rows per second) for each parallelism for external sink.
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the sink.
#[parameter(default = DISABLE_SINK_RATE_LIMIT)]
sink_rate_limit: i32,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
#[serde_as(as = "DisplayFromStr")]
Expand Down
6 changes: 6 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ csv = "1.3"
deltalake = { workspace = true }
duration-str = "0.11.2"
easy-ext = "1"
either = "1"
elasticsearch = { version = "8.15.0-alpha.1", features = ["rustls-tls"] }
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand All @@ -60,6 +61,11 @@ google-cloud-bigquery = { version = "0.13.0", features = ["auth"] }
google-cloud-gax = "0.19.0"
google-cloud-googleapis = { version = "0.15", features = ["pubsub", "bigquery"] }
google-cloud-pubsub = "0.29"
governor = { version = "0.6", default-features = false, features = [
"std",
"dashmap",
"jitter",
] }
http = "0.2"
iceberg = { workspace = true }
iceberg-catalog-glue = { workspace = true }
Expand Down
Loading

0 comments on commit 4aab017

Please sign in to comment.