From 4aab01730c3b3c24944a2833e63179b6d208fa61 Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Mon, 23 Dec 2024 15:18:20 +0800 Subject: [PATCH] feat(sink): support sink rate limit for external sink (#19660) Signed-off-by: MrCroxx Co-authored-by: MrCroxx --- Cargo.lock | 2 + ci/scripts/deterministic-recovery-test.sh | 2 +- e2e_test/batch/catalog/pg_settings.slt.part | 1 + e2e_test/sink/rate_limit.slt | 49 ++++ proto/meta.proto | 1 + proto/stream_plan.proto | 1 + src/common/src/array/stream_chunk.rs | 19 ++ src/common/src/session_config/mod.rs | 7 + src/connector/Cargo.toml | 6 + src/connector/src/sink/log_store.rs | 259 ++++++++++++++++++ .../src/handler/alter_streaming_rate_limit.rs | 15 +- src/frontend/src/handler/mod.rs | 12 + .../src/optimizer/plan_node/stream_sink.rs | 1 + src/frontend/src/utils/overwrite_options.rs | 16 ++ src/meta/service/src/stream_service.rs | 7 +- src/meta/src/controller/streaming_job.rs | 171 +++++++----- src/meta/src/manager/metadata.rs | 17 +- src/prost/src/lib.rs | 9 +- src/sqlparser/src/ast/ddl.rs | 6 + src/sqlparser/src/parser.rs | 24 ++ src/stream/src/executor/dml.rs | 5 +- src/stream/src/executor/sink.rs | 37 ++- src/stream/src/executor/source/mod.rs | 7 +- src/stream/src/executor/utils.rs | 18 -- src/stream/src/from_proto/sink.rs | 2 + .../tests/integration_tests/sink/mod.rs | 2 + .../integration_tests/sink/rate_limit.rs | 89 ++++++ 27 files changed, 677 insertions(+), 108 deletions(-) create mode 100644 e2e_test/sink/rate_limit.slt create mode 100644 src/tests/simulation/tests/integration_tests/sink/rate_limit.rs diff --git a/Cargo.lock b/Cargo.lock index f11864048ceb3..d2cb5a29b3284 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11076,6 +11076,7 @@ dependencies = [ "deltalake", "duration-str", "easy-ext", + "either", "elasticsearch", "enum-as-inner 0.6.0", "expect-test", @@ -11088,6 +11089,7 @@ dependencies = [ "google-cloud-gax", "google-cloud-googleapis", "google-cloud-pubsub", + "governor", "http 0.2.9", "iceberg", "iceberg-catalog-glue", diff --git a/ci/scripts/deterministic-recovery-test.sh b/ci/scripts/deterministic-recovery-test.sh index 82641b66afc58..165712d225971 100755 --- a/ci/scripts/deterministic-recovery-test.sh +++ b/ci/scripts/deterministic-recovery-test.sh @@ -62,7 +62,7 @@ ${EXTRA_ARGS:-} \ 2> $LOGDIR/recovery-background-ddl-{}.log && rm $LOGDIR/recovery-background-ddl-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, ddl" -seq "$TEST_NUM" | parallel 'MADSIM_TEST_SEED={} ./risingwave_simulation \ +seq "$TEST_NUM" | parallel --tmpdir .risingwave 'MADSIM_TEST_SEED={} ./risingwave_simulation \ --kill \ --kill-rate=${KILL_RATE} \ --background-ddl-rate=${BACKGROUND_DDL_RATE} \ diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 402f17a6d116b..cfc106d2acdc5 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -52,6 +52,7 @@ user server_encoding user server_version user server_version_num user sink_decouple +user sink_rate_limit user source_rate_limit user standard_conforming_strings user statement_timeout diff --git a/e2e_test/sink/rate_limit.slt b/e2e_test/sink/rate_limit.slt new file mode 100644 index 0000000000000..a9f7c7cd9c302 --- /dev/null +++ b/e2e_test/sink/rate_limit.slt @@ -0,0 +1,49 @@ +statement ok +CREATE TABLE t1(v1 int, v2 int); + +statement ok +CREATE SINK s1 AS select * from t1 WITH ( + connector = 'blackhole' +); + +statement ok +SET SINK_RATE_LIMIT TO 100; + +statement ok +SET SINK_RATE_LIMIT TO 0; + +statement ok +SET SINK_RATE_LIMIT TO default; + +statement ok +ALTER SINK s1 SET SINK_RATE_LIMIT = 1000; + +statement ok +ALTER SINK s1 SET SINK_RATE_LIMIT = 0; + +statement ok +ALTER SINK s1 SET SINK_RATE_LIMIT = default; + +statement ok +create table t2 (v1 int primary key, v2 int); + +statement ok +create sink s2 into t2 as select v1, v2 from t1; + +statement error +ALTER SINK s2 SET SINK_RATE_LIMIT = 0; + +statement ok +DROP SINK s2; + +statement ok +DROP SINK s1; + +statement ok +DROP TABLE t2; + +statement ok +DROP TABLE t1; + +statement ok +FLUSH; \ No newline at end of file diff --git a/proto/meta.proto b/proto/meta.proto index adf97e6352d4f..77e9c486a120b 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -310,6 +310,7 @@ enum ThrottleTarget { TABLE_WITH_SOURCE = 3; CDC_TABLE = 4; TABLE_DML = 5; + SINK = 6; } message ApplyThrottleRequest { diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index de5dec126fda4..8aabadfd2b74d 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -279,6 +279,7 @@ message SinkNode { // A sink with a kv log store should have a table. catalog.Table table = 2; SinkLogStoreType log_store_type = 3; + optional uint32 rate_limit = 4; } message ProjectNode { diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index 307283726c340..6955b1c761137 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -359,6 +359,25 @@ impl StreamChunk { data: self.data.with_visibility(vis), } } + + // Derive the chunk permits based on the provided rate limit + pub fn compute_rate_limit_chunk_permits(&self, limit: usize) -> usize { + let chunk_size = self.capacity(); + let ends_with_update = if chunk_size >= 2 { + // Note we have to check if the 2nd last is `U-` to be consistenct with `StreamChunkBuilder`. + // If something inconsistent happens in the stream, we may not have `U+` after this `U-`. + self.ops()[chunk_size - 2].is_update_delete() + } else { + false + }; + if chunk_size == limit + 1 && ends_with_update { + // If the chunk size exceed limit because of the last `Update` operation, + // we should minus 1 to make sure the permits consumed is within the limit (max burst). + chunk_size - 1 + } else { + chunk_size + } + } } impl Deref for StreamChunk { diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index e5bc658d4236b..9b216f084cf41 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -58,6 +58,7 @@ type SessionConfigResult = std::result::Result; const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1; const DISABLE_SOURCE_RATE_LIMIT: i32 = -1; const DISABLE_DML_RATE_LIMIT: i32 = -1; +const DISABLE_SINK_RATE_LIMIT: i32 = -1; /// Default to bypass cluster limits iff in debug mode. const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions); @@ -289,6 +290,12 @@ pub struct SessionConfig { #[parameter(default = DISABLE_DML_RATE_LIMIT)] dml_rate_limit: i32, + /// Set sink rate limit (rows per second) for each parallelism for external sink. + /// If set to -1, disable rate limit. + /// If set to 0, this pauses the sink. + #[parameter(default = DISABLE_SINK_RATE_LIMIT)] + sink_rate_limit: i32, + /// Cache policy for partition cache in streaming over window. /// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`". #[serde_as(as = "DisplayFromStr")] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 5341d4bb40b77..3f12926c9edac 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -50,6 +50,7 @@ csv = "1.3" deltalake = { workspace = true } duration-str = "0.11.2" easy-ext = "1" +either = "1" elasticsearch = { version = "8.15.0-alpha.1", features = ["rustls-tls"] } enum-as-inner = "0.6" futures = { version = "0.3", default-features = false, features = ["alloc"] } @@ -60,6 +61,11 @@ google-cloud-bigquery = { version = "0.13.0", features = ["auth"] } google-cloud-gax = "0.19.0" google-cloud-googleapis = { version = "0.15", features = ["pubsub", "bigquery"] } google-cloud-pubsub = "0.29" +governor = { version = "0.6", default-features = false, features = [ + "std", + "dashmap", + "jitter", +] } http = "0.2" iceberg = { workspace = true } iceberg-catalog-glue = { workspace = true } diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index f8e6435259c96..1333cd2c8b9ca 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -16,18 +16,27 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::fmt::Debug; use std::future::{poll_fn, Future}; +use std::num::NonZeroU32; +use std::pin::pin; use std::sync::Arc; use std::task::Poll; use std::time::Instant; use await_tree::InstrumentAwait; +use either::Either; use futures::{TryFuture, TryFutureExt}; +use governor::clock::MonotonicClock; +use governor::middleware::NoOpMiddleware; +use governor::state::{InMemoryState, NotKeyed}; +use governor::{Quota, RateLimiter}; use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge}; use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH}; use risingwave_common_estimate_size::EstimateSize; +use tokio::select; +use tokio::sync::mpsc::UnboundedReceiver; pub type LogStoreResult = Result; pub type ChunkId = usize; @@ -324,6 +333,252 @@ impl LogReader for MonitoredLogReader { } } +type UpstreamChunkOffset = TruncateOffset; +type DownstreamChunkOffset = TruncateOffset; + +struct SplitChunk { + chunk: StreamChunk, + upstream_chunk_offset: UpstreamChunkOffset, + // Indicate whether this is the last split chunk from the same `upstream_chunk_offset` + is_last: bool, +} + +struct RateLimitedLogReaderCore { + inner: R, + // Newer items at the front + consumed_offset_queue: VecDeque<(DownstreamChunkOffset, UpstreamChunkOffset)>, + // Newer items at the front + unconsumed_chunk_queue: VecDeque, + next_chunk_id: usize, +} + +impl RateLimitedLogReaderCore { + // Returns: Left - chunk, Right - Barrier/UpdateVnodeBitmap + async fn next_item(&mut self) -> LogStoreResult> { + // Get upstream chunk from unconsumed_chunk_queue first. + // If unconsumed_chunk_queue is empty, get the chunk from the inner log reader. + match self.unconsumed_chunk_queue.pop_back() { + Some(split_chunk) => Ok(Either::Left(split_chunk)), + None => { + let (epoch, item) = self.inner.next_item().await?; + match item { + LogStoreReadItem::StreamChunk { chunk, chunk_id } => { + Ok(Either::Left(SplitChunk { + chunk, + upstream_chunk_offset: UpstreamChunkOffset::Chunk { epoch, chunk_id }, + is_last: true, + })) + } + LogStoreReadItem::Barrier { .. } => { + self.consumed_offset_queue.push_front(( + TruncateOffset::Barrier { epoch }, + TruncateOffset::Barrier { epoch }, + )); + self.next_chunk_id = 0; + Ok(Either::Right((epoch, item))) + } + LogStoreReadItem::UpdateVnodeBitmap(_) => Ok(Either::Right((epoch, item))), + } + } + } + } +} + +pub struct RateLimitedLogReader { + core: RateLimitedLogReaderCore, + // None: unlimited. 0: paused. + rate_limit: Option, + rate_limiter: + Option>>, + control_rx: UnboundedReceiver>, +} + +impl RateLimitedLogReader { + pub fn new(inner: R, control_rx: UnboundedReceiver>) -> Self { + Self { + core: RateLimitedLogReaderCore { + inner, + consumed_offset_queue: VecDeque::new(), + unconsumed_chunk_queue: VecDeque::new(), + next_chunk_id: 0, + }, + rate_limit: None, + rate_limiter: None, + control_rx, + } + } +} + +impl RateLimitedLogReader { + // Returns old limit + fn update_rate_limit(&mut self, rate_limit: Option) -> Option { + let prev = self.rate_limit; + self.rate_limit = rate_limit; + if let Some(limit) = rate_limit { + if limit == 0 { + self.rate_limiter = None; + return prev; + } + let quota = Quota::per_second(NonZeroU32::new(limit).unwrap()); + let clock = MonotonicClock; + let limiter = RateLimiter::direct_with_clock(quota, &clock); + self.rate_limiter = Some(limiter); + } else { + self.rate_limiter = None; + } + prev + } + + async fn apply_rate_limit( + &mut self, + split_chunk: SplitChunk, + ) -> LogStoreResult<(u64, LogStoreReadItem)> { + // Apply rate limit. If the chunk is too large, split it into smaller chunks. + let split_chunk = if let Some(limiter) = self.rate_limiter.as_mut() { + // Rate limit is set and not paused + let limit = self.rate_limit.unwrap(); + assert!(limit > 0); + + let required_permits: usize = split_chunk + .chunk + .compute_rate_limit_chunk_permits(limit as _); + if required_permits <= limit as _ { + let n = NonZeroU32::new(required_permits as u32).unwrap(); + // `InsufficientCapacity` should never happen because we have check the cardinality + limiter.until_n_ready(n).await.unwrap(); + split_chunk + } else { + // Cut the chunk into smaller chunks + let mut chunks = split_chunk.chunk.split(limit as _).into_iter(); + let mut is_last = split_chunk.is_last; + let upstream_chunk_offset = split_chunk.upstream_chunk_offset; + + // The first chunk after splitting will be returned + let first_chunk = chunks.next().unwrap(); + + // The remaining chunks will be pushed to the queue + for chunk in chunks.rev() { + // The last chunk after splitting inherits the `is_last` from the original chunk + self.core.unconsumed_chunk_queue.push_back(SplitChunk { + chunk, + upstream_chunk_offset, + is_last, + }); + is_last = false; + } + + // Trigger rate limit and return the first chunk + let n = NonZeroU32::new( + first_chunk.compute_rate_limit_chunk_permits(limit as _) as u32 + ) + .unwrap(); + // chunks split should have effective chunk size <= limit + limiter.until_n_ready(n).await.unwrap(); + SplitChunk { + chunk: first_chunk, + upstream_chunk_offset, + is_last, + } + } + } else { + // Rate limit is not set + assert_eq!(self.rate_limit, None); + split_chunk + }; + + // Update the consumed_offset_queue if the `split_chunk` is the last chunk of the upstream chunk + let epoch = split_chunk.upstream_chunk_offset.epoch(); + let downstream_chunk_id = self.core.next_chunk_id; + self.core.next_chunk_id += 1; + if split_chunk.is_last { + self.core.consumed_offset_queue.push_front(( + TruncateOffset::Chunk { + epoch, + chunk_id: downstream_chunk_id, + }, + split_chunk.upstream_chunk_offset, + )); + } + + Ok(( + epoch, + LogStoreReadItem::StreamChunk { + chunk: split_chunk.chunk, + chunk_id: downstream_chunk_id, + }, + )) + } +} + +impl LogReader for RateLimitedLogReader { + async fn init(&mut self) -> LogStoreResult<()> { + self.core.inner.init().await + } + + async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> { + let mut paused = false; + loop { + select! { + biased; + rate_limit_change = pin!(self.control_rx.recv()) => { + if let Some(new_rate_limit) = rate_limit_change { + let prev = self.update_rate_limit(new_rate_limit); + paused = self.rate_limit == Some(0); + tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", prev, self.rate_limit); + } else { + bail!("rate limit control channel closed"); + } + }, + item = self.core.next_item(), if !paused => { + let item = item?; + match item { + Either::Left(split_chunk) => { + return self.apply_rate_limit(split_chunk).await; + }, + Either::Right(item) => { + assert!(matches!(item.1, LogStoreReadItem::Barrier{..} | LogStoreReadItem::UpdateVnodeBitmap(_))); + return Ok(item); + }, + } + } + } + } + } + + fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> { + let mut truncate_offset = None; + while let Some((downstream_offset, upstream_offset)) = + self.core.consumed_offset_queue.back() + { + if *downstream_offset <= offset { + truncate_offset = Some(*upstream_offset); + self.core.consumed_offset_queue.pop_back(); + } else { + break; + } + } + tracing::trace!( + "rate limited log store reader truncate offset {:?}, downstream offset {:?}", + truncate_offset, + offset + ); + if let Some(offset) = truncate_offset { + self.core.inner.truncate(offset) + } else { + Ok(()) + } + } + + fn rewind( + &mut self, + ) -> impl Future)>> + Send + '_ { + self.core.unconsumed_chunk_queue.clear(); + self.core.consumed_offset_queue.clear(); + self.core.next_chunk_id = 0; + self.core.inner.rewind() + } +} + #[easy_ext::ext(LogReaderExt)] impl T where @@ -344,6 +599,10 @@ where wait_new_future_duration, ) } + + pub fn rate_limited(self, control_rx: UnboundedReceiver>) -> impl LogReader { + RateLimitedLogReader::new(self, control_rx) + } } pub struct MonitoredLogWriter { diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs index d41104f87d488..c089b31e92f2c 100644 --- a/src/frontend/src/handler/alter_streaming_rate_limit.rs +++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs @@ -90,10 +90,7 @@ pub async fn handle_alter_streaming_rate_limit( let (table, schema_name) = reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?; if table.table_type != TableType::Table { - return Err(ErrorCode::InvalidInputSyntax(format!( - "\"{table_name}\" is not a table", - )) - .into()); + return Err(ErrorCode::InvalidInputSyntax(format!("\"{table_name}\" ",)).into()); } session.check_privilege_for_drop_alter(schema_name, &**table)?; (StatementType::ALTER_TABLE, table.id.table_id) @@ -108,6 +105,16 @@ pub async fn handle_alter_streaming_rate_limit( session.check_privilege_for_drop_alter(schema_name, &**table)?; (StatementType::ALTER_TABLE, table.id.table_id) } + PbThrottleTarget::Sink => { + let reader = session.env().catalog_reader().read_guard(); + let (table, schema_name) = + reader.get_sink_by_name(db_name, schema_path, &real_table_name)?; + if table.target_table.is_some() { + bail!("ALTER SINK_RATE_LIMIT is not for sink into table") + } + session.check_privilege_for_drop_alter(schema_name, &**table)?; + (StatementType::ALTER_SINK, table.id.sink_id) + } _ => bail!("Unsupported throttle target: {:?}", kind), }; diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index b48be779df40b..f67d103d6e782 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -955,6 +955,18 @@ pub async fn handle( ) .await } + Statement::AlterSink { + name, + operation: AlterSinkOperation::SetSinkRateLimit { rate_limit }, + } => { + alter_streaming_rate_limit::handle_alter_streaming_rate_limit( + handler_args, + PbThrottleTarget::Sink, + name, + rate_limit, + ) + .await + } Statement::AlterSubscription { name, operation: AlterSubscriptionOperation::RenameSubscription { subscription_name }, diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 61572fe179005..d9e0d116a61e5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -602,6 +602,7 @@ impl StreamNode for StreamSink { sink_desc: Some(self.sink_desc.to_proto()), table: Some(table.to_internal_table_prost()), log_store_type: self.log_store_type as i32, + rate_limit: self.base.ctx().overwrite_options().sink_rate_limit, }) } } diff --git a/src/frontend/src/utils/overwrite_options.rs b/src/frontend/src/utils/overwrite_options.rs index b6ffab6a71ef5..a24f11fd94029 100644 --- a/src/frontend/src/utils/overwrite_options.rs +++ b/src/frontend/src/utils/overwrite_options.rs @@ -19,11 +19,13 @@ pub struct OverwriteOptions { pub source_rate_limit: Option, pub backfill_rate_limit: Option, pub dml_rate_limit: Option, + pub sink_rate_limit: Option, } impl OverwriteOptions { pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit"; pub(crate) const DML_RATE_LIMIT_KEY: &'static str = "dml_rate_limit"; + pub(crate) const SINK_RATE_LIMIT_KEY: &'static str = "sink_rate_limit"; pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit"; pub fn new(args: &mut HandlerArgs) -> Self { @@ -66,10 +68,24 @@ impl OverwriteOptions { } } }; + let sink_rate_limit = { + if let Some(x) = args.with_options.remove(Self::SINK_RATE_LIMIT_KEY) { + // FIXME(tabVersion): validate the value + Some(x.parse::().unwrap()) + } else { + let rate_limit = args.session.config().sink_rate_limit(); + if rate_limit < 0 { + None + } else { + Some(rate_limit as u32) + } + } + }; Self { source_rate_limit, backfill_rate_limit, dml_rate_limit, + sink_rate_limit, } } } diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 748c118b5a611..495451590b01b 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -23,7 +23,7 @@ use risingwave_meta::manager::MetadataManager; use risingwave_meta::model; use risingwave_meta::model::ActorId; use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig}; -use risingwave_meta_model::{ObjectId, SourceId, StreamingParallelism}; +use risingwave_meta_model::{ObjectId, SinkId, SourceId, StreamingParallelism}; use risingwave_pb::meta::cancel_creating_jobs_request::Jobs; use risingwave_pb::meta::list_actor_splits_response::FragmentType; use risingwave_pb::meta::list_table_fragments_response::{ @@ -131,6 +131,11 @@ impl StreamManagerService for StreamServiceImpl { .update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate) .await? } + ThrottleTarget::Sink => { + self.metadata_manager + .update_sink_rate_limit_by_sink_id(request.id as SinkId, request.rate) + .await? + } ThrottleTarget::Unspecified => { return Err(Status::invalid_argument("unspecified throttle target")) } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 046eb530f1399..c3c5ff4aa248a 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -52,7 +52,7 @@ use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; use risingwave_pb::stream_plan::{ - PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, + PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, PbStreamNode, }; use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr}; use sea_orm::ActiveValue::Set; @@ -1422,12 +1422,15 @@ impl CatalogController { Ok(fragment_actors) } - // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments + // edit the content of fragments in given `table_id` // return the actor_ids to be applied - pub async fn update_backfill_rate_limit_by_job_id( + async fn mutate_fragments_by_job_id( &self, job_id: ObjectId, - rate_limit: Option, + // returns true if the mutation is applied + mut fragments_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool, + // error message when no relevant fragments is found + err_msg: &'static str, ) -> MetaResult>> { let inner = self.inner.read().await; let txn = inner.db.begin().await?; @@ -1449,32 +1452,15 @@ impl CatalogController { .collect_vec(); fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { - let mut found = false; - if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 { - visit_stream_node(stream_node, |node| match node { - PbNodeBody::StreamCdcScan(node) => { - node.rate_limit = rate_limit; - found = true; - } - PbNodeBody::StreamScan(node) => { - node.rate_limit = rate_limit; - found = true; - } - PbNodeBody::SourceBackfill(node) => { - node.rate_limit = rate_limit; - found = true; - } - _ => {} - }); - } - found + fragments_mutation_fn(fragment_type_mask, stream_node) }); - if fragments.is_empty() { return Err(MetaError::invalid_parameter(format!( - "stream scan node or source node not found in job id {job_id}" + "job id {job_id}: {}", + err_msg ))); } + let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec(); for (id, _, stream_node) in fragments { fragment::ActiveModel { @@ -1492,63 +1478,94 @@ impl CatalogController { Ok(fragment_actors) } - pub async fn update_dml_rate_limit_by_job_id( + // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments + // return the actor_ids to be applied + pub async fn update_backfill_rate_limit_by_job_id( &self, job_id: ObjectId, rate_limit: Option, ) -> MetaResult>> { - let inner = self.inner.read().await; - let txn = inner.db.begin().await?; + let update_backfill_rate_limit = + |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| { + let mut found = false; + if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 { + visit_stream_node(stream_node, |node| match node { + PbNodeBody::StreamCdcScan(node) => { + node.rate_limit = rate_limit; + found = true; + } + PbNodeBody::StreamScan(node) => { + node.rate_limit = rate_limit; + found = true; + } + PbNodeBody::SourceBackfill(node) => { + node.rate_limit = rate_limit; + found = true; + } + PbNodeBody::Sink(node) => { + node.rate_limit = rate_limit; + found = true; + } + _ => {} + }); + } + found + }; - let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() - .select_only() - .columns([ - fragment::Column::FragmentId, - fragment::Column::FragmentTypeMask, - fragment::Column::StreamNode, - ]) - .filter(fragment::Column::JobId.eq(job_id)) - .into_tuple() - .all(&txn) - .await?; - let mut fragments = fragments - .into_iter() - .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf())) - .collect_vec(); + self.mutate_fragments_by_job_id( + job_id, + update_backfill_rate_limit, + "stream scan node or source node not found", + ) + .await + } - fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { - let mut found = false; - if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 { - visit_stream_node(stream_node, |node| { - if let PbNodeBody::Dml(node) = node { - node.rate_limit = rate_limit; - found = true; - } - }); - } - found - }); + // edit the `rate_limit` of the `Sink` node in given `table_id`'s fragments + // return the actor_ids to be applied + pub async fn update_sink_rate_limit_by_job_id( + &self, + job_id: ObjectId, + rate_limit: Option, + ) -> MetaResult>> { + let update_sink_rate_limit = + |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| { + let mut found = false; + if *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0 { + visit_stream_node(stream_node, |node| { + if let PbNodeBody::Sink(node) = node { + node.rate_limit = rate_limit; + found = true; + } + }); + } + found + }; - if fragments.is_empty() { - return Err(MetaError::invalid_parameter(format!( - "dml node not found in job id {job_id}" - ))); - } - let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec(); - for (id, _, stream_node) in fragments { - fragment::ActiveModel { - fragment_id: Set(id), - stream_node: Set(StreamNode::from(&stream_node)), - ..Default::default() - } - .update(&txn) - .await?; - } - let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?; + self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found") + .await + } - txn.commit().await?; + pub async fn update_dml_rate_limit_by_job_id( + &self, + job_id: ObjectId, + rate_limit: Option, + ) -> MetaResult>> { + let update_dml_rate_limit = + |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| { + let mut found = false; + if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 { + visit_stream_node(stream_node, |node| { + if let PbNodeBody::Dml(node) = node { + node.rate_limit = rate_limit; + found = true; + } + }); + } + found + }; - Ok(fragment_actors) + self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found") + .await } pub async fn post_apply_reschedules( @@ -1952,6 +1969,14 @@ impl CatalogController { rate_limit = node.rate_limit; node_name = Some("STREAM_CDC_SCAN"); } + PbNodeBody::Sink(node) => { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node.rate_limit; + node_name = Some("SINK"); + } _ => {} } }); diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index c31a47a17d4df..231db4f93c0d3 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -20,7 +20,7 @@ use std::time::Duration; use anyhow::anyhow; use futures::future::{select, Either}; use risingwave_common::catalog::{DatabaseId, TableId, TableOption}; -use risingwave_meta_model::{ObjectId, SourceId, WorkerId}; +use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId}; use risingwave_pb::catalog::{PbSink, PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; @@ -655,6 +655,21 @@ impl MetadataManager { .collect()) } + pub async fn update_sink_rate_limit_by_sink_id( + &self, + sink_id: SinkId, + rate_limit: Option, + ) -> MetaResult>> { + let fragment_actors = self + .catalog_controller + .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit) + .await?; + Ok(fragment_actors + .into_iter() + .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect())) + .collect()) + } + pub async fn update_dml_rate_limit_by_table_id( &self, table_id: TableId, diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 4c57d4f0e472e..8fa584522354d 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -344,9 +344,16 @@ impl stream_plan::FragmentTypeFlag { stream_plan::FragmentTypeFlag::Source as i32 | stream_plan::FragmentTypeFlag::FsFetch as i32 } + /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`. + pub fn sink_rate_limit_fragments() -> i32 { + stream_plan::FragmentTypeFlag::Sink as i32 + } + /// Note: this doesn't include `FsFetch` created in old versions. pub fn rate_limit_fragments() -> i32 { - Self::backfill_rate_limit_fragments() | Self::source_rate_limit_fragments() + Self::backfill_rate_limit_fragments() + | Self::source_rate_limit_fragments() + | Self::sink_rate_limit_fragments() } pub fn dml_rate_limit_fragments() -> i32 { diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index a17dc9297089b..6b66b53be8ffb 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -184,6 +184,9 @@ pub enum AlterSinkOperation { SwapRenameSink { target_sink: ObjectName, }, + SetSinkRateLimit { + rate_limit: i32, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -420,6 +423,9 @@ impl fmt::Display for AlterSinkOperation { AlterSinkOperation::SwapRenameSink { target_sink } => { write!(f, "SWAP WITH {}", target_sink) } + AlterSinkOperation::SetSinkRateLimit { rate_limit } => { + write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit) + } } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index f9ee9f7275433..dda74a64b4369 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3430,6 +3430,28 @@ impl Parser<'_> { }) } + /// SINK_RATE_LIMIT = default | NUMBER + /// SINK_RATE_LIMIT TO default | NUMBER + pub fn parse_alter_sink_rate_limit(&mut self) -> PResult> { + if !self.parse_word("SINK_RATE_LIMIT") { + return Ok(None); + } + if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() { + return self.expected("TO or = after ALTER SINK SET SINK_RATE_LIMIT"); + } + let rate_limit = if self.parse_keyword(Keyword::DEFAULT) { + -1 + } else { + let s = self.parse_number_value()?; + if let Ok(n) = s.parse::() { + n + } else { + return self.expected("number or DEFAULT"); + } + }; + Ok(Some(rate_limit)) + } + pub fn parse_alter_sink(&mut self) -> PResult { let sink_name = self.parse_object_name()?; let operation = if self.parse_keyword(Keyword::RENAME) { @@ -3464,6 +3486,8 @@ impl Parser<'_> { parallelism: value, deferred, } + } else if let Some(rate_limit) = self.parse_alter_sink_rate_limit()? { + AlterSinkOperation::SetSinkRateLimit { rate_limit } } else { return self.expected("SCHEMA/PARALLELISM after SET"); } diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index d343db7698cef..b462b32e8a6b9 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -31,7 +31,6 @@ use tokio::sync::oneshot; use crate::executor::prelude::*; use crate::executor::stream_reader::StreamReaderWithPause; -use crate::executor::utils::compute_rate_limit_chunk_permits; /// [`DmlExecutor`] accepts both stream data and batch data for data manipulation on a specific /// table. The two streams will be merged into one and then sent to downstream. @@ -400,7 +399,7 @@ async fn apply_dml_rate_limit( } let rate_limiter = guard.rate_limiter.as_ref().unwrap(); let max_permits = guard.row_per_second.unwrap() as usize; - let required_permits = compute_rate_limit_chunk_permits(&chunk, max_permits); + let required_permits = chunk.compute_rate_limit_chunk_permits(max_permits); if required_permits <= max_permits { let n = NonZeroU32::new(required_permits as u32).unwrap(); // `InsufficientCapacity` should never happen because we have check the cardinality. @@ -410,7 +409,7 @@ async fn apply_dml_rate_limit( // Split the chunk into smaller chunks. for small_chunk in chunk.split(max_permits) { let required_permits = - compute_rate_limit_chunk_permits(&small_chunk, max_permits); + small_chunk.compute_rate_limit_chunk_permits(max_permits); let n = NonZeroU32::new(required_permits as u32).unwrap(); // Smaller chunks should have effective chunk size <= max_permits. rate_limiter.until_n_ready(n).await.unwrap(); diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 55127fcba3278..73108bd5e64ba 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -34,10 +34,10 @@ use risingwave_connector::sink::{ build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, GLOBAL_SINK_METRICS, }; use thiserror_ext::AsReport; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::common::compact_chunk::{merge_chunk_row, StreamChunkCompactor}; use crate::executor::prelude::*; - pub struct SinkExecutor { actor_context: ActorContextRef, info: ExecutorInfo, @@ -52,6 +52,7 @@ pub struct SinkExecutor { need_advance_delete: bool, re_construct_with_sink_pk: bool, compact_chunk: bool, + rate_limit: Option, } // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. @@ -93,6 +94,7 @@ impl SinkExecutor { log_store_factory: F, chunk_size: usize, input_data_types: Vec, + rate_limit: Option, ) -> StreamExecutorResult { let sink = build_sink(sink_param.clone()) .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?; @@ -180,6 +182,7 @@ impl SinkExecutor { need_advance_delete, re_construct_with_sink_pk, compact_chunk, + rate_limit, }) } @@ -218,6 +221,7 @@ impl SinkExecutor { ); if self.sink.is_sink_into_table() { + // TODO(hzxa21): support rate limit? processed_input.boxed() } else { let labels = [ @@ -240,6 +244,10 @@ impl SinkExecutor { log_store_write_rows, }; + let (rate_limit_tx, rate_limit_rx) = unbounded_channel(); + // Init the rate limit + rate_limit_tx.send(self.rate_limit).unwrap(); + self.log_store_factory .build() .map(move |(log_reader, log_writer)| { @@ -247,6 +255,7 @@ impl SinkExecutor { processed_input, log_writer.monitored(log_writer_metrics), actor_id, + rate_limit_tx, ); let consume_log_stream_future = dispatch_sink!(self.sink, sink, { @@ -257,6 +266,7 @@ impl SinkExecutor { self.sink_param, self.sink_writer_param, self.actor_context, + rate_limit_rx, ) .instrument_await(format!("consume_log (sink_id {sink_id})")) .map_ok(|never| match never {}); // unify return type to `Message` @@ -276,6 +286,7 @@ impl SinkExecutor { input: impl MessageStream, mut log_writer: impl LogWriter, actor_id: ActorId, + rate_limit_tx: UnboundedSender>, ) { pin_mut!(input); let barrier = expect_first_barrier(&mut input).await?; @@ -315,6 +326,23 @@ impl SinkExecutor { log_writer.resume()?; is_paused = false; } + Mutation::Throttle(actor_to_apply) => { + if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) { + tracing::info!( + rate_limit = new_rate_limit, + "received sink rate limit on actor {actor_id}" + ); + if let Err(e) = rate_limit_tx.send(*new_rate_limit) { + error!( + error = %e.as_report(), + "fail to send sink ate limit update" + ); + return Err(StreamExecutorError::from( + e.to_report_string(), + )); + } + } + } _ => (), } } @@ -464,6 +492,7 @@ impl SinkExecutor { sink_param: SinkParam, mut sink_writer_param: SinkWriterParam, actor_context: ActorContextRef, + rate_limit_rx: UnboundedReceiver>, ) -> StreamExecutorResult { let visible_columns = columns .iter() @@ -506,7 +535,8 @@ impl SinkExecutor { chunk } }) - .monitored(metrics); + .monitored(metrics) + .rate_limited(rate_limit_rx); log_reader.init().await?; @@ -664,6 +694,7 @@ mod test { BoundedInMemLogStoreFactory::new(1), 1024, vec![DataType::Int32, DataType::Int32, DataType::Int32], + None, ) .await .unwrap(); @@ -793,6 +824,7 @@ mod test { BoundedInMemLogStoreFactory::new(1), 1024, vec![DataType::Int64, DataType::Int64, DataType::Int64], + None, ) .await .unwrap(); @@ -895,6 +927,7 @@ mod test { BoundedInMemLogStoreFactory::new(1), 1024, vec![DataType::Int64, DataType::Int64], + None, ) .await .unwrap(); diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 7c22db7f52233..dcbf0a12bdd1e 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -53,7 +53,6 @@ use tokio::sync::mpsc::UnboundedReceiver; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use crate::executor::error::StreamExecutorError; -use crate::executor::utils::compute_rate_limit_chunk_permits; use crate::executor::{Barrier, Message}; /// Receive barriers from barrier manager with the channel, error on channel close. @@ -150,7 +149,7 @@ pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Opti let limiter = limiter.as_ref().unwrap(); let limit = rate_limit_rps.unwrap() as usize; - let required_permits = compute_rate_limit_chunk_permits(&chunk, limit); + let required_permits: usize = chunk.compute_rate_limit_chunk_permits(limit); if required_permits <= limit { let n = NonZeroU32::new(required_permits as u32).unwrap(); // `InsufficientCapacity` should never happen because we have check the cardinality @@ -159,8 +158,8 @@ pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Opti } else { // Cut the chunk into smaller chunks for chunk in chunk.split(limit) { - let n = NonZeroU32::new(compute_rate_limit_chunk_permits(&chunk, limit) as u32) - .unwrap(); + let n = + NonZeroU32::new(chunk.compute_rate_limit_chunk_permits(limit) as u32).unwrap(); // chunks split should have effective chunk size <= limit limiter.until_n_ready(n).await.unwrap(); yield chunk; diff --git a/src/stream/src/executor/utils.rs b/src/stream/src/executor/utils.rs index ad2e1b8a4268c..37ed7803a44c7 100644 --- a/src/stream/src/executor/utils.rs +++ b/src/stream/src/executor/utils.rs @@ -22,21 +22,3 @@ impl Execute for DummyExecutor { futures::stream::pending().boxed() } } - -pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, limit: usize) -> usize { - let chunk_size = chunk.capacity(); - let ends_with_update = if chunk_size >= 2 { - // Note we have to check if the 2nd last is `U-` to be consistenct with `StreamChunkBuilder`. - // If something inconsistent happens in the stream, we may not have `U+` after this `U-`. - chunk.ops()[chunk_size - 2].is_update_delete() - } else { - false - }; - if chunk_size == limit + 1 && ends_with_update { - // If the chunk size exceed limit because of the last `Update` operation, - // we should minus 1 to make sure the permits consumed is within the limit (max burst). - chunk_size - 1 - } else { - chunk_size - } -} diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 74cedfaee83dc..0bda6027eff07 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -276,6 +276,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { factory, chunk_size, input_data_types, + node.rate_limit.map(|x| x as _), ) .await? .boxed() @@ -313,6 +314,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { factory, chunk_size, input_data_types, + node.rate_limit.map(|x| x as _), ) .await? .boxed() diff --git a/src/tests/simulation/tests/integration_tests/sink/mod.rs b/src/tests/simulation/tests/integration_tests/sink/mod.rs index 81cd51019e3cc..518a422a60616 100644 --- a/src/tests/simulation/tests/integration_tests/sink/mod.rs +++ b/src/tests/simulation/tests/integration_tests/sink/mod.rs @@ -17,6 +17,8 @@ mod basic; #[cfg(madsim)] mod err_isolation; #[cfg(madsim)] +mod rate_limit; +#[cfg(madsim)] mod recovery; #[cfg(madsim)] mod scale; diff --git a/src/tests/simulation/tests/integration_tests/sink/rate_limit.rs b/src/tests/simulation/tests/integration_tests/sink/rate_limit.rs new file mode 100644 index 0000000000000..b4cc50f6b3ebf --- /dev/null +++ b/src/tests/simulation/tests/integration_tests/sink/rate_limit.rs @@ -0,0 +1,89 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use tokio::time::sleep; + +use crate::sink::utils::{ + start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE, + DROP_SINK, DROP_SOURCE, +}; +use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert}; + +#[tokio::test] +async fn test_sink_decouple_rate_limit() -> Result<()> { + let mut cluster = start_sink_test_cluster().await?; + + let source_parallelism = 6; + + let test_sink = SimulationTestSink::register_new(); + // There will be 1000 * 0.2 = 200 rows generated + let test_source = SimulationTestSource::register_new(source_parallelism, 0..1000, 0.2, 20); + + let mut session = cluster.start_session(); + + session.run("set streaming_parallelism = 2").await?; + session.run("set sink_decouple = true").await?; + session.run("set sink_rate_limit = 0").await?; + session.run(CREATE_SOURCE).await?; + session.run(CREATE_SINK).await?; + test_sink.wait_initial_parallelism(2).await?; + + // sink should not write anything due to 0 rate limit + sleep(Duration::from_secs(1)).await; + assert_eq!(0, test_sink.store.id_count()); + + // sink should write small number of rows with rate limit set to 1 + // the estimated sink throughput is 2 rows per second and we use 8 rows as the upper bound to check + session + .run("alter sink test_sink set sink_rate_limit to 1") + .await?; + sleep(Duration::from_secs(1)).await; + assert!(test_sink.store.id_count() > 0); + assert!(test_sink.store.id_count() <= 8); + + // sink should be paused + session + .run("alter sink test_sink set sink_rate_limit to 0") + .await?; + sleep(Duration::from_secs(1)).await; + let id_count = test_sink.store.id_count(); + sleep(Duration::from_secs(1)).await; + assert_eq!(id_count, test_sink.store.id_count()); + + // sink should be running without rate limit + session + .run("alter sink test_sink set sink_rate_limit to default") + .await?; + sleep(Duration::from_secs(1)).await; + assert!(test_sink.store.id_count() > id_count); + + test_sink + .store + .wait_for_count(test_source.id_list.len()) + .await?; + + session.run(DROP_SINK).await?; + session.run(DROP_SOURCE).await?; + + assert_eq!(0, test_sink.parallelism_counter.load(Relaxed)); + test_sink.store.check_simple_result(&test_source.id_list)?; + assert!(test_sink.store.inner().checkpoint_count > 0); + + Ok(()) +}