diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 957f49b63e503..a1026e0d36c1f 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -462,6 +462,9 @@ message ChainNode {
   // The rate limit for the chain node.
   optional uint32 rate_limit = 8;
+
+  // Snapshot read every N barriers.
+  uint32 snapshot_read_barrier_interval = 9;
 }
 
 // BatchPlanNode is used for mv on mv snapshot read.
diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs
index 367cf4ce35ac4..90ac1a31bc6f7 100644
--- a/src/common/src/session_config/mod.rs
+++ b/src/common/src/session_config/mod.rs
@@ -36,7 +36,7 @@
 use crate::util::epoch::Epoch;
 
 // This is a hack, &'static str is not allowed as a const generics argument.
 // TODO: refine this using the adt_const_params feature.
-const CONFIG_KEYS: [&str; 38] = [
+const CONFIG_KEYS: [&str; 39] = [
     "RW_IMPLICIT_FLUSH",
     "CREATE_COMPACTION_GROUP_FOR_MV",
     "QUERY_MODE",
@@ -75,6 +75,7 @@ const CONFIG_KEYS: [&str; 38] = [
     "CDC_BACKFILL",
     "RW_STREAMING_OVER_WINDOW_CACHE_POLICY",
     "BACKGROUND_DDL",
+    "BACKFILL_SNAPSHOT_BARRIER_INTERVAL",
 ];
 
 // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
@@ -117,6 +118,7 @@
 const STREAMING_RATE_LIMIT: usize = 34;
 const CDC_BACKFILL: usize = 35;
 const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36;
 const BACKGROUND_DDL: usize = 37;
+const BACKFILL_SNAPSHOT_BARRIER_INTERVAL: usize = 38;
 
 trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
     fn entry_name() -> &'static str;
@@ -342,6 +344,7 @@
 type StandardConformingStrings = ConfigString<STANDARD_CONFORMING_STRINGS>;
 type StreamingRateLimit = ConfigU64<STREAMING_RATE_LIMIT, 0>;
 type CdcBackfill = ConfigBool<CDC_BACKFILL, false>;
 type BackgroundDdl = ConfigBool<BACKGROUND_DDL, false>;
+type BackfillSnapshotBarrierInterval = ConfigU64<BACKFILL_SNAPSHOT_BARRIER_INTERVAL, 1>;
 
 /// Report status or notice to caller.
 pub trait ConfigReporter {
@@ -491,6 +494,8 @@ pub struct ConfigMap {
     streaming_over_window_cache_policy: OverWindowCachePolicy,
 
     background_ddl: BackgroundDdl,
+
+    backfill_snapshot_barrier_interval: BackfillSnapshotBarrierInterval,
 }
 
 impl ConfigMap {
@@ -610,6 +615,8 @@
             self.streaming_over_window_cache_policy = val.as_slice().try_into()?;
         } else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) {
             self.background_ddl = val.as_slice().try_into()?;
+        } else if key.eq_ignore_ascii_case(BackfillSnapshotBarrierInterval::entry_name()) {
+            self.backfill_snapshot_barrier_interval = val.as_slice().try_into()?;
         } else {
             return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
         }
@@ -699,6 +706,8 @@
             Ok(self.streaming_over_window_cache_policy.to_string())
         } else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) {
             Ok(self.background_ddl.to_string())
+        } else if key.eq_ignore_ascii_case(BackfillSnapshotBarrierInterval::entry_name()) {
+            Ok(self.backfill_snapshot_barrier_interval.to_string())
         } else {
             Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
         }
@@ -896,6 +905,11 @@
                 setting: self.background_ddl.to_string(),
                 description: String::from("Run DDL statements in background"),
             },
+            VariableInfo {
+                name: BackfillSnapshotBarrierInterval::entry_name().to_lowercase(),
+                setting: self.backfill_snapshot_barrier_interval.to_string(),
+                description: String::from("Read from snapshot every N barriers"),
+            },
         ]
     }
@@ -1038,4 +1052,8 @@
     pub fn get_background_ddl(&self) -> bool {
         self.background_ddl.0
     }
+
+    pub fn get_backfill_snapshot_barrier_interval(&self) -> u64 {
+        self.backfill_snapshot_barrier_interval.0
+    }
 }
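The session-config hunks above register the new `BACKFILL_SNAPSHOT_BARRIER_INTERVAL` variable end to end: key, index constant, `ConfigU64` alias, set/get arms, `SHOW`-variables metadata, and a typed getter. As a minimal sketch of the set path (the `parse_interval` helper and its error strings are assumptions for illustration, not the crate's real `ConfigU64` code), the value arrives as string tokens and must parse to a `u64`:

```rust
/// Hypothetical stand-in for the ConfigU64-backed session entry; the real
/// implementation lives in src/common/src/session_config/mod.rs.
const ENTRY_NAME: &str = "BACKFILL_SNAPSHOT_BARRIER_INTERVAL";

fn parse_interval(val: &[&str]) -> Result<u64, String> {
    // A SET statement reaches the config map as a slice of string tokens.
    let raw = val
        .first()
        .ok_or_else(|| format!("SET {ENTRY_NAME} requires a value"))?;
    raw.parse::<u64>()
        .map_err(|e| format!("invalid value for {ENTRY_NAME}: {e}"))
}

fn main() {
    // e.g. `SET BACKFILL_SNAPSHOT_BARRIER_INTERVAL TO 4;`
    assert_eq!(parse_interval(&["4"]), Ok(4));
    assert!(parse_interval(&["four"]).is_err());
}
```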
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index bc841a3efd5cf..2520ce0a09cb1 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -315,6 +315,12 @@
                     .session_ctx()
                     .config()
                     .get_streaming_rate_limit(),
+                snapshot_read_barrier_interval: self
+                    .ctx()
+                    .session_ctx()
+                    .config()
+                    .get_backfill_snapshot_barrier_interval()
+                    as u32,
             })),
             stream_key,
             operator_id: self.base.id.0 as u64,
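One detail in the frontend hunk above worth flagging: the session value is a `u64`, the proto field is a `uint32`, and the conversion is a plain `as u32`, which silently wraps for values above `u32::MAX`. A saturating conversion would be a safer fit; the sketch below (hypothetical `to_proto_interval` helper, not code from the diff) shows the difference:

```rust
// Saturating alternative to the `as u32` cast used in StreamTableScan:
// oversized session values pin to u32::MAX instead of wrapping around.
fn to_proto_interval(session_value: u64) -> u32 {
    u32::try_from(session_value).unwrap_or(u32::MAX)
}

fn main() {
    assert_eq!(to_proto_interval(4), 4);
    // `as u32` would wrap this to 0; the saturating version pins it.
    assert_eq!(to_proto_interval(u64::from(u32::MAX) + 1), u32::MAX);
}
```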
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index bd40ea8b34e7d..c69992fb679e1 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -101,6 +101,41 @@ pub struct BackfillExecutor<S: StateStore> {
     metrics: Arc<StreamingMetrics>,
 
     chunk_size: usize,
+
+    // By default we read the snapshot on every barrier.
+    // With this we read it every N barriers instead.
+    snapshot_read_interval: usize,
+}
+
+struct SnapshotControl {
+    snapshot_read_interval: usize,
+    intervals: usize,
+}
+
+impl SnapshotControl {
+    fn new(mut snapshot_read_interval: usize) -> Self {
+        if snapshot_read_interval == 0 {
+            snapshot_read_interval = 1;
+        }
+        Self {
+            snapshot_read_interval,
+            intervals: 0,
+        }
+    }
+
+    fn try_read(&mut self) -> bool {
+        let should_read = self.intervals == 0;
+        if should_read {
+            tracing::trace!("backfill should snapshot_read, interval={}", self.intervals);
+        } else {
+            tracing::trace!(
+                "backfill should not snapshot_read, interval={}",
+                self.intervals
+            );
+        }
+        self.intervals = (self.intervals + 1) % self.snapshot_read_interval;
+        should_read
+    }
 }
 
 impl<S> BackfillExecutor<S>
 where
@@ -119,6 +154,7 @@
         metrics: Arc<StreamingMetrics>,
         chunk_size: usize,
         executor_id: u64,
+        snapshot_read_interval: usize,
     ) -> Self {
         Self {
             info: ExecutorInfo {
@@ -134,12 +170,15 @@
             progress,
             metrics,
             chunk_size,
+            snapshot_read_interval,
         }
     }
 
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn execute_inner(mut self) {
         // The primary key columns, in the output columns of the upstream_table scan.
+        let mut snapshot_control = SnapshotControl::new(self.snapshot_read_interval);
+
         let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
         let state_len = pk_in_output_indices.len() + METADATA_STATE_LEN;
@@ -289,8 +328,82 @@
                         // to flush remaining chunks from the chunk builder
                         // on barrier.
                         // Hence we break here and process it after this block.
-                        pending_barrier = Some(barrier);
-                        break;
+                        if snapshot_control.try_read() {
+                            pending_barrier = Some(barrier);
+                            break;
+                        } else {
+                            // Process barrier:
+                            // - consume upstream buffer chunk
+                            // - switch snapshot
+
+                            // Consume upstream buffer chunk.
+                            // If there is no current_pos, we have not processed
+                            // any snapshot data yet. In that case we can ignore
+                            // the buffered upstream chunks, but we still need
+                            // to clear the buffer.
+                            if let Some(current_pos) = &current_pos {
+                                for chunk in upstream_chunk_buffer.drain(..) {
+                                    cur_barrier_upstream_processed_rows +=
+                                        chunk.cardinality() as u64;
+                                    yield Message::Chunk(mapping_chunk(
+                                        mark_chunk(
+                                            chunk,
+                                            current_pos,
+                                            &pk_in_output_indices,
+                                            pk_order,
+                                        ),
+                                        &self.output_indices,
+                                    ));
+                                }
+                            } else {
+                                upstream_chunk_buffer.clear()
+                            }
+
+                            self.metrics
+                                .backfill_snapshot_read_row_count
+                                .with_label_values(&[
+                                    upstream_table_id.to_string().as_str(),
+                                    self.actor_id.to_string().as_str(),
+                                ])
+                                .inc_by(cur_barrier_snapshot_processed_rows);
+
+                            self.metrics
+                                .backfill_upstream_output_row_count
+                                .with_label_values(&[
+                                    upstream_table_id.to_string().as_str(),
+                                    self.actor_id.to_string().as_str(),
+                                ])
+                                .inc_by(cur_barrier_upstream_processed_rows);
+
+                            // Update snapshot read epoch.
+                            snapshot_read_epoch = barrier.epoch.prev;
+
+                            self.progress.update(
+                                barrier.epoch.curr,
+                                snapshot_read_epoch,
+                                total_snapshot_processed_rows,
+                            );
+
+                            // Persist state on barrier.
+                            Self::persist_state(
+                                barrier.epoch,
+                                &mut self.state_table,
+                                false,
+                                &current_pos,
+                                total_snapshot_processed_rows,
+                                &mut old_state,
+                                &mut current_state,
+                            )
+                            .await?;
+
+                            tracing::trace!(
+                                epoch = ?barrier.epoch,
+                                ?current_pos,
+                                total_snapshot_processed_rows,
+                                "Backfill state persisted"
+                            );
+
+                            yield Message::Barrier(barrier);
+                        }
                     }
                     Message::Chunk(chunk) => {
                         // Buffer the upstream chunk.
@@ -584,6 +697,7 @@
         ordered: bool,
         builder: &'a mut DataChunkBuilder,
     ) {
+        tracing::trace!("new snapshot_read, epoch={}", epoch);
         let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
         let range_bounds = match range_bounds {
             None => {
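To make the new cadence concrete, here is a standalone copy of `SnapshotControl` from the diff (tracing calls dropped) together with an assumed usage check of the pattern it produces: with interval N, the snapshot is read on the first barrier and on every Nth barrier after that.

```rust
// Standalone copy of SnapshotControl from the diff, minus tracing.
struct SnapshotControl {
    snapshot_read_interval: usize,
    intervals: usize,
}

impl SnapshotControl {
    fn new(mut snapshot_read_interval: usize) -> Self {
        if snapshot_read_interval == 0 {
            // 0 would divide by zero below; treat it as "read every barrier".
            snapshot_read_interval = 1;
        }
        Self {
            snapshot_read_interval,
            intervals: 0,
        }
    }

    fn try_read(&mut self) -> bool {
        let should_read = self.intervals == 0;
        self.intervals = (self.intervals + 1) % self.snapshot_read_interval;
        should_read
    }
}

fn main() {
    // With interval 3: read on barriers 0, 3, ... and skip the rest.
    let mut ctl = SnapshotControl::new(3);
    let pattern: Vec<bool> = (0..6).map(|_| ctl.try_read()).collect();
    assert_eq!(pattern, [true, false, false, true, false, false]);

    // An interval of 0 degenerates to reading on every barrier.
    let mut every = SnapshotControl::new(0);
    assert!((0..3).all(|_| every.try_read()));
}
```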
diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs
index 81030526b82f3..dbecae35542d8 100644
--- a/src/stream/src/from_proto/chain.rs
+++ b/src/stream/src/from_proto/chain.rs
@@ -162,6 +162,8 @@ impl ExecutorBuilder for ChainExecutorBuilder {
             None
         };
 
+        let snapshot_read_interval = node.get_snapshot_read_barrier_interval();
+
         BackfillExecutor::new(
             upstream_table,
             mview,
@@ -173,6 +175,7 @@
             stream.streaming_metrics.clone(),
             params.env.config().developer.chunk_size,
             params.executor_id,
+            snapshot_read_interval as usize,
         )
         .boxed()
     }
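Finally, on defaults: `snapshot_read_barrier_interval` is a plain proto3 scalar, so plans serialized before this change decode it as 0, which `SnapshotControl::new` normalizes back to 1, preserving the old read-every-barrier behavior. The sketch below (hypothetical `effective_interval` helper, collapsing the builder and executor steps into one function) traces that end-to-end normalization:

```rust
// An unset proto3 scalar decodes as its zero value, so an old plan without
// the field yields 0 here; SnapshotControl::new then maps 0 to 1.
fn effective_interval(proto_value: u32) -> usize {
    let requested = proto_value as usize;
    if requested == 0 {
        1
    } else {
        requested
    }
}

fn main() {
    assert_eq!(effective_interval(0), 1); // plan predating the field
    assert_eq!(effective_interval(4), 4); // plan built with the new session knob
}
```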