Skip to content

Commit

Permalink
refactor(snapshot-backfill): extract common logic of consuming snapsh…
Browse files Browse the repository at this point in the history
…ot and log store
  • Loading branch information
wenym1 committed Dec 25, 2024
1 parent c40eb04 commit 579b264
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,33 @@
use std::cmp::min;
use std::collections::VecDeque;
use std::future::{pending, ready, Future};
use std::mem::{replace, take};
use std::mem::take;
use std::sync::Arc;

use anyhow::anyhow;
use futures::future::Either;
use futures::{pin_mut, Stream, TryStreamExt};
use futures::future::{try_join_all, Either};
use futures::{pin_mut, Stream, TryFutureExt, TryStreamExt};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::EpochPair;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::ChangeLogRow;
use risingwave_storage::StateStore;
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
use crate::executor::backfill::utils::{create_builder, mapping_message};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{try_stream, StreamExt};
use crate::executor::{
expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message,
StreamExecutorError, StreamExecutorResult,
expect_first_barrier, ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier,
DispatcherMessage, Execute, MergeExecutorInput, Message, StreamExecutorError,
StreamExecutorResult,
};
use crate::task::CreateMviewProgressReporter;

Expand Down Expand Up @@ -128,6 +130,8 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
false
}
};
let first_recv_barrier_epoch = first_recv_barrier.epoch;
yield Message::Barrier(first_recv_barrier);

let (mut barrier_epoch, mut need_report_finish) = {
if should_backfill {
Expand Down Expand Up @@ -162,7 +166,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
self.rate_limit,
&mut self.barrier_rx,
&mut self.progress,
first_recv_barrier,
first_recv_barrier_epoch,
);

pin_mut!(snapshot_stream);
Expand Down Expand Up @@ -218,15 +222,13 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
// on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
// and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
let stream = upstream_buffer
.run_future(self.upstream_table.batch_iter_log_with_pk_bounds(
.run_future(make_log_stream(
&self.upstream_table,
barrier_epoch.prev,
HummockReadEpoch::Committed(barrier_epoch.prev),
false,
None,
self.chunk_size,
))
.await?;
let data_types = self.upstream_table.schema().data_types();
let builder = create_builder(None, self.chunk_size, data_types);
let stream = read_change_log(stream, builder);
pin_mut!(stream);
while let Some(chunk) =
upstream_buffer.run_future(stream.try_next()).await?
Expand All @@ -246,11 +248,16 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
barrier.epoch,
upstream_buffer.barrier_count(),
);

let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
yield Message::Barrier(barrier);
if update_vnode_bitmap.is_some() {
return Err(anyhow!(
"should not update vnode bitmap during consuming log store"
)
.into());
}
}
}

info!(
?barrier_epoch,
table_id = self.upstream_table.table_id().table_id,
Expand All @@ -262,23 +269,33 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
table_id = self.upstream_table.table_id().table_id,
"skip backfill"
);
assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
yield Message::Barrier(first_recv_barrier);
assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
(first_upstream_barrier.epoch, false)
}
};
let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
// Phase 3: consume upstream
while let Some(msg) = upstream.try_next().await? {
if let Message::Barrier(barrier) = &msg {
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
barrier_epoch = barrier.epoch;
if need_report_finish {
need_report_finish = false;
self.progress.finish_consuming_log_store(barrier_epoch);
match msg {
Message::Barrier(barrier) => {
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
barrier_epoch = barrier.epoch;
if need_report_finish {
need_report_finish = false;
self.progress.finish_consuming_log_store(barrier_epoch);
}
yield Message::Barrier(barrier);
if let Some(new_vnode_bitmap) = update_vnode_bitmap {
let _prev_vnode_bitmap = self
.upstream_table
.update_vnode_bitmap(new_vnode_bitmap.clone());
}
}
msg => {
yield msg;
}
}
yield msg;
}
}
}
Expand All @@ -299,56 +316,6 @@ impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
}
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn read_change_log(
stream: impl Stream<Item = StorageResult<ChangeLogRow>>,
mut builder: DataChunkBuilder,
) {
let chunk_size = builder.batch_size();
pin_mut!(stream);
let mut ops = Vec::with_capacity(chunk_size);
while let Some(change_log_row) = stream.try_next().await? {
let change_log_row: ChangeLogRow = change_log_row;
match change_log_row {
ChangeLogRow::Insert(row) => {
ops.push(Op::Insert);
if let Some(chunk) = builder.append_one_row(row) {
let ops = replace(&mut ops, Vec::with_capacity(chunk_size));
yield StreamChunk::from_parts(ops, chunk);
}
}
ChangeLogRow::Update {
old_value,
new_value,
} => {
if !builder.can_append(2) {
if let Some(chunk) = builder.consume_all() {
let ops = replace(&mut ops, Vec::with_capacity(chunk_size));
yield StreamChunk::from_parts(ops, chunk);
}
}
ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
assert!(builder.append_one_row(old_value).is_none());
if let Some(chunk) = builder.append_one_row(new_value) {
let ops = replace(&mut ops, Vec::with_capacity(chunk_size));
yield StreamChunk::from_parts(ops, chunk);
}
}
ChangeLogRow::Delete(row) => {
ops.push(Op::Delete);
if let Some(chunk) = builder.append_one_row(row) {
let ops = replace(&mut ops, Vec::with_capacity(chunk_size));
yield StreamChunk::from_parts(ops, chunk);
}
}
}
}

if let Some(chunk) = builder.consume_all() {
yield StreamChunk::from_parts(ops, chunk);
}
}

struct ConsumingSnapshot;
struct ConsumingLogStore;

Expand Down Expand Up @@ -515,22 +482,69 @@ async fn receive_next_barrier(
.ok_or_else(|| anyhow!("end of barrier receiver"))?)
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn make_snapshot_stream<'a>(
row_stream: impl Stream<Item = StreamExecutorResult<OwnedRow>> + 'a,
mut builder: DataChunkBuilder,
) {
pin_mut!(row_stream);
while let Some(row) = row_stream.try_next().await? {
if let Some(data_chunk) = builder.append_one_row(row) {
let ops = vec![Op::Insert; data_chunk.capacity()];
yield StreamChunk::from_parts(ops, data_chunk);
}
}
if let Some(data_chunk) = builder.consume_all() {
let ops = vec![Op::Insert; data_chunk.capacity()];
yield StreamChunk::from_parts(ops, data_chunk);
}
async fn make_log_stream(
upstream_table: &StorageTable<impl StateStore>,
prev_epoch: u64,
start_pk: Option<OwnedRow>,
chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::BackfillRowStream>> {
let data_types = upstream_table.schema().data_types();
let start_pk = start_pk.as_ref();
let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
upstream_table
.batch_iter_vnode_log(
prev_epoch,
HummockReadEpoch::Committed(prev_epoch),
start_pk,
vnode,
)
.map_ok(move |stream| {
let stream = stream.map(|result| {
Ok(match result? {
ChangeLogRow::Insert(row) => ((Op::Insert, row), None),
ChangeLogRow::Update {
new_value,
old_value,
} => (
(Op::UpdateDelete, old_value),
Some((Op::UpdateInsert, new_value)),
),
ChangeLogRow::Delete(row) => ((Op::Delete, row), None),
})
});
(vnode, stream)
})
}))
.await?;
let builder = create_builder(None, chunk_size, data_types.clone());
Ok(VnodeStream::new(vnode_streams, builder))
}

async fn make_snapshot_stream(
upstream_table: &StorageTable<impl StateStore>,
snapshot_epoch: u64,
start_pk: Option<OwnedRow>,
rate_limit: Option<usize>,
chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::BackfillRowStream>> {
let data_types = upstream_table.schema().data_types();
let start_pk = start_pk.as_ref();
let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
upstream_table
.batch_iter_vnode(
HummockReadEpoch::Committed(snapshot_epoch),
start_pk,
vnode,
PrefetchOptions::prefetch_for_large_range_scan(),
)
.map_ok(move |stream| {
let stream = stream.map(|result| Ok(((Op::Insert, result?), None)));
(vnode, stream)
})
}))
.await?;
let builder = create_builder(rate_limit, chunk_size, data_types.clone());
Ok(VnodeStream::new(vnode_streams, builder))
}

#[try_stream(ok = Message, error = StreamExecutorError)]
Expand All @@ -541,10 +555,9 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
rate_limit: Option<usize>,
barrier_rx: &'a mut UnboundedReceiver<Barrier>,
progress: &'a mut CreateMviewProgressReporter,
first_recv_barrier: Barrier,
first_recv_barrier_epoch: EpochPair,
) {
let mut barrier_epoch = first_recv_barrier.epoch;
yield Message::Barrier(first_recv_barrier);
let mut barrier_epoch = first_recv_barrier_epoch;

info!(
table_id = upstream_table.table_id().table_id,
Expand All @@ -553,15 +566,8 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
);

// start consume upstream snapshot
let snapshot_row_stream = BackfillExecutor::snapshot_read(
upstream_table,
HummockReadEpoch::Committed(snapshot_epoch),
None,
);
let data_types = upstream_table.schema().data_types();
let builder = create_builder(rate_limit, chunk_size, data_types.clone());
let snapshot_stream = make_snapshot_stream(snapshot_row_stream, builder);
pin_mut!(snapshot_stream);
let mut snapshot_stream =
make_snapshot_stream(upstream_table, snapshot_epoch, None, rate_limit, chunk_size).await?;

async fn select_barrier_and_snapshot_stream(
barrier_rx: &mut UnboundedReceiver<Barrier>,
Expand Down Expand Up @@ -599,9 +605,10 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
if barrier_epoch.curr >= snapshot_epoch {
return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
}
debug!(?barrier_epoch, count, "update progress");
debug!(?barrier_epoch, count, epoch_row_count, "update progress");
progress.update(barrier_epoch, barrier_epoch.prev, count as _);
epoch_row_count = 0;

yield Message::Barrier(barrier);
}
Either::Right(Some(chunk)) => {
Expand Down
18 changes: 18 additions & 0 deletions src/stream/src/executor/backfill/snapshot_backfill/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod executor;
mod vnode_stream;

pub use executor::SnapshotBackfillExecutor;
Loading

0 comments on commit 579b264

Please sign in to comment.