refactor(snapshot-backfill): extract common logic of consuming snapshot and log store #19936

Open
wants to merge 1 commit into base: main
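In short: both the snapshot phase and the log-store phase of snapshot backfill now build one stream per owned vnode that yields (Op, row) pairs, and hand those streams, together with a single `DataChunkBuilder`, to the shared `VnodeStream` helper introduced in the new `vnode_stream` module. This replaces the two hand-rolled chunk-building loops, `read_change_log` and the row-based `make_snapshot_stream`. A minimal sketch of the shared row mapping, with the item shape read off the diff below (the PR inlines this match rather than naming a helper, so the function name here is hypothetical):

```rust
use risingwave_common::array::Op;
use risingwave_common::row::OwnedRow;
use risingwave_storage::table::ChangeLogRow;

/// One stream item: a primary (op, row) pair plus an optional second pair,
/// so that a change-log update expands into UpdateDelete + UpdateInsert.
type OpRow = (Op, OwnedRow);

fn change_log_row_to_ops(row: ChangeLogRow) -> (OpRow, Option<OpRow>) {
    match row {
        ChangeLogRow::Insert(row) => ((Op::Insert, row), None),
        ChangeLogRow::Update {
            new_value,
            old_value,
        } => (
            (Op::UpdateDelete, old_value),
            Some((Op::UpdateInsert, new_value)),
        ),
        ChangeLogRow::Delete(row) => ((Op::Delete, row), None),
    }
}
```

Snapshot rows map to the same shape as `((Op::Insert, row), None)`, so the two phases differ only in which storage iterator feeds `VnodeStream` (`batch_iter_vnode` for the snapshot, `batch_iter_vnode_log` for the log store).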
@@ -15,31 +15,33 @@
use std::cmp::min;
use std::collections::VecDeque;
use std::future::{pending, ready, Future};
use std::mem::{replace, take};
use std::mem::take;
use std::sync::Arc;

use anyhow::anyhow;
use futures::future::Either;
use futures::{pin_mut, Stream, TryStreamExt};
use futures::future::{try_join_all, Either};
use futures::{pin_mut, Stream, TryFutureExt, TryStreamExt};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::EpochPair;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::ChangeLogRow;
use risingwave_storage::StateStore;
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
use crate::executor::backfill::utils::{create_builder, mapping_message};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{try_stream, StreamExt};
use crate::executor::{
expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message,
StreamExecutorError, StreamExecutorResult,
expect_first_barrier, ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier,
DispatcherMessage, Execute, MergeExecutorInput, Message, StreamExecutorError,
StreamExecutorResult,
};
use crate::task::CreateMviewProgressReporter;

@@ -128,6 +130,8 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
false
}
};
let first_recv_barrier_epoch = first_recv_barrier.epoch;
yield Message::Barrier(first_recv_barrier);

let (mut barrier_epoch, mut need_report_finish) = {
if should_backfill {
@@ -162,7 +166,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
self.rate_limit,
&mut self.barrier_rx,
&mut self.progress,
first_recv_barrier,
first_recv_barrier_epoch,
);

pin_mut!(snapshot_stream);
@@ -218,15 +222,13 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
// on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait for the upstream epoch to be committed,
// and the back-pressure may leave the upstream unable to consume the barrier, which then causes a deadlock.
let stream = upstream_buffer
.run_future(self.upstream_table.batch_iter_log_with_pk_bounds(
.run_future(make_log_stream(
&self.upstream_table,
barrier_epoch.prev,
HummockReadEpoch::Committed(barrier_epoch.prev),
false,
None,
self.chunk_size,
))
.await?;
let data_types = self.upstream_table.schema().data_types();
let builder = create_builder(None, self.chunk_size, data_types);
let stream = read_change_log(stream, builder);
pin_mut!(stream);
while let Some(chunk) =
upstream_buffer.run_future(stream.try_next()).await?
@@ -246,11 +248,16 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
barrier.epoch,
upstream_buffer.barrier_count(),
);

let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
yield Message::Barrier(barrier);
if update_vnode_bitmap.is_some() {
return Err(anyhow!(
"should not update vnode bitmap during consuming log store"
)
.into());
}
}
}

info!(
?barrier_epoch,
table_id = self.upstream_table.table_id().table_id,
@@ -262,23 +269,33 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
table_id = self.upstream_table.table_id().table_id,
"skip backfill"
);
assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
yield Message::Barrier(first_recv_barrier);
assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
(first_upstream_barrier.epoch, false)
}
};
let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
// Phase 3: consume upstream
while let Some(msg) = upstream.try_next().await? {
if let Message::Barrier(barrier) = &msg {
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
barrier_epoch = barrier.epoch;
if need_report_finish {
need_report_finish = false;
self.progress.finish_consuming_log_store(barrier_epoch);
match msg {
Message::Barrier(barrier) => {
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
barrier_epoch = barrier.epoch;
if need_report_finish {
need_report_finish = false;
self.progress.finish_consuming_log_store(barrier_epoch);
}
yield Message::Barrier(barrier);
if let Some(new_vnode_bitmap) = update_vnode_bitmap {
let _prev_vnode_bitmap = self
.upstream_table
.update_vnode_bitmap(new_vnode_bitmap.clone());
}
}
msg => {
yield msg;
}
}
yield msg;
}
}
}
@@ -299,56 +316,6 @@ impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
}
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn read_change_log(
stream: impl Stream<Item = StorageResult<ChangeLogRow>>,
mut builder: DataChunkBuilder,
) {
let chunk_size = builder.batch_size();
pin_mut!(stream);
let mut ops = Vec::with_capacity(chunk_size);
while let Some(change_log_row) = stream.try_next().await? {
let change_log_row: ChangeLogRow = change_log_row;
match change_log_row {
ChangeLogRow::Insert(row) => {
ops.push(Op::Insert);
if let Some(chunk) = builder.append_one_row(row) {
let ops = replace(&mut ops, Vec::with_capacity(chunk_size));
yield StreamChunk::from_parts(ops, chunk);
}
}
ChangeLogRow::Update {
old_value,
new_value,
} => {
if !builder.can_append(2) {
if let Some(chunk) = builder.consume_all() {
let ops = replace(&mut ops, Vec::with_capacity(chunk_size));
yield StreamChunk::from_parts(ops, chunk);
}
}
ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
assert!(builder.append_one_row(old_value).is_none());
if let Some(chunk) = builder.append_one_row(new_value) {
let ops = replace(&mut ops, Vec::with_capacity(chunk_size));
yield StreamChunk::from_parts(ops, chunk);
}
}
ChangeLogRow::Delete(row) => {
ops.push(Op::Delete);
if let Some(chunk) = builder.append_one_row(row) {
let ops = replace(&mut ops, Vec::with_capacity(chunk_size));
yield StreamChunk::from_parts(ops, chunk);
}
}
}
}

if let Some(chunk) = builder.consume_all() {
yield StreamChunk::from_parts(ops, chunk);
}
}

struct ConsumingSnapshot;
struct ConsumingLogStore;

@@ -515,22 +482,69 @@ async fn receive_next_barrier(
.ok_or_else(|| anyhow!("end of barrier receiver"))?)
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn make_snapshot_stream<'a>(
row_stream: impl Stream<Item = StreamExecutorResult<OwnedRow>> + 'a,
mut builder: DataChunkBuilder,
) {
pin_mut!(row_stream);
while let Some(row) = row_stream.try_next().await? {
if let Some(data_chunk) = builder.append_one_row(row) {
let ops = vec![Op::Insert; data_chunk.capacity()];
yield StreamChunk::from_parts(ops, data_chunk);
}
}
if let Some(data_chunk) = builder.consume_all() {
let ops = vec![Op::Insert; data_chunk.capacity()];
yield StreamChunk::from_parts(ops, data_chunk);
}
async fn make_log_stream(
upstream_table: &StorageTable<impl StateStore>,
prev_epoch: u64,
start_pk: Option<OwnedRow>,
chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::BackfillRowStream>> {
let data_types = upstream_table.schema().data_types();
let start_pk = start_pk.as_ref();
let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
upstream_table
.batch_iter_vnode_log(
prev_epoch,
HummockReadEpoch::Committed(prev_epoch),
start_pk,
vnode,
)
.map_ok(move |stream| {
let stream = stream.map(|result| {
Ok(match result? {
ChangeLogRow::Insert(row) => ((Op::Insert, row), None),
ChangeLogRow::Update {
new_value,
old_value,
} => (
(Op::UpdateDelete, old_value),
Some((Op::UpdateInsert, new_value)),
),
ChangeLogRow::Delete(row) => ((Op::Delete, row), None),
})
});
(vnode, stream)
})
}))
.await?;
let builder = create_builder(None, chunk_size, data_types.clone());
Ok(VnodeStream::new(vnode_streams, builder))
}

async fn make_snapshot_stream(
upstream_table: &StorageTable<impl StateStore>,
snapshot_epoch: u64,
start_pk: Option<OwnedRow>,
rate_limit: Option<usize>,
chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::BackfillRowStream>> {
let data_types = upstream_table.schema().data_types();
let start_pk = start_pk.as_ref();
let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
upstream_table
.batch_iter_vnode(
HummockReadEpoch::Committed(snapshot_epoch),
start_pk,
vnode,
PrefetchOptions::prefetch_for_large_range_scan(),
)
.map_ok(move |stream| {
let stream = stream.map(|result| Ok(((Op::Insert, result?), None)));
(vnode, stream)
})
}))
.await?;
let builder = create_builder(rate_limit, chunk_size, data_types.clone());
Ok(VnodeStream::new(vnode_streams, builder))
}

#[try_stream(ok = Message, error = StreamExecutorError)]
@@ -541,10 +555,9 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
rate_limit: Option<usize>,
barrier_rx: &'a mut UnboundedReceiver<Barrier>,
progress: &'a mut CreateMviewProgressReporter,
first_recv_barrier: Barrier,
first_recv_barrier_epoch: EpochPair,
) {
let mut barrier_epoch = first_recv_barrier.epoch;
yield Message::Barrier(first_recv_barrier);
let mut barrier_epoch = first_recv_barrier_epoch;

info!(
table_id = upstream_table.table_id().table_id,
Expand All @@ -553,15 +566,8 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
);

// start consuming the upstream snapshot
let snapshot_row_stream = BackfillExecutor::snapshot_read(
upstream_table,
HummockReadEpoch::Committed(snapshot_epoch),
None,
);
let data_types = upstream_table.schema().data_types();
let builder = create_builder(rate_limit, chunk_size, data_types.clone());
let snapshot_stream = make_snapshot_stream(snapshot_row_stream, builder);
pin_mut!(snapshot_stream);
let mut snapshot_stream =
make_snapshot_stream(upstream_table, snapshot_epoch, None, rate_limit, chunk_size).await?;

async fn select_barrier_and_snapshot_stream(
barrier_rx: &mut UnboundedReceiver<Barrier>,
@@ -599,9 +605,10 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
if barrier_epoch.curr >= snapshot_epoch {
return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
}
debug!(?barrier_epoch, count, "update progress");
debug!(?barrier_epoch, count, epoch_row_count, "update progress");
progress.update(barrier_epoch, barrier_epoch.prev, count as _);
epoch_row_count = 0;

yield Message::Barrier(barrier);
}
Either::Right(Some(chunk)) => {
18 changes: 18 additions & 0 deletions src/stream/src/executor/backfill/snapshot_backfill/mod.rs
@@ -0,0 +1,18 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod executor;
mod vnode_stream;

pub use executor::SnapshotBackfillExecutor;
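A short note on the layout: `snapshot_backfill` becomes a directory module with `executor` and `vnode_stream` submodules, and the `pub use` presumably keeps the executor reachable at the same path as before the split. A hypothetical in-crate call site, for illustration only:

```rust
// Hypothetical call site inside the stream crate; the re-export in mod.rs
// keeps this path valid after the split into executor.rs and vnode_stream.rs.
use crate::executor::backfill::snapshot_backfill::SnapshotBackfillExecutor;
```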