Skip to content

Commit

Permalink
feat(stream): support snapshot read over N barriers (#12644)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Oct 6, 2023
1 parent 3fbe1ab commit 461c4eb
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 3 deletions.
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ message ChainNode {

// The rate limit for the chain node.
optional uint32 rate_limit = 8;

// Snapshot read every N barriers
uint32 snapshot_read_barrier_interval = 9;
}

// BatchPlanNode is used for mv on mv snapshot read.
Expand Down
20 changes: 19 additions & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::util::epoch::Epoch;

// This is a hack, &'static str is not allowed as a const generics argument.
// TODO: refine this using the adt_const_params feature.
const CONFIG_KEYS: [&str; 38] = [
const CONFIG_KEYS: [&str; 39] = [
"RW_IMPLICIT_FLUSH",
"CREATE_COMPACTION_GROUP_FOR_MV",
"QUERY_MODE",
Expand Down Expand Up @@ -75,6 +75,7 @@ const CONFIG_KEYS: [&str; 38] = [
"CDC_BACKFILL",
"RW_STREAMING_OVER_WINDOW_CACHE_POLICY",
"BACKGROUND_DDL",
"BACKFILL_SNAPSHOT_BARRIER_INTERVAL",
];

// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
Expand Down Expand Up @@ -117,6 +118,7 @@ const STREAMING_RATE_LIMIT: usize = 34;
const CDC_BACKFILL: usize = 35;
const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36;
const BACKGROUND_DDL: usize = 37;
const BACKFILL_SNAPSHOT_BARRIER_INTERVAL: usize = 38;

trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
fn entry_name() -> &'static str;
Expand Down Expand Up @@ -342,6 +344,7 @@ type StandardConformingStrings = ConfigString<STANDARD_CONFORMING_STRINGS>;
type StreamingRateLimit = ConfigU64<STREAMING_RATE_LIMIT, 0>;
type CdcBackfill = ConfigBool<CDC_BACKFILL, false>;
type BackgroundDdl = ConfigBool<BACKGROUND_DDL, false>;
type BackfillSnapshotBarrierInterval = ConfigU64<BACKFILL_SNAPSHOT_BARRIER_INTERVAL, 1>;

/// Report status or notice to caller.
pub trait ConfigReporter {
Expand Down Expand Up @@ -491,6 +494,8 @@ pub struct ConfigMap {
streaming_over_window_cache_policy: OverWindowCachePolicy,

background_ddl: BackgroundDdl,

backfill_snapshot_barrier_interval: BackfillSnapshotBarrierInterval,
}

impl ConfigMap {
Expand Down Expand Up @@ -610,6 +615,8 @@ impl ConfigMap {
self.streaming_over_window_cache_policy = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) {
self.background_ddl = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(BackfillSnapshotBarrierInterval::entry_name()) {
self.backfill_snapshot_barrier_interval = val.as_slice().try_into()?;
} else {
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
}
Expand Down Expand Up @@ -699,6 +706,8 @@ impl ConfigMap {
Ok(self.streaming_over_window_cache_policy.to_string())
} else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) {
Ok(self.background_ddl.to_string())
} else if key.eq_ignore_ascii_case(BackfillSnapshotBarrierInterval::entry_name()) {
Ok(self.backfill_snapshot_barrier_interval.to_string())
} else {
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
}
Expand Down Expand Up @@ -896,6 +905,11 @@ impl ConfigMap {
setting: self.background_ddl.to_string(),
description: String::from("Run DDL statements in background"),
},
VariableInfo {
name: BackfillSnapshotBarrierInterval::entry_name().to_lowercase(),
setting: self.backfill_snapshot_barrier_interval.to_string(),
description: String::from("Read from snapshot every N barriers"),
},
]
}

Expand Down Expand Up @@ -1038,4 +1052,8 @@ impl ConfigMap {
pub fn get_background_ddl(&self) -> bool {
self.background_ddl.0
}

pub fn get_backfill_snapshot_barrier_interval(&self) -> u64 {
self.backfill_snapshot_barrier_interval.0
}
}
6 changes: 6 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ impl StreamTableScan {
.session_ctx()
.config()
.get_streaming_rate_limit(),
snapshot_read_barrier_interval: self
.ctx()
.session_ctx()
.config()
.get_backfill_snapshot_barrier_interval()
as u32,
})),
stream_key,
operator_id: self.base.id.0 as u64,
Expand Down
118 changes: 116 additions & 2 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,41 @@ pub struct BackfillExecutor<S: StateStore> {
metrics: Arc<StreamingMetrics>,

chunk_size: usize,

// By default we read every barrier.
// With this we read every N barriers.
snapshot_read_interval: usize,
}

/// Decides on which barriers the backfill executor performs a snapshot read.
///
/// By default backfill reads from the snapshot on every barrier; with an
/// interval of N it reads only on every N-th barrier.
struct SnapshotControl {
    // Configured interval: snapshot-read every N barriers. Always >= 1
    // (a raw value of 0 is normalized to 1 in `new`).
    snapshot_read_interval: usize,
    // Barriers observed since the last snapshot read; cycles in
    // `0..snapshot_read_interval`.
    intervals: usize,
}

impl SnapshotControl {
    /// Creates a controller from the (possibly unvalidated) session-config
    /// value. An interval of 0 is treated as 1, i.e. read on every barrier;
    /// this also guards the modulo in `try_read` against division by zero.
    fn new(snapshot_read_interval: usize) -> Self {
        Self {
            snapshot_read_interval: snapshot_read_interval.max(1),
            intervals: 0,
        }
    }

    /// Advances the barrier counter and returns whether this barrier should
    /// trigger a snapshot read (true on the first call and on every N-th
    /// call thereafter).
    fn try_read(&mut self) -> bool {
        let should_read = self.intervals == 0;
        // Log the counter under its real meaning: the number of barriers
        // since the last snapshot read (the old label `interval=` wrongly
        // suggested the configured interval was being printed).
        if should_read {
            tracing::trace!(
                "backfill should snapshot_read, barriers_since_last_read={}",
                self.intervals
            );
        } else {
            tracing::trace!(
                "backfill should not snapshot_read, barriers_since_last_read={}",
                self.intervals
            );
        }
        self.intervals = (self.intervals + 1) % self.snapshot_read_interval;
        should_read
    }
}

impl<S> BackfillExecutor<S>
Expand All @@ -119,6 +154,7 @@ where
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
executor_id: u64,
snapshot_read_interval: usize,
) -> Self {
Self {
info: ExecutorInfo {
Expand All @@ -134,12 +170,15 @@ where
progress,
metrics,
chunk_size,
snapshot_read_interval,
}
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
// The primary key columns, in the output columns of the upstream_table scan.
let mut snapshot_control = SnapshotControl::new(self.snapshot_read_interval);

let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();

let state_len = pk_in_output_indices.len() + METADATA_STATE_LEN;
Expand Down Expand Up @@ -289,8 +328,82 @@ where
// to flush remaining chunks from the chunk builder
// on barrier.
// Hence we break here and process it after this block.
pending_barrier = Some(barrier);
break;
if snapshot_control.try_read() {
pending_barrier = Some(barrier);
break;
} else {
// Process barrier:
// - consume upstream buffer chunk
// - switch snapshot

// Consume upstream buffer chunk
// If no current_pos, means we did not process any snapshot
// yet. In that case
// we can just ignore the upstream buffer chunk, but still need to clean it.
if let Some(current_pos) = &current_pos {
for chunk in upstream_chunk_buffer.drain(..) {
cur_barrier_upstream_processed_rows +=
chunk.cardinality() as u64;
yield Message::Chunk(mapping_chunk(
mark_chunk(
chunk,
current_pos,
&pk_in_output_indices,
pk_order,
),
&self.output_indices,
));
}
} else {
upstream_chunk_buffer.clear()
}

self.metrics
.backfill_snapshot_read_row_count
.with_label_values(&[
upstream_table_id.to_string().as_str(),
self.actor_id.to_string().as_str(),
])
.inc_by(cur_barrier_snapshot_processed_rows);

self.metrics
.backfill_upstream_output_row_count
.with_label_values(&[
upstream_table_id.to_string().as_str(),
self.actor_id.to_string().as_str(),
])
.inc_by(cur_barrier_upstream_processed_rows);

// Update snapshot read epoch.
snapshot_read_epoch = barrier.epoch.prev;

self.progress.update(
barrier.epoch.curr,
snapshot_read_epoch,
total_snapshot_processed_rows,
);

// Persist state on barrier
Self::persist_state(
barrier.epoch,
&mut self.state_table,
false,
&current_pos,
total_snapshot_processed_rows,
&mut old_state,
&mut current_state,
)
.await?;

tracing::trace!(
epoch = ?barrier.epoch,
?current_pos,
total_snapshot_processed_rows,
"Backfill state persisted"
);

yield Message::Barrier(barrier);
}
}
Message::Chunk(chunk) => {
// Buffer the upstream chunk.
Expand Down Expand Up @@ -584,6 +697,7 @@ where
ordered: bool,
builder: &'a mut DataChunkBuilder,
) {
tracing::trace!("new snapshot_read, epoch={}", epoch);
let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
let range_bounds = match range_bounds {
None => {
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ impl ExecutorBuilder for ChainExecutorBuilder {
None
};

let snapshot_read_interval = node.get_snapshot_read_barrier_interval();

BackfillExecutor::new(
upstream_table,
mview,
Expand All @@ -173,6 +175,7 @@ impl ExecutorBuilder for ChainExecutorBuilder {
stream.streaming_metrics.clone(),
params.env.config().developer.chunk_size,
params.executor_id,
snapshot_read_interval as usize,
)
.boxed()
}
Expand Down

0 comments on commit 461c4eb

Please sign in to comment.