diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill/executor.rs similarity index 80% rename from src/stream/src/executor/backfill/snapshot_backfill.rs rename to src/stream/src/executor/backfill/snapshot_backfill/executor.rs index 6851d81ce790f..53ece87d9f955 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill/executor.rs @@ -15,31 +15,33 @@ use std::cmp::min; use std::collections::VecDeque; use std::future::{pending, ready, Future}; -use std::mem::{replace, take}; +use std::mem::take; use std::sync::Arc; use anyhow::anyhow; -use futures::future::Either; -use futures::{pin_mut, Stream, TryStreamExt}; +use futures::future::{try_join_all, Either}; +use futures::{pin_mut, Stream, TryFutureExt, TryStreamExt}; use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::metrics::LabelGuardedIntCounter; use risingwave_common::row::OwnedRow; -use risingwave_common::util::chunk_coalesce::DataChunkBuilder; +use risingwave_common::util::epoch::EpochPair; use risingwave_hummock_sdk::HummockReadEpoch; -use risingwave_storage::error::StorageResult; +use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::ChangeLogRow; use risingwave_storage::StateStore; use tokio::select; use tokio::sync::mpsc::UnboundedReceiver; +use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream; use crate::executor::backfill::utils::{create_builder, mapping_message}; use crate::executor::monitor::StreamingMetrics; use crate::executor::prelude::{try_stream, StreamExt}; use crate::executor::{ - expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, - DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, - StreamExecutorError, StreamExecutorResult, + expect_first_barrier, ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, + DispatcherMessage, Execute, MergeExecutorInput, Message, StreamExecutorError, + StreamExecutorResult, }; use crate::task::CreateMviewProgressReporter; @@ -128,6 +130,8 @@ impl SnapshotBackfillExecutor { false } }; + let first_recv_barrier_epoch = first_recv_barrier.epoch; + yield Message::Barrier(first_recv_barrier); let (mut barrier_epoch, mut need_report_finish) = { if should_backfill { @@ -162,7 +166,7 @@ impl SnapshotBackfillExecutor { self.rate_limit, &mut self.barrier_rx, &mut self.progress, - first_recv_barrier, + first_recv_barrier_epoch, ); pin_mut!(snapshot_stream); @@ -218,15 +222,13 @@ impl SnapshotBackfillExecutor { // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed, // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock. let stream = upstream_buffer - .run_future(self.upstream_table.batch_iter_log_with_pk_bounds( + .run_future(make_log_stream( + &self.upstream_table, barrier_epoch.prev, - HummockReadEpoch::Committed(barrier_epoch.prev), - false, + None, + self.chunk_size, )) .await?; - let data_types = self.upstream_table.schema().data_types(); - let builder = create_builder(None, self.chunk_size, data_types); - let stream = read_change_log(stream, builder); pin_mut!(stream); while let Some(chunk) = upstream_buffer.run_future(stream.try_next()).await? @@ -246,11 +248,16 @@ impl SnapshotBackfillExecutor { barrier.epoch, upstream_buffer.barrier_count(), ); - + let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id); yield Message::Barrier(barrier); + if update_vnode_bitmap.is_some() { + return Err(anyhow!( + "should not update vnode bitmap during consuming log store" + ) + .into()); + } } } - info!( ?barrier_epoch, table_id = self.upstream_table.table_id().table_id, @@ -262,23 +269,33 @@ impl SnapshotBackfillExecutor { table_id = self.upstream_table.table_id().table_id, "skip backfill" ); - assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch); - yield Message::Barrier(first_recv_barrier); + assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch); (first_upstream_barrier.epoch, false) } }; let mut upstream = self.upstream.into_executor(self.barrier_rx).execute(); // Phase 3: consume upstream while let Some(msg) = upstream.try_next().await? { - if let Message::Barrier(barrier) = &msg { - assert_eq!(barrier.epoch.prev, barrier_epoch.curr); - barrier_epoch = barrier.epoch; - if need_report_finish { - need_report_finish = false; - self.progress.finish_consuming_log_store(barrier_epoch); + match msg { + Message::Barrier(barrier) => { + assert_eq!(barrier.epoch.prev, barrier_epoch.curr); + let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id); + barrier_epoch = barrier.epoch; + if need_report_finish { + need_report_finish = false; + self.progress.finish_consuming_log_store(barrier_epoch); + } + yield Message::Barrier(barrier); + if let Some(new_vnode_bitmap) = update_vnode_bitmap { + let _prev_vnode_bitmap = self + .upstream_table + .update_vnode_bitmap(new_vnode_bitmap.clone()); + } + } + msg => { + yield msg; } } - yield msg; } } } @@ -299,56 +316,6 @@ impl Execute for SnapshotBackfillExecutor { } } -#[try_stream(ok = StreamChunk, error = StreamExecutorError)] -async fn read_change_log( - stream: impl Stream>, - mut builder: DataChunkBuilder, -) { - let chunk_size = builder.batch_size(); - pin_mut!(stream); - let mut ops = Vec::with_capacity(chunk_size); - while let Some(change_log_row) = stream.try_next().await? { - let change_log_row: ChangeLogRow = change_log_row; - match change_log_row { - ChangeLogRow::Insert(row) => { - ops.push(Op::Insert); - if let Some(chunk) = builder.append_one_row(row) { - let ops = replace(&mut ops, Vec::with_capacity(chunk_size)); - yield StreamChunk::from_parts(ops, chunk); - } - } - ChangeLogRow::Update { - old_value, - new_value, - } => { - if !builder.can_append(2) { - if let Some(chunk) = builder.consume_all() { - let ops = replace(&mut ops, Vec::with_capacity(chunk_size)); - yield StreamChunk::from_parts(ops, chunk); - } - } - ops.extend([Op::UpdateDelete, Op::UpdateInsert]); - assert!(builder.append_one_row(old_value).is_none()); - if let Some(chunk) = builder.append_one_row(new_value) { - let ops = replace(&mut ops, Vec::with_capacity(chunk_size)); - yield StreamChunk::from_parts(ops, chunk); - } - } - ChangeLogRow::Delete(row) => { - ops.push(Op::Delete); - if let Some(chunk) = builder.append_one_row(row) { - let ops = replace(&mut ops, Vec::with_capacity(chunk_size)); - yield StreamChunk::from_parts(ops, chunk); - } - } - } - } - - if let Some(chunk) = builder.consume_all() { - yield StreamChunk::from_parts(ops, chunk); - } -} - struct ConsumingSnapshot; struct ConsumingLogStore; @@ -515,22 +482,70 @@ async fn receive_next_barrier( .ok_or_else(|| anyhow!("end of barrier receiver"))?) } -#[try_stream(ok = StreamChunk, error = StreamExecutorError)] -async fn make_snapshot_stream<'a>( - row_stream: impl Stream> + 'a, - mut builder: DataChunkBuilder, -) { - pin_mut!(row_stream); - while let Some(row) = row_stream.try_next().await? { - if let Some(data_chunk) = builder.append_one_row(row) { - let ops = vec![Op::Insert; data_chunk.capacity()]; - yield StreamChunk::from_parts(ops, data_chunk); - } - } - if let Some(data_chunk) = builder.consume_all() { - let ops = vec![Op::Insert; data_chunk.capacity()]; - yield StreamChunk::from_parts(ops, data_chunk); - } +async fn make_log_stream( + upstream_table: &StorageTable, + prev_epoch: u64, + start_pk: Option, + chunk_size: usize, +) -> StreamExecutorResult> { + let data_types = upstream_table.schema().data_types(); + let start_pk = start_pk.as_ref(); + // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency. + let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| { + upstream_table + .batch_iter_vnode_log( + prev_epoch, + HummockReadEpoch::Committed(prev_epoch), + start_pk, + vnode, + ) + .map_ok(move |stream| { + let stream = stream.map(|result| { + Ok(match result? { + ChangeLogRow::Insert(row) => ((Op::Insert, row), None), + ChangeLogRow::Update { + new_value, + old_value, + } => ( + (Op::UpdateDelete, old_value), + Some((Op::UpdateInsert, new_value)), + ), + ChangeLogRow::Delete(row) => ((Op::Delete, row), None), + }) + }); + (vnode, stream) + }) + })) + .await?; + let builder = create_builder(None, chunk_size, data_types.clone()); + Ok(VnodeStream::new(vnode_streams, builder)) +} + +async fn make_snapshot_stream( + upstream_table: &StorageTable, + snapshot_epoch: u64, + start_pk: Option, + rate_limit: Option, + chunk_size: usize, +) -> StreamExecutorResult> { + let data_types = upstream_table.schema().data_types(); + let start_pk = start_pk.as_ref(); + let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| { + upstream_table + .batch_iter_vnode( + HummockReadEpoch::Committed(snapshot_epoch), + start_pk, + vnode, + PrefetchOptions::prefetch_for_large_range_scan(), + ) + .map_ok(move |stream| { + let stream = stream.map(|result| Ok(((Op::Insert, result?), None))); + (vnode, stream) + }) + })) + .await?; + let builder = create_builder(rate_limit, chunk_size, data_types.clone()); + Ok(VnodeStream::new(vnode_streams, builder)) } #[try_stream(ok = Message, error = StreamExecutorError)] @@ -541,10 +556,9 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( rate_limit: Option, barrier_rx: &'a mut UnboundedReceiver, progress: &'a mut CreateMviewProgressReporter, - first_recv_barrier: Barrier, + first_recv_barrier_epoch: EpochPair, ) { - let mut barrier_epoch = first_recv_barrier.epoch; - yield Message::Barrier(first_recv_barrier); + let mut barrier_epoch = first_recv_barrier_epoch; info!( table_id = upstream_table.table_id().table_id, @@ -553,15 +567,8 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( ); // start consume upstream snapshot - let snapshot_row_stream = BackfillExecutor::snapshot_read( - upstream_table, - HummockReadEpoch::Committed(snapshot_epoch), - None, - ); - let data_types = upstream_table.schema().data_types(); - let builder = create_builder(rate_limit, chunk_size, data_types.clone()); - let snapshot_stream = make_snapshot_stream(snapshot_row_stream, builder); - pin_mut!(snapshot_stream); + let mut snapshot_stream = + make_snapshot_stream(upstream_table, snapshot_epoch, None, rate_limit, chunk_size).await?; async fn select_barrier_and_snapshot_stream( barrier_rx: &mut UnboundedReceiver, @@ -599,9 +606,10 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( if barrier_epoch.curr >= snapshot_epoch { return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into()); } - debug!(?barrier_epoch, count, "update progress"); + debug!(?barrier_epoch, count, epoch_row_count, "update progress"); progress.update(barrier_epoch, barrier_epoch.prev, count as _); epoch_row_count = 0; + yield Message::Barrier(barrier); } Either::Right(Some(chunk)) => { diff --git a/src/stream/src/executor/backfill/snapshot_backfill/mod.rs b/src/stream/src/executor/backfill/snapshot_backfill/mod.rs new file mode 100644 index 0000000000000..882fd87f3e138 --- /dev/null +++ b/src/stream/src/executor/backfill/snapshot_backfill/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod executor; +mod vnode_stream; + +pub use executor::SnapshotBackfillExecutor; diff --git a/src/stream/src/executor/backfill/snapshot_backfill/vnode_stream.rs b/src/stream/src/executor/backfill/snapshot_backfill/vnode_stream.rs new file mode 100644 index 0000000000000..8d93eb2521120 --- /dev/null +++ b/src/stream/src/executor/backfill/snapshot_backfill/vnode_stream.rs @@ -0,0 +1,200 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::mem::{replace, take}; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; + +use futures::stream::{FuturesUnordered, Peekable, StreamFuture}; +use futures::{Stream, StreamExt, TryStreamExt}; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::hash::VirtualNode; +use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::util::chunk_coalesce::DataChunkBuilder; + +use crate::executor::StreamExecutorResult; + +/// Can be either one row or two rows. When having two rows, the two rows should be included in the same stream chunk. +/// The case of two rows is for combining the `UpdateDelete` and `UpdateInsert` of `ChangeLogValue::Update` +pub(super) type BackfillRowItem = ((Op, OwnedRow), Option<(Op, OwnedRow)>); +pub(super) trait BackfillRowStream = + Stream> + Sized + 'static; + +struct StreamWithVnode { + stream: St, + vnode: VirtualNode, +} + +impl Stream for StreamWithVnode { + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.poll_next_unpin(cx) + } +} + +pub(super) struct VnodeStream { + #[expect(clippy::type_complexity)] + streams: FuturesUnordered>>>>>, + finished_vnode: HashSet, + data_chunk_builder: DataChunkBuilder, + ops: Vec, +} + +impl VnodeStream { + pub(super) fn new( + vnode_streams: impl IntoIterator, + data_chunk_builder: DataChunkBuilder, + ) -> Self { + assert!(data_chunk_builder.is_empty()); + assert!(data_chunk_builder.batch_size() >= 2); + let streams = + FuturesUnordered::from_iter(vnode_streams.into_iter().map(|(vnode, stream)| { + StreamWithVnode { + stream: Box::pin(stream.peekable()), + vnode, + } + .into_future() + })); + let ops = Vec::with_capacity(data_chunk_builder.batch_size()); + Self { + streams, + finished_vnode: HashSet::new(), + data_chunk_builder, + ops, + } + } +} + +impl VnodeStream { + fn poll_next_row( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + loop { + let ready_item = match ready!(self.streams.poll_next_unpin(cx)) { + None => Ok(None), + Some((None, stream)) => { + assert!(self.finished_vnode.insert(stream.vnode)); + continue; + } + Some((Some(Ok(item)), stream)) => { + self.streams.push(stream.into_future()); + Ok(Some(item)) + } + Some((Some(Err(e)), _stream)) => Err(e), + }; + break Poll::Ready(ready_item); + } + } + + #[expect(dead_code)] + pub(super) fn consume_builder(&mut self) -> Option { + self.data_chunk_builder.consume_all().map(|chunk| { + let ops = replace( + &mut self.ops, + Vec::with_capacity(self.data_chunk_builder.batch_size()), + ); + StreamChunk::from_parts(ops, chunk) + }) + } + + #[expect(dead_code)] + pub(super) async fn for_vnode_pk_progress( + &mut self, + pk_indices: &[usize], + mut on_vnode_progress: impl FnMut(VirtualNode, Option), + ) -> StreamExecutorResult<()> { + assert!(self.data_chunk_builder.is_empty()); + for vnode in &self.finished_vnode { + on_vnode_progress(*vnode, None); + } + for vnode_stream in &mut self.streams { + let vnode_stream = vnode_stream.get_mut().expect("should exist"); + match vnode_stream.stream.as_mut().peek().await { + Some(Ok(((_, row), second))) => { + let pk = row.project(pk_indices).to_owned_row(); + if cfg!(debug_assertions) + && let Some((_, second_row)) = second + { + assert_eq!(pk, second_row.project(pk_indices).to_owned_row()); + } + on_vnode_progress(vnode_stream.vnode, Some(pk)); + } + Some(Err(_)) => { + return Err(vnode_stream + .stream + .try_next() + .await + .expect_err("checked Err")); + } + None => { + on_vnode_progress(vnode_stream.vnode, None); + } + } + } + Ok(()) + } +} + +impl Stream for VnodeStream { + type Item = StreamExecutorResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let capacity = this.data_chunk_builder.batch_size(); + loop { + match ready!(this.poll_next_row(cx)) { + Ok(Some(((op, row), second))) => { + let may_chunk = if let Some((second_op, second_row)) = second { + if this.data_chunk_builder.can_append(2) { + this.ops.extend([op, second_op]); + assert!(this.data_chunk_builder.append_one_row(row).is_none()); + this.data_chunk_builder.append_one_row(second_row) + } else { + let chunk = this + .data_chunk_builder + .consume_all() + .expect("should be Some when not can_append"); + let ops = replace(&mut this.ops, Vec::with_capacity(capacity)); + this.ops.extend([op, second_op]); + assert!(this.data_chunk_builder.append_one_row(row).is_none()); + assert!(this.data_chunk_builder.append_one_row(second_row).is_none()); + break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk)))); + } + } else { + this.ops.push(op); + this.data_chunk_builder.append_one_row(row) + }; + if let Some(chunk) = may_chunk { + let ops = replace(&mut this.ops, Vec::with_capacity(capacity)); + break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk)))); + } + } + Ok(None) => { + break if let Some(chunk) = this.data_chunk_builder.consume_all() { + let ops = take(&mut this.ops); + Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk)))) + } else { + Poll::Ready(None) + }; + } + Err(e) => { + break Poll::Ready(Some(Err(e))); + } + } + } + } +}