Skip to content

Commit

Permalink
extract common merge vnode logic
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Dec 10, 2024
1 parent 5cbf414 commit a3f0881
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 94 deletions.
211 changes: 118 additions & 93 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;

use auto_enums::auto_enum;
use await_tree::InstrumentAwait;
use bytes::{Bytes, BytesMut};
use foyer::CacheHint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use more_asserts::assert_gt;
Expand All @@ -37,8 +37,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::key::{
end_bound_of_prefix, next_key, prefixed_range_with_vnode, CopyFromSlice, TableKey,
TableKeyRange,
end_bound_of_prefix, next_key, prefixed_range_with_vnode, CopyFromSlice, TableKeyRange,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
Expand All @@ -53,8 +52,8 @@ use crate::store::{
PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter, StateStoreIterExt,
TryWaitEpochOptions,
};
use crate::table::merge_sort::merge_sort;
use crate::table::{ChangeLogRow, KeyedChangeLogRow, KeyedRow, TableDistribution, TableIter};
use crate::table::merge_sort::NodePeek;
use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};
use crate::StateStore;

/// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with
Expand Down Expand Up @@ -496,8 +495,94 @@ impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
}
}

// TODO: may use `Vec`
type SortKeyType = Bytes;
/// Utilities for collapsing the per-vnode streams of a table scan into one
/// stream of rows.
///
/// The opaque [`MergedVnodeStream`] type is defined here, next to its single
/// defining use in [`merge_stream`].
mod merge_vnode_stream {

    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::merge_sort::{merge_sort, NodePeek};
    use crate::table::KeyedRow;

    /// Key type attached to every row of a keyed (order-preserving) stream.
    // TODO: may use Vec
    pub(super) type SortKeyType = Bytes;

    /// Limit handed to `flatten_unordered` when the vnode streams are combined
    /// without preserving order.
    const UNORDERED_FLATTEN_LIMIT: usize = 1024;

    /// The already-opened per-vnode streams, tagged with how they must be combined.
    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        /// Exactly one stream; nothing to combine.
        Single(RowSt),
        /// Several streams whose relative order does not need to be preserved.
        Unordered(Vec<RowSt>),
        /// Several keyed streams that are combined through `merge_sort`.
        Ordered(Vec<KeyedRowSt>),
    }

    /// Opaque stream type returned by [`merge_stream`]; it yields bare rows `R`.
    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    /// Drops the unit placeholder key of an un-keyed stream item.
    fn discard_key<R>(((), row): ((), R)) -> R {
        row
    }

    /// Wraps a `(key, row)` pair into the [`KeyedRow`] consumed by `merge_sort`.
    fn into_keyed_row<R>((key, row): (SortKeyType, R)) -> KeyedRow<SortKeyType, R> {
        KeyedRow {
            vnode_prefixed_key: TableKey(key),
            row,
        }
    }

    /// Combines the given per-vnode stream(s) into a single stream of rows.
    ///
    /// * `Single`: forwards the stream, dropping the unit key.
    /// * `Unordered`: pins every stream and flattens them with
    ///   `flatten_unordered`.
    /// * `Ordered`: wraps every item into a [`KeyedRow`], feeds the pinned
    ///   streams to `merge_sort`, and strips the key from the merged output.
    pub(super) fn merge_stream<R, RowSt, KeyedRowSt>(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        // Each arm produces a different concrete stream type; `auto_enum`
        // unifies them behind one generated enum implementing `Stream`.
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(row_stream) => row_stream.map_ok(discard_key),
            VnodeStreamType::Unordered(row_streams) => futures::stream::iter(
                row_streams
                    .into_iter()
                    .map(|row_stream| Box::pin(row_stream.map_ok(discard_key))),
            )
            .flatten_unordered(UNORDERED_FLATTEN_LIMIT),
            VnodeStreamType::Ordered(keyed_streams) => merge_sort(
                keyed_streams
                    .into_iter()
                    .map(|keyed_stream| Box::pin(keyed_stream.map_ok(into_keyed_row))),
            )
            .map_ok(|merged| merged.row),
        }
    }
}

use merge_vnode_stream::*;

/// Opens one stream per vnode in `vnodes` and merges them into a single row
/// stream via `merge_stream`.
///
/// * A single vnode is opened with `row_stream_fn` and used on its own.
/// * Several vnodes with `ordered == false` are all opened concurrently with
///   `row_stream_fn` and concatenated without preserving order.
/// * Several vnodes with `ordered == true` are all opened concurrently with
///   `keyed_row_stream_fn` and merged so that order is preserved.
///
/// Returns the first error produced while opening any of the streams.
///
/// # Panics
///
/// Hits `unreachable!()` when `vnodes` is empty.
async fn build_vnode_stream<R, RowSt, KeyedRowSt, RowStFut, KeyedRowStFut>(
    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
    vnodes: &[VirtualNode],
    ordered: bool,
) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
where
    R: Send,
    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    RowStFut: Future<Output = StorageResult<RowSt>>,
    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
{
    let per_vnode_streams = match (vnodes, ordered) {
        ([], _) => unreachable!(),
        // A lone vnode needs no merging, whatever `ordered` says.
        ([only], _) => VnodeStreamType::Single(row_stream_fn(*only).await?),
        // Concat all iterators if not to preserve order.
        (many, false) => {
            let opened = try_join_all(many.iter().copied().map(&row_stream_fn)).await?;
            VnodeStreamType::Unordered(opened)
        }
        // Merge all iterators if to preserve order.
        (many, true) => {
            let opened = try_join_all(many.iter().copied().map(&keyed_row_stream_fn)).await?;
            VnodeStreamType::Ordered(opened)
        }
    };
    Ok(merge_stream(per_vnode_streams))
}

/// Iterators
impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
Expand Down Expand Up @@ -527,59 +612,29 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
None => self.distribution.vnodes().iter_vnodes().collect_vec(),
};

// For each key range, construct an iterator.
#[auto_enum(futures03::Stream)]
let iter = match vnodes.as_slice() {
[] => unreachable!(),
[vnode] => self
.iter_vnode_with_encoded_key_range::<()>(
build_vnode_stream(
|vnode| {
self.iter_vnode_with_encoded_key_range(
prefix_hint.clone(),
(start_bound.as_ref(), end_bound.as_ref()),
wait_epoch,
*vnode,
vnode,
prefetch_options,
)
.await?
.map_ok(|(_, row)| row),
// Concat all iterators if not to preserve order.
vnodes if !ordered => futures::stream::iter(
try_join_all(vnodes.iter().map(|vnode| {
self.iter_vnode_with_encoded_key_range::<()>(
prefix_hint.clone(),
(start_bound.as_ref(), end_bound.as_ref()),
wait_epoch,
*vnode,
prefetch_options,
)
}))
.await?
.into_iter()
.map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
)
.flatten_unordered(1024),
// Merge all iterators if to preserve order.
vnodes => merge_sort(
try_join_all(vnodes.iter().map(|vnode| {
self.iter_vnode_with_encoded_key_range::<SortKeyType>(
prefix_hint.clone(),
(start_bound.as_ref(), end_bound.as_ref()),
wait_epoch,
*vnode,
prefetch_options,
)
.map_ok(|stream| {
Box::pin(stream.map_ok(|(key, row)| KeyedRow {
vnode_prefixed_key: TableKey(key),
row,
}))
})
}))
.await?,
)
.map_ok(|keyed_row| keyed_row.row),
};

Ok(iter)
},
|vnode| {
self.iter_vnode_with_encoded_key_range(
prefix_hint.clone(),
(start_bound.as_ref(), end_bound.as_ref()),
wait_epoch,
vnode,
prefetch_options,
)
},
&vnodes,
ordered,
)
.await
}

async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
Expand Down Expand Up @@ -923,43 +978,13 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
ordered: bool,
) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static> {
let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();

#[auto_enum(futures03::Stream)]
let iter = match vnodes.as_slice() {
[] => unreachable!(),
[vnode] => {
let stream = self
.batch_iter_log_inner::<()>(start_epoch, end_epoch, None, *vnode)
.await?;
stream.map_ok(|(_, row)| row)
}
// Concat all iterators if not to preserve order.
vnodes if !ordered => futures::stream::iter(
try_join_all(vnodes.iter().map(|vnode| {
self.batch_iter_log_inner::<()>(start_epoch, end_epoch, None, *vnode)
.map_ok(|stream| Box::pin(stream.map_ok(|(_, row)| row)))
}))
.await?,
)
.flatten_unordered(1024),
// Merge all iterators if to preserve order.
vnodes => merge_sort(
try_join_all(vnodes.iter().map(|vnode| {
self.batch_iter_log_inner::<SortKeyType>(start_epoch, end_epoch, None, *vnode)
.map_ok(|stream| {
Box::pin(
stream.map_ok(|(key, row)| {
KeyedChangeLogRow::new(TableKey(key), row)
}),
)
})
}))
.await?,
)
.map_ok(|key_row| key_row.into_owned_row()),
};

Ok(iter)
build_vnode_stream(
|vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode),
|vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode),
&vnodes,
ordered,
)
.await
}

/// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/table/merge_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<S, R: NodePeek> Ord for Node<S, R> {
}

#[try_stream(ok=KO, error=E)]
pub async fn merge_sort<E, KO, R>(streams: Vec<R>)
pub async fn merge_sort<E, KO, R>(streams: impl IntoIterator<Item = R>)
where
KO: NodePeek + Send + Sync,
E: Error,
Expand Down

0 comments on commit a3f0881

Please sign in to comment.