diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 6b7c8f921bd0f..2e8511af1c071 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::future::Future; use std::ops::Bound::{self, Excluded, Included, Unbounded}; use std::ops::RangeBounds; use std::sync::Arc; -use auto_enums::auto_enum; use await_tree::InstrumentAwait; use bytes::{Bytes, BytesMut}; use foyer::CacheHint; use futures::future::try_join_all; -use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use more_asserts::assert_gt; @@ -37,8 +37,7 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde}; use risingwave_hummock_sdk::key::{ - end_bound_of_prefix, next_key, prefixed_range_with_vnode, CopyFromSlice, TableKey, - TableKeyRange, + end_bound_of_prefix, next_key, prefixed_range_with_vnode, CopyFromSlice, TableKeyRange, }; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::plan_common::StorageTableDesc; @@ -53,8 +52,8 @@ use crate::store::{ PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter, StateStoreIterExt, TryWaitEpochOptions, }; -use crate::table::merge_sort::merge_sort; -use crate::table::{ChangeLogRow, KeyedChangeLogRow, KeyedRow, TableDistribution, TableIter}; +use crate::table::merge_sort::NodePeek; +use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter}; use crate::StateStore; /// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with @@ -496,8 +495,94 @@ impl> + Send + Unpin> TableIter for S { } } -// TODO: may use `Vec` -type SortKeyType = Bytes; +mod merge_vnode_stream { + + use bytes::Bytes; + use futures::{Stream, StreamExt, TryStreamExt}; + use risingwave_hummock_sdk::key::TableKey; + + use crate::error::StorageResult; + use crate::table::merge_sort::{merge_sort, NodePeek}; + use crate::table::KeyedRow; + + pub(super) enum VnodeStreamType { + Single(RowSt), + Unordered(Vec), + Ordered(Vec), + } + + pub(super) type MergedVnodeStream< + R: Send, + RowSt: Stream> + Send, + KeyedRowSt: Stream> + Send, + > + where + KeyedRow: NodePeek + Send + Sync, + = impl Stream> + Send; + + pub(super) type SortKeyType = Bytes; // TODO: may use Vec + + pub(super) fn merge_stream< + R: Send, + RowSt: Stream> + Send, + KeyedRowSt: Stream> + Send, + >( + stream: VnodeStreamType, + ) -> MergedVnodeStream + where + KeyedRow: NodePeek + Send + Sync, + { + #[auto_enums::auto_enum(futures03::Stream)] + match stream { + VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row), + VnodeStreamType::Unordered(streams) => futures::stream::iter( + streams + .into_iter() + .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))), + ) + .flatten_unordered(1024), + VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| { + Box::pin(stream.map_ok(|(key, row)| KeyedRow { + vnode_prefixed_key: TableKey(key), + row, + })) + })) + .map_ok(|keyed_row| keyed_row.row), + } + } +} + +use merge_vnode_stream::*; + +async fn build_vnode_stream< + R: Send, + RowSt: Stream> + Send, + KeyedRowSt: Stream> + Send, + RowStFut: Future>, + KeyedRowStFut: Future>, +>( + row_stream_fn: impl Fn(VirtualNode) -> RowStFut, + keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut, + vnodes: &[VirtualNode], + ordered: bool, +) -> StorageResult> +where + KeyedRow: NodePeek + Send + Sync, +{ + let stream = match vnodes { + [] => unreachable!(), + [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?), + // Concat all iterators if not to preserve order. + vnodes if !ordered => VnodeStreamType::Unordered( + try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?, + ), + // Merge all iterators if to preserve order. + vnodes => VnodeStreamType::Ordered( + try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?, + ), + }; + Ok(merge_stream(stream)) +} /// Iterators impl StorageTableInner { @@ -527,59 +612,29 @@ impl StorageTableInner { None => self.distribution.vnodes().iter_vnodes().collect_vec(), }; - // For each key range, construct an iterator. - #[auto_enum(futures03::Stream)] - let iter = match vnodes.as_slice() { - [] => unreachable!(), - [vnode] => self - .iter_vnode_with_encoded_key_range::<()>( + build_vnode_stream( + |vnode| { + self.iter_vnode_with_encoded_key_range( prefix_hint.clone(), (start_bound.as_ref(), end_bound.as_ref()), wait_epoch, - *vnode, + vnode, prefetch_options, ) - .await? - .map_ok(|(_, row)| row), - // Concat all iterators if not to preserve order. - vnodes if !ordered => futures::stream::iter( - try_join_all(vnodes.iter().map(|vnode| { - self.iter_vnode_with_encoded_key_range::<()>( - prefix_hint.clone(), - (start_bound.as_ref(), end_bound.as_ref()), - wait_epoch, - *vnode, - prefetch_options, - ) - })) - .await? - .into_iter() - .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))), - ) - .flatten_unordered(1024), - // Merge all iterators if to preserve order. - vnodes => merge_sort( - try_join_all(vnodes.iter().map(|vnode| { - self.iter_vnode_with_encoded_key_range::( - prefix_hint.clone(), - (start_bound.as_ref(), end_bound.as_ref()), - wait_epoch, - *vnode, - prefetch_options, - ) - .map_ok(|stream| { - Box::pin(stream.map_ok(|(key, row)| KeyedRow { - vnode_prefixed_key: TableKey(key), - row, - })) - }) - })) - .await?, - ) - .map_ok(|keyed_row| keyed_row.row), - }; - - Ok(iter) + }, + |vnode| { + self.iter_vnode_with_encoded_key_range( + prefix_hint.clone(), + (start_bound.as_ref(), end_bound.as_ref()), + wait_epoch, + vnode, + prefetch_options, + ) + }, + &vnodes, + ordered, + ) + .await } async fn iter_vnode_with_encoded_key_range( @@ -923,43 +978,13 @@ impl StorageTableInner { ordered: bool, ) -> StorageResult> + Send + 'static> { let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec(); - - #[auto_enum(futures03::Stream)] - let iter = match vnodes.as_slice() { - [] => unreachable!(), - [vnode] => { - let stream = self - .batch_iter_log_inner::<()>(start_epoch, end_epoch, None, *vnode) - .await?; - stream.map_ok(|(_, row)| row) - } - // Concat all iterators if not to preserve order. - vnodes if !ordered => futures::stream::iter( - try_join_all(vnodes.iter().map(|vnode| { - self.batch_iter_log_inner::<()>(start_epoch, end_epoch, None, *vnode) - .map_ok(|stream| Box::pin(stream.map_ok(|(_, row)| row))) - })) - .await?, - ) - .flatten_unordered(1024), - // Merge all iterators if to preserve order. - vnodes => merge_sort( - try_join_all(vnodes.iter().map(|vnode| { - self.batch_iter_log_inner::(start_epoch, end_epoch, None, *vnode) - .map_ok(|stream| { - Box::pin( - stream.map_ok(|(key, row)| { - KeyedChangeLogRow::new(TableKey(key), row) - }), - ) - }) - })) - .await?, - ) - .map_ok(|key_row| key_row.into_owned_row()), - }; - - Ok(iter) + build_vnode_stream( + |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode), + |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode), + &vnodes, + ordered, + ) + .await } /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds. diff --git a/src/storage/src/table/merge_sort.rs b/src/storage/src/table/merge_sort.rs index 44b5e03dd4b0d..b4443134ab89c 100644 --- a/src/storage/src/table/merge_sort.rs +++ b/src/storage/src/table/merge_sort.rs @@ -72,7 +72,7 @@ impl Ord for Node { } #[try_stream(ok=KO, error=E)] -pub async fn merge_sort(streams: Vec) +pub async fn merge_sort(streams: impl IntoIterator) where KO: NodePeek + Send + Sync, E: Error,