feat(snapshot-backfill): introduce state to snapshot backfill #19720

Open · wants to merge 5 commits into base branch yiming/snapshot-backfill-vnode-stream
5 changes: 5 additions & 0 deletions ci/scripts/run-backfill-tests.sh
@@ -304,6 +304,11 @@ test_snapshot_backfill() {

wait

TEST_NAME=nexmark_q3 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/scale.slt' &
TEST_NAME=nexmark_q7 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/scale.slt' &

wait

TEST_NAME=nexmark_q3 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/drop_mv.slt' &
TEST_NAME=nexmark_q7 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/drop_mv.slt' &

15 changes: 15 additions & 0 deletions e2e_test/backfill/snapshot_backfill/scale.slt
@@ -0,0 +1,15 @@
control substitution on

statement ok
alter materialized view ${TEST_NAME}_mv set parallelism to 1;

sleep 3s

include ./check_data_equal.slt.part

statement ok
alter materialized view ${TEST_NAME}_mv set parallelism to 4;

sleep 3s

include ./check_data_equal.slt.part
61 changes: 49 additions & 12 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -123,6 +123,8 @@ impl StreamTableScan {

/// Build catalog for backfill state
///
/// When `is_snapshot_backfill` is `false`:
///
/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
///
/// key: | vnode |
@@ -153,9 +155,18 @@ impl StreamTableScan {
/// the corresponding `no_shuffle_backfill`.
/// However this is not high priority, since we are working on supporting arrangement backfill,
/// which already has this capability.
///
/// When `is_snapshot_backfill` is `true`:
///
/// Schema: | vnode | `epoch` | `is_epoch_finished` | pk ...
///
/// key: | vnode |
/// value: | `epoch` | `is_epoch_finished` | pk ...
pub fn build_backfill_state_catalog(
&self,
state: &mut BuildFragmentGraphState,
is_snapshot_backfill: bool,
) -> TableCatalog {
let mut catalog_builder = TableCatalogBuilder::default();
let upstream_schema = &self.core.get_table_columns();
@@ -165,17 +176,31 @@ impl StreamTableScan {
catalog_builder.add_column(&Field::with_name(VirtualNode::RW_TYPE, "vnode"));
catalog_builder.add_order_column(0, OrderType::ascending());

// pk columns
for col_order in self.core.primary_key() {
let col = &upstream_schema[col_order.column_index];
catalog_builder.add_column(&Field::from(col));
}
if !is_snapshot_backfill {
// pk columns
for col_order in self.core.primary_key() {
let col = &upstream_schema[col_order.column_index];
catalog_builder.add_column(&Field::from(col));
}

// `backfill_finished` column
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
// `backfill_finished` column
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));

// `row_count` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
// `row_count` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
} else {
// `epoch` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "epoch"));

// `is_finished` column
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "is_epoch_finished"));

// pk columns
for col_order in self.core.primary_key() {
let col = &upstream_schema[col_order.column_index];
catalog_builder.add_column(&Field::from(col));
}
}

// Reuse the state store pk (vnode) as the vnode as well.
catalog_builder.set_vnode_col_idx(0);
@@ -284,9 +309,21 @@ impl StreamTableScan {
column_ids: upstream_column_ids.clone(),
};

let catalog = self
.build_backfill_state_catalog(state)
.to_internal_table_prost();
let catalog = match self.stream_scan_type {
StreamScanType::SnapshotBackfill => self
.build_backfill_state_catalog(state, true)
.to_internal_table_prost(),
StreamScanType::Chain
| StreamScanType::Rearrange
| StreamScanType::Backfill
| StreamScanType::UpstreamOnly
| StreamScanType::ArrangementBackfill => self
.build_backfill_state_catalog(state, false)
.to_internal_table_prost(),
StreamScanType::Unspecified => {
unreachable!()
}
};

// For backfill, we first read pk + output_indices from upstream.
// On this, we need to further project `output_indices` to the downstream.
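For reference, here is a minimal, self-contained sketch (not part of this diff) of the column order each branch of `build_backfill_state_catalog` produces, with plain strings standing in for catalog fields; in both modes the key is the single vnode column.

// Sketch only: mirrors the branching above; `pk` stands for the upstream pk columns in order.
fn backfill_state_columns(is_snapshot_backfill: bool, pk: &[&str]) -> Vec<String> {
    let mut cols = vec!["vnode".to_string()];
    if !is_snapshot_backfill {
        cols.extend(pk.iter().map(|c| c.to_string()));
        cols.push("backfill_finished".to_string());
        cols.push("row_count".to_string());
    } else {
        cols.push("epoch".to_string());
        cols.push("is_epoch_finished".to_string());
        cols.extend(pk.iter().map(|c| c.to_string()));
    }
    cols
}

fn main() {
    // Prints ["vnode", "epoch", "is_epoch_finished", "id", "name"] for the snapshot-backfill case.
    println!("{:?}", backfill_state_columns(true, &["id", "name"]));
}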
15 changes: 14 additions & 1 deletion src/stream/src/common/table/state_table.rs
@@ -43,8 +43,9 @@ use risingwave_hummock_sdk::key::{
TableKey, TableKeyRange,
};
use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::catalog::Table;
use risingwave_storage::error::{ErrorKind, StorageError};
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
@@ -55,6 +56,7 @@ use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::{
InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, PrefetchOptions,
ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreIterExt,
TryWaitEpochOptions,
};
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
@@ -183,6 +185,17 @@ where
Ok(())
}

pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
self.store
.try_wait_epoch(
HummockReadEpoch::Committed(prev_epoch),
TryWaitEpochOptions {
table_id: self.table_id,
},
)
.await
}

pub fn state_store(&self) -> &S {
&self.store
}
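A hypothetical call site for the new helper, assuming a `StateTable<S>` named `progress_state_table` and an epoch taken from a received barrier; the method just forwards to the store's `try_wait_epoch` with `HummockReadEpoch::Committed`, as shown above.

// Sketch only: block until `barrier.epoch.prev` is committed before relying on it.
progress_state_table
    .try_wait_committed_epoch(barrier.epoch.prev)
    .await?;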
95 changes: 94 additions & 1 deletion src/stream/src/executor/backfill/snapshot_backfill/executor.rs
@@ -34,10 +34,13 @@ use risingwave_storage::StateStore;
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::backfill::snapshot_backfill::state::{
BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
};
use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
use crate::executor::backfill::utils::{create_builder, mapping_message};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{try_stream, StreamExt};
use crate::executor::prelude::{try_stream, StateTable, StreamExt};
use crate::executor::{
expect_first_barrier, ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier,
DispatcherMessage, Execute, MergeExecutorInput, Message, StreamExecutorError,
@@ -48,6 +51,10 @@ use crate::task::CreateMviewProgressReporter;
pub struct SnapshotBackfillExecutor<S: StateStore> {
/// Upstream table
upstream_table: StorageTable<S>,
pk_in_output_indices: Vec<usize>,

/// Backfill progress table
progress_state_table: StateTable<S>,

/// Upstream with the same schema as the upstream table.
upstream: MergeExecutorInput,
@@ -72,6 +79,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[expect(clippy::too_many_arguments)]
pub(crate) fn new(
upstream_table: StorageTable<S>,
progress_state_table: StateTable<S>,
upstream: MergeExecutorInput,
output_indices: Vec<usize>,
actor_ctx: ActorContextRef,
@@ -83,6 +91,14 @@
snapshot_epoch: Option<u64>,
) -> Self {
assert_eq!(&upstream.info.schema, upstream_table.schema());
let Some(pk_in_output_indices) = upstream_table.pk_in_output_indices() else {
panic!(
"storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
upstream_table.pk_indices(),
upstream_table.output_indices(),
upstream_table.schema()
)
};
if let Some(rate_limit) = rate_limit {
debug!(
rate_limit,
@@ -91,6 +107,8 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
}
Self {
upstream_table,
pk_in_output_indices,
progress_state_table,
upstream,
output_indices,
progress,
@@ -132,9 +150,18 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
};
let first_recv_barrier_epoch = first_recv_barrier.epoch;
yield Message::Barrier(first_recv_barrier);
let mut backfill_state = BackfillState::new(
self.progress_state_table,
first_recv_barrier_epoch,
self.upstream_table.pk_serializer().clone(),
)
.await?;

let (mut barrier_epoch, mut need_report_finish) = {
if should_backfill {
assert!(backfill_state
.latest_progress()
.all(|(_, progress)| progress.is_none()));
let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
let actor_id_str = format!("{}", self.actor_ctx.id);

@@ -161,11 +188,13 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
]);
let snapshot_stream = make_consume_snapshot_stream(
&self.upstream_table,
&self.pk_in_output_indices,
first_barrier_epoch.prev,
self.chunk_size,
self.rate_limit,
&mut self.barrier_rx,
&mut self.progress,
&mut backfill_state,
first_recv_barrier_epoch,
);

@@ -184,6 +213,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {

let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
assert_eq!(first_upstream_barrier.epoch, recv_barrier.epoch);
backfill_state.commit(recv_barrier.epoch).await?;
yield Message::Barrier(recv_barrier);
}

@@ -248,6 +278,12 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
barrier.epoch,
upstream_buffer.barrier_count(),
);

backfill_state.finish_epoch(
self.upstream_table.vnodes().iter_vnodes(),
barrier.epoch.prev,
);
backfill_state.commit(barrier.epoch).await?;
let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
yield Message::Barrier(barrier);
if update_vnode_bitmap.is_some() {
@@ -265,6 +301,15 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
);
(barrier_epoch, true)
} else {
backfill_state
.latest_progress()
.for_each(|(vnode, progress)| {
let expected_progress = VnodeBackfillProgress {
epoch: first_upstream_barrier.epoch.prev,
progress: EpochBackfillProgress::Consumed,
};
assert_eq!(progress, Some(&expected_progress), "vnode: {:?}", vnode);
});
info!(
table_id = self.upstream_table.table_id().table_id,
"skip backfill"
@@ -279,17 +324,39 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
match msg {
Message::Barrier(barrier) => {
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
backfill_state.finish_epoch(
self.upstream_table.vnodes().iter_vnodes(),
barrier.epoch.prev,
);
let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
barrier_epoch = barrier.epoch;
if need_report_finish {
need_report_finish = false;
self.progress.finish_consuming_log_store(barrier_epoch);
}
backfill_state.commit(barrier.epoch).await?;
yield Message::Barrier(barrier);
if let Some(new_vnode_bitmap) = update_vnode_bitmap {
let _prev_vnode_bitmap = self
.upstream_table
.update_vnode_bitmap(new_vnode_bitmap.clone());
backfill_state
.update_vnode_bitmap(new_vnode_bitmap, barrier_epoch)
.await?;
let expected_progress = VnodeBackfillProgress {
epoch: barrier_epoch.prev,
progress: EpochBackfillProgress::Consumed,
};
backfill_state
.latest_progress()
.for_each(|(vnode, progress)| {
assert_eq!(
progress,
Some(&expected_progress),
"vnode {:?} has unexpected progress",
vnode
);
});
}
}
msg => {
@@ -547,14 +614,17 @@ async fn make_snapshot_stream(
Ok(VnodeStream::new(vnode_streams, builder))
}

#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
upstream_table: &'a StorageTable<S>,
pk_in_output_indices: &'a [usize],
snapshot_epoch: u64,
chunk_size: usize,
rate_limit: Option<usize>,
barrier_rx: &'a mut UnboundedReceiver<Barrier>,
progress: &'a mut CreateMviewProgressReporter,
backfill_state: &'a mut BackfillState<S>,
first_recv_barrier_epoch: EpochPair,
) {
let mut barrier_epoch = first_recv_barrier_epoch;
@@ -605,6 +675,21 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
if barrier_epoch.curr >= snapshot_epoch {
return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
}
if let Some(chunk) = snapshot_stream.consume_builder() {
count += chunk.cardinality();
epoch_row_count += chunk.cardinality();
yield Message::Chunk(chunk);
}
snapshot_stream
.for_vnode_pk_progress(pk_in_output_indices, |vnode, pk_progress| {
if let Some(pk) = pk_progress {
backfill_state.update_epoch_progress(vnode, snapshot_epoch, pk);
} else {
backfill_state.finish_epoch([vnode], snapshot_epoch);
}
})
.await?;
backfill_state.commit(barrier.epoch).await?;
debug!(?barrier_epoch, count, epoch_row_count, "update progress");
progress.update(barrier_epoch, barrier_epoch.prev, count as _);
epoch_row_count = 0;
@@ -627,6 +712,13 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
barrier_epoch = barrier_to_report_finish.epoch;
info!(?barrier_epoch, count, "report finish");
snapshot_stream
.for_vnode_pk_progress(pk_in_output_indices, |vnode, pk_progress| {
assert_eq!(pk_progress, None);
backfill_state.finish_epoch([vnode], snapshot_epoch);
})
.await?;
backfill_state.commit(barrier_epoch).await?;
progress.finish(barrier_epoch, count as _);
yield Message::Barrier(barrier_to_report_finish);

@@ -635,6 +727,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
let barrier = receive_next_barrier(barrier_rx).await?;
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
barrier_epoch = barrier.epoch;
backfill_state.commit(barrier.epoch).await?;
yield Message::Barrier(barrier);
if barrier_epoch.curr == snapshot_epoch {
break;
1 change: 1 addition & 0 deletions src/stream/src/executor/backfill/snapshot_backfill/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.

mod executor;
mod state;
mod vnode_stream;

pub use executor::SnapshotBackfillExecutor;
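The new `state` module itself is not included in this diff. Below is a minimal, in-memory sketch, inferred only from the call sites in `executor.rs` above, of the interface the executor relies on; the real implementation persists progress in `progress_state_table` rather than a `HashMap`, and its exact signatures may differ.

// Sketch only: `u16` stands in for `VirtualNode`, `Vec<u8>` for a serialized pk.
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum EpochBackfillProgress {
    // Still consuming the epoch; remembers the last pk yielded so the scan can resume.
    Consuming { latest_pk: Vec<u8> },
    // The epoch has been fully consumed for this vnode.
    Consumed,
}

#[derive(Clone, Debug, PartialEq)]
struct VnodeBackfillProgress {
    epoch: u64,
    progress: EpochBackfillProgress,
}

#[derive(Default)]
struct BackfillState {
    progress: HashMap<u16, VnodeBackfillProgress>,
}

impl BackfillState {
    // Record that `vnode` has consumed rows up to `pk` within `epoch`.
    fn update_epoch_progress(&mut self, vnode: u16, epoch: u64, pk: Vec<u8>) {
        self.progress.insert(
            vnode,
            VnodeBackfillProgress {
                epoch,
                progress: EpochBackfillProgress::Consuming { latest_pk: pk },
            },
        );
    }

    // Mark `epoch` as fully consumed for each of the given vnodes.
    fn finish_epoch(&mut self, vnodes: impl IntoIterator<Item = u16>, epoch: u64) {
        for vnode in vnodes {
            self.progress.insert(
                vnode,
                VnodeBackfillProgress {
                    epoch,
                    progress: EpochBackfillProgress::Consumed,
                },
            );
        }
    }

    // Latest known progress per vnode; in the real module, `None` means no progress row yet.
    fn latest_progress(&self) -> impl Iterator<Item = (u16, Option<&VnodeBackfillProgress>)> + '_ {
        self.progress.iter().map(|(vnode, p)| (*vnode, Some(p)))
    }
}

fn main() {
    let mut state = BackfillState::default();
    // vnode 0 is mid-way through the snapshot epoch; vnode 1 has finished it.
    state.update_epoch_progress(0, 100, vec![0x2a]);
    state.finish_epoch([1], 100);
    for (vnode, progress) in state.latest_progress() {
        println!("vnode {vnode}: {progress:?}");
    }
}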