diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 154ac0c260fb6..334ea018617e8 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1150,6 +1150,11 @@ pub struct StreamingDeveloperConfig { /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..." /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks. pub switch_jdbc_pg_to_native: bool, + + /// Configure the system-wide cache row cardinality of hash join. + /// For example, if this is set to 1000, it means we can have at most 1000 rows in cache. + #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")] + pub hash_join_entry_state_max_rows: usize, } /// The subsections `[batch.developer]`. @@ -2113,6 +2118,11 @@ pub mod default { pub fn switch_jdbc_pg_to_native() -> bool { false } + + pub fn streaming_hash_join_entry_state_max_rows() -> usize { + // NOTE(kwannoel): This is just an arbitrary number. + 30000 + } } pub use crate::system_param::default as system; diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index cfd81aea06a15..62e3b671cb796 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -11,13 +11,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::collections::{BTreeMap, HashSet}; use std::time::Duration; +use anyhow::Context; use itertools::Itertools; use multimap::MultiMap; use risingwave_common::array::Op; use risingwave_common::hash::{HashKey, NullBitmap}; +use risingwave_common::row::RowExt; use risingwave_common::types::{DefaultOrd, ToOwnedDatum}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; @@ -31,6 +34,7 @@ use super::join::hash_join::*; use super::join::row::JoinRow; use super::join::*; use super::watermark::*; +use crate::consistency::enable_strict_consistency; use crate::executor::join::builder::JoinStreamChunkBuilder; use crate::executor::prelude::*; @@ -157,7 +161,11 @@ pub struct HashJoinExecutor `BufferedWatermarks` watermark_buffers: BTreeMap>, + /// When to alert high join amplification high_join_amplification_threshold: usize, + + /// Max number of rows that will be cached in the entry state. + entry_state_max_rows: usize, } impl std::fmt::Debug @@ -195,6 +203,12 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> { chunk_size: usize, cnt_rows_received: &'a mut u32, high_join_amplification_threshold: usize, + entry_state_max_rows: usize, +} + +enum CacheResult { + Miss, // Cache-miss + Hit(Option), // Cache-hit, no match or match. } impl HashJoinExecutor { @@ -220,6 +234,10 @@ impl HashJoinExecutor Self { + let entry_state_max_rows = ctx + .streaming_config + .developer + .hash_join_entry_state_max_rows; let side_l_column_n = input_l.schema().len(); let schema_fields = match T { @@ -447,6 +465,7 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor, ) -> StreamExecutorResult> { + // If the key contains null values, but these are not null-safe on the match side predicate, + // we will never match. So just return `None` to indicate this. 
if !key.null_bitmap().is_subset(ht.null_matched()) { Ok(None) } else { @@ -733,10 +756,22 @@ impl HashJoinExecutor) -> CacheResult { + if !key.null_bitmap().is_subset(ht.null_matched()) { + CacheResult::Hit(None) + } else if let Some(state) = ht.take_state_opt(key) { + CacheResult::Hit(Some(state)) + } else { + CacheResult::Miss + } + } + fn row_concat( - row_update: &RowRef<'_>, + row_update: impl Row, update_start_pos: usize, - row_matched: &OwnedRow, + row_matched: impl Row, matched_start_pos: usize, ) -> OwnedRow { let mut new_row = vec![None; row_update.len() + row_matched.len()]; @@ -744,8 +779,8 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor = if side_update + let key_satisfies_non_null_requirement = side_update .non_null_fields .iter() - .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() }) - { - Self::hash_eq_match(key, &mut side_match.ht).await? + .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() }); + + let cache_lookup_result = if key_satisfies_non_null_requirement { + if enable_strict_consistency() { + Self::hash_eq_match_opt(key, &mut side_match.ht) + } else { + let result = Self::hash_eq_match(key, &mut side_match.ht).await?; + CacheResult::Hit(result) + } } else { - None + CacheResult::Hit(None) }; - if let Some(rows) = &matched_rows { - join_matched_join_keys.observe(rows.len() as _); - if rows.len() > high_join_amplification_threshold { - let join_key_data_types = side_update.ht.join_key_data_types(); - let key = key.deserialize(join_key_data_types)?; - tracing::warn!(target: "high_join_amplification", - matched_rows_len = rows.len(), - update_table_id = side_update.ht.table_id(), - match_table_id = side_match.ht.table_id(), - join_key = ?key, - actor_id = ctx.id, - fragment_id = ctx.fragment_id, - "large rows matched for join key" - ); + let mut total_matches = 0; + + let (cache_hit, rows) = match cache_lookup_result { + CacheResult::Hit(rows) => (true, rows), + CacheResult::Miss => (false, None), + }; + + macro_rules! match_rows { + ($op:ident, $from_cache:literal) => { + Self::handle_match_rows::( + rows, + row, + key, + &mut hashjoin_chunk_builder, + side_match, + side_update, + &useful_state_clean_columns, + cond, + append_only_optimize, + entry_state_max_rows, + ) + }; + } + + // NOTE(kwannoel): The performance might be slightly worse because of the `box`. + // But the code readability is better. + let stream = match (cache_hit, op) { + (true, Op::Insert | Op::UpdateInsert) => match_rows!(Insert, true).boxed(), + (true, Op::Delete | Op::UpdateDelete) => match_rows!(Delete, true).boxed(), + (false, Op::Insert | Op::UpdateInsert) => match_rows!(Insert, false).boxed(), + (false, Op::Delete | Op::UpdateDelete) => match_rows!(Delete, false).boxed(), + }; + + #[for_await] + for chunk in stream { + let chunk = chunk?; + total_matches += chunk.cardinality(); + yield chunk; + } + + join_matched_join_keys.observe(total_matches as _); + if total_matches > high_join_amplification_threshold { + let join_key_data_types = side_update.ht.join_key_data_types(); + let key = key.deserialize(join_key_data_types)?; + tracing::warn!(target: "high_join_amplification", + matched_rows_len = total_matches, + update_table_id = side_update.ht.table_id(), + match_table_id = side_match.ht.table_id(), + join_key = ?key, + actor_id = ctx.id, + fragment_id = ctx.fragment_id, + "large rows matched for join key" + ); + } + } + // NOTE(kwannoel): We don't track metrics for this last chunk. 
+ if let Some(chunk) = hashjoin_chunk_builder.take() { + yield chunk; + } + } + + /// For the probe-side row, we need to check if it has values in cache, if not, we need to + /// fetch the matched rows from the state table. + /// + /// Every matched build-side row being processed needs to go through the following phases: + /// 1. Handle join condition evaluation. + /// 2. Always do cache refill, if the state count is good. + /// 3. Handle state cleaning. + /// 4. Handle degree table update. + #[allow(clippy::too_many_arguments)] + #[try_stream(ok = StreamChunk, error = StreamExecutorError)] + async fn handle_match_rows< + 'a, + const SIDE: SideTypePrimitive, + const JOIN_OP: JoinOpPrimitive, + const MATCHED_ROWS_FROM_CACHE: bool, + >( + cached_rows: Option, + row: RowRef<'a>, + key: &'a K, + hashjoin_chunk_builder: &'a mut JoinChunkBuilder, + side_match: &'a mut JoinSide, + side_update: &'a mut JoinSide, + useful_state_clean_columns: &'a [(usize, &'a Watermark)], + cond: &'a mut Option, + append_only_optimize: bool, + entry_state_max_rows: usize, + ) { + let mut entry_state = JoinEntryState::default(); + let mut entry_state_count = 0; + + let mut degree = 0; + let mut append_only_matched_row: Option> = None; + let mut matched_rows_to_clean = vec![]; + + macro_rules! match_row { + ($degree_table:expr, $matched_row:expr, $matched_row_ref:expr) => { + Self::handle_match_row::( + row, + $matched_row, + $matched_row_ref, + hashjoin_chunk_builder, + $degree_table, + side_update.start_pos, + side_match.start_pos, + cond, + &mut degree, + useful_state_clean_columns, + append_only_optimize, + &mut append_only_matched_row, + &mut matched_rows_to_clean, + ) + }; + } + + let entry_state = if MATCHED_ROWS_FROM_CACHE { + let Some(mut cached_rows) = cached_rows else { + // Handle rows with null-datums, these rows will never match. + let op = match JOIN_OP { + JoinOp::Insert => Op::Insert, + JoinOp::Delete => Op::Delete, + }; + if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) { + yield chunk; + } + return Ok(()); + }; + for (matched_row_ref, matched_row) in cached_rows.values_mut(&side_match.all_data_types) + { + let matched_row = matched_row?; + if let Some(chunk) = match_row!( + side_match.ht.get_degree_state_mut_ref(), + matched_row, + Some(matched_row_ref) + ) + .await + { + yield chunk; } - } else { - join_matched_join_keys.observe(0.0) } - match op { - Op::Insert | Op::UpdateInsert => { - let mut degree = 0; - let mut append_only_matched_row: Option> = None; - if let Some(mut matched_rows) = matched_rows { - let mut matched_rows_to_clean = vec![]; - for (matched_row_ref, matched_row) in - matched_rows.values_mut(&side_match.all_data_types) - { - let mut matched_row = matched_row?; - // TODO(yuhao-su): We should find a better way to eval the expression - // without concat two rows. 
- // if there are non-equi expressions - let check_join_condition = if let Some(ref mut cond) = cond { - let new_row = Self::row_concat( - &row, - side_update.start_pos, - &matched_row.row, - side_match.start_pos, - ); - - cond.eval_row_infallible(&new_row) - .await - .map(|s| *s.as_bool()) - .unwrap_or(false) - } else { - true - }; - let mut need_state_clean = false; - if check_join_condition { - degree += 1; - if !forward_exactly_once(T, SIDE) { - if let Some(chunk) = hashjoin_chunk_builder - .with_match_on_insert(&row, &matched_row) - { - yield chunk; - } - } - if side_match.need_degree_table { - side_match.ht.inc_degree(matched_row_ref, &mut matched_row); - } - } else { - for (column_idx, watermark) in &useful_state_clean_columns { - if matched_row.row.datum_at(*column_idx).map_or( - false, - |scalar| { - scalar - .default_cmp(&watermark.val.as_scalar_ref_impl()) - .is_lt() - }, - ) { - need_state_clean = true; - break; - } - } - } - // If the stream is append-only and the join key covers pk in both side, - // then we can remove matched rows since pk is unique and will not be - // inserted again - if append_only_optimize { - // Since join key contains pk and pk is unique, there should be only - // one row if matched. - assert!(append_only_matched_row.is_none()); - append_only_matched_row = Some(matched_row); - } else if need_state_clean { - // `append_only_optimize` and `need_state_clean` won't both be true. - // 'else' here is only to suppress compiler error. - matched_rows_to_clean.push(matched_row); - } - } - if degree == 0 { - if let Some(chunk) = - hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row) - { - yield chunk; - } - } else if let Some(chunk) = - hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Insert, row) - { - yield chunk; - } - // Insert back the state taken from ht. - side_match.ht.update_state(key, matched_rows); - for matched_row in matched_rows_to_clean { - if side_match.need_degree_table { - side_match.ht.delete(key, matched_row)?; - } else { - side_match.ht.delete_row(key, matched_row.row)?; - } - } + cached_rows + } else { + let (matched_rows, degree_table) = side_match + .ht + .fetch_matched_rows_and_get_degree_table_ref(key) + .await?; + + #[for_await] + for matched_row in matched_rows { + let (encoded_pk, matched_row) = matched_row?; + + // cache refill + if entry_state_count <= entry_state_max_rows { + entry_state + .insert(encoded_pk, matched_row.encode(), None) // TODO(kwannoel): handle ineq key for asof join. 
+ .with_context(|| format!("row: {}", row.display(),))?; + entry_state_count += 1; + } + if let Some(chunk) = match_row!(degree_table, matched_row, None).await { + yield chunk; + } + } + Box::new(entry_state) + }; + + // forward rows depending on join types + let op = match JOIN_OP { + JoinOp::Insert => Op::Insert, + JoinOp::Delete => Op::Delete, + }; + if degree == 0 { + if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) { + yield chunk; + } + } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row) + { + yield chunk; + } + + // cache refill + if MATCHED_ROWS_FROM_CACHE || entry_state_count <= entry_state_max_rows { + side_match.ht.update_state(key, entry_state); + } - if append_only_optimize && let Some(row) = append_only_matched_row { - if side_match.need_degree_table { - side_match.ht.delete(key, row)?; - } else { - side_match.ht.delete_row(key, row.row)?; - } - } else if side_update.need_degree_table { - side_update.ht.insert(key, JoinRow::new(row, degree))?; - } else { - side_update.ht.insert_row(key, row)?; + // watermark state cleaning + for matched_row in matched_rows_to_clean { + side_match.ht.delete_handle_degree(key, matched_row)?; + } + + // apply append_only optimization to clean matched_rows which have been persisted + if append_only_optimize && let Some(row) = append_only_matched_row { + assert_matches!(JOIN_OP, JoinOp::Insert); + side_match.ht.delete_handle_degree(key, row)?; + return Ok(()); + } + + // no append_only optimization, update state table(s). + match JOIN_OP { + JoinOp::Insert => { + side_update + .ht + .insert_handle_degree(key, JoinRow::new(row, degree))?; + } + JoinOp::Delete => { + side_update + .ht + .delete_handle_degree(key, JoinRow::new(row, degree))?; + } + } + } + + #[allow(clippy::too_many_arguments)] + async fn handle_match_row< + 'a, + const SIDE: SideTypePrimitive, + const JOIN_OP: JoinOpPrimitive, + const MATCHED_ROWS_FROM_CACHE: bool, + >( + update_row: RowRef<'a>, + mut matched_row: JoinRow, + mut matched_row_cache_ref: Option<&mut StateValueType>, + hashjoin_chunk_builder: &'a mut JoinChunkBuilder, + degree_table: &mut Option>, + side_update_start_pos: usize, + side_match_start_pos: usize, + cond: &Option, + update_row_degree: &mut u64, + useful_state_clean_columns: &[(usize, &'a Watermark)], + append_only_optimize: bool, + append_only_matched_row: &mut Option>, + matched_rows_to_clean: &mut Vec>, + ) -> Option { + // check join cond + let join_condition_satisfied = Self::check_join_condition( + update_row, + side_update_start_pos, + &matched_row.row, + side_match_start_pos, + cond, + ) + .await; + let mut need_state_clean = false; + let mut chunk_opt = None; + if join_condition_satisfied { + // update degree + *update_row_degree += 1; + if let Some(degree_table) = degree_table { + update_degree::(degree_table, &mut matched_row); + if MATCHED_ROWS_FROM_CACHE { + // update matched row in cache + match JOIN_OP { + JoinOp::Insert => { + matched_row_cache_ref.as_mut().unwrap().degree += 1; } - } else { - // Row which violates null-safe bitmap will never be matched so we need not - // store. 
- if let Some(chunk) = - hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row) - { - yield chunk; + JoinOp::Delete => { + matched_row_cache_ref.as_mut().unwrap().degree -= 1; } } } - Op::Delete | Op::UpdateDelete => { - let mut degree = 0; - if let Some(mut matched_rows) = matched_rows { - let mut matched_rows_to_clean = vec![]; - for (matched_row_ref, matched_row) in - matched_rows.values_mut(&side_match.all_data_types) - { - let mut matched_row = matched_row?; - // TODO(yuhao-su): We should find a better way to eval the expression - // without concat two rows. - // if there are non-equi expressions - let check_join_condition = if let Some(ref mut cond) = cond { - let new_row = Self::row_concat( - &row, - side_update.start_pos, - &matched_row.row, - side_match.start_pos, - ); - - cond.eval_row_infallible(&new_row) - .await - .map(|s| *s.as_bool()) - .unwrap_or(false) - } else { - true - }; - let mut need_state_clean = false; - if check_join_condition { - degree += 1; - if side_match.need_degree_table { - side_match.ht.dec_degree(matched_row_ref, &mut matched_row); - } - if !forward_exactly_once(T, SIDE) { - if let Some(chunk) = hashjoin_chunk_builder - .with_match_on_delete(&row, &matched_row) - { - yield chunk; - } - } - } else { - for (column_idx, watermark) in &useful_state_clean_columns { - if matched_row.row.datum_at(*column_idx).map_or( - false, - |scalar| { - scalar - .default_cmp(&watermark.val.as_scalar_ref_impl()) - .is_lt() - }, - ) { - need_state_clean = true; - break; - } - } - } - if need_state_clean { - matched_rows_to_clean.push(matched_row); - } - } - if degree == 0 { - if let Some(chunk) = - hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row) - { - yield chunk; - } - } else if let Some(chunk) = - hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Delete, row) - { - yield chunk; - } - // Insert back the state taken from ht. - side_match.ht.update_state(key, matched_rows); - for matched_row in matched_rows_to_clean { - if side_match.need_degree_table { - side_match.ht.delete(key, matched_row)?; - } else { - side_match.ht.delete_row(key, matched_row.row)?; - } - } - - if append_only_optimize { - unreachable!(); - } else if side_update.need_degree_table { - side_update.ht.delete(key, JoinRow::new(row, degree))?; - } else { - side_update.ht.delete_row(key, row)?; - }; - } else { - // We do not store row which violates null-safe bitmap. - if let Some(chunk) = - hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row) - { - yield chunk; - } - } + } + // send matched row downstream + if !forward_exactly_once(T, SIDE) { + if let Some(chunk) = + hashjoin_chunk_builder.with_match::(&update_row, &matched_row) + { + chunk_opt = Some(chunk); + } + } + } else { + // check if need state cleaning + for (column_idx, watermark) in useful_state_clean_columns { + if matched_row + .row + .datum_at(*column_idx) + .map_or(false, |scalar| { + scalar + .default_cmp(&watermark.val.as_scalar_ref_impl()) + .is_lt() + }) + { + need_state_clean = true; + break; } } } - if let Some(chunk) = hashjoin_chunk_builder.take() { - yield chunk; + // If the stream is append-only and the join key covers pk in both side, + // then we can remove matched rows since pk is unique and will not be + // inserted again + if append_only_optimize { + assert_matches!(JOIN_OP, JoinOp::Insert); + // Since join key contains pk and pk is unique, there should be only + // one row if matched. 
+ assert!(append_only_matched_row.is_none()); + *append_only_matched_row = Some(matched_row); + } else if need_state_clean { + // `append_only_optimize` and `need_state_clean` won't both be true. + // 'else' here is only to suppress compiler error, otherwise + // `matched_row` will be moved twice. + matched_rows_to_clean.push(matched_row); + } + + chunk_opt + } + + // TODO(yuhao-su): We should find a better way to eval the expression + // without concat two rows. + // if there are non-equi expressions + // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`. + async fn check_join_condition( + row: impl Row, + side_update_start_pos: usize, + matched_row: impl Row, + side_match_start_pos: usize, + join_condition: &Option, + ) -> bool { + if let Some(ref join_condition) = join_condition { + let new_row = Self::row_concat( + row, + side_update_start_pos, + matched_row, + side_match_start_pos, + ); + join_condition + .eval_row_infallible(&new_row) + .await + .map(|s| *s.as_bool()) + .unwrap_or(false) + } else { + true } } } diff --git a/src/stream/src/executor/join/builder.rs b/src/stream/src/executor/join/builder.rs index c39e385fdee96..9b65adb2ba63f 100644 --- a/src/stream/src/executor/join/builder.rs +++ b/src/stream/src/executor/join/builder.rs @@ -199,6 +199,20 @@ impl JoinChunkBuilder c.into() } + /// TODO(kwannoel): We can actually reuse a lot of the logic between `with_match_on_insert` + /// and `with_match_on_delete`. We should refactor this to avoid code duplication. + /// We just introduce this wrapper function to avoid large code diffs. + pub fn with_match( + &mut self, + row: &RowRef<'_>, + matched_row: &JoinRow, + ) -> Option { + match OP { + JoinOp::Insert => self.with_match_on_insert(row, matched_row), + JoinOp::Delete => self.with_match_on_delete(row, matched_row), + } + } + pub fn with_match_on_insert( &mut self, row: &RowRef<'_>, diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs index 1ee3694be4d03..641adcf907e3c 100644 --- a/src/stream/src/executor/join/hash_join.rs +++ b/src/stream/src/executor/join/hash_join.rs @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - use std::alloc::Global; use std::cmp::Ordering; use std::ops::{Bound, Deref, DerefMut, RangeBounds}; @@ -26,7 +25,7 @@ use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; use risingwave_common::metrics::LabelGuardedIntCounter; -use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::row::{once, OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqFast; @@ -45,6 +44,7 @@ use crate::consistency::{consistency_error, consistency_panic, enable_strict_con use crate::executor::error::StreamExecutorResult; use crate::executor::join::row::JoinRow; use crate::executor::monitor::StreamingMetrics; +use crate::executor::StreamExecutorError; use crate::task::{ActorId, AtomicU64Ref, FragmentId}; /// Memcomparable encoding. 
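
To make the new probe path easier to follow, here is a minimal, self-contained sketch of the control flow introduced above: a cache hit serves the matched rows directly, while a cache miss reads them from the state table and only installs the refilled entry when it stays within `entry_state_max_rows`. Plain `HashMap`s and synchronous code stand in for `JoinEntryState`, the state table and the async streams; `MatchSideCache` and `matched_rows` are illustrative names, not part of the executor's API, and the tiny row limit is only there to make the assertion visible (the config default is 30000).

```rust
use std::collections::HashMap;

type Row = Vec<i64>;

struct MatchSideCache {
    /// join key -> rows cached for that key (stand-in for `JoinEntryState`)
    cache: HashMap<u64, Vec<Row>>,
    /// stand-in for the state table backing the match side
    storage: HashMap<u64, Vec<Row>>,
    /// stand-in for `hash_join_entry_state_max_rows`
    entry_state_max_rows: usize,
}

impl MatchSideCache {
    /// Return all rows matching `key`. On a cache miss the rows are read from
    /// "storage", and the entry is cached only if it fits under the row limit,
    /// mirroring the bounded cache refill described above.
    fn matched_rows(&mut self, key: u64) -> Vec<Row> {
        if let Some(rows) = self.cache.get(&key) {
            return rows.clone(); // cache hit: no state-table read needed
        }
        // Cache miss: read matched rows from storage.
        let rows = self.storage.get(&key).cloned().unwrap_or_default();
        if rows.len() <= self.entry_state_max_rows {
            self.cache.insert(key, rows.clone()); // bounded cache refill
        }
        rows
    }
}

fn main() {
    let mut side = MatchSideCache {
        cache: HashMap::new(),
        storage: HashMap::from([(1u64, vec![vec![1, 10], vec![1, 20]])]),
        entry_state_max_rows: 1,
    };
    // Two rows match key 1, exceeding the limit of 1, so the rows are served
    // from storage but the entry is not installed in the cache.
    assert_eq!(side.matched_rows(1).len(), 2);
    assert!(!side.cache.contains_key(&1));
}
```

The real executor additionally forwards non-matching rows, evaluates the non-equi condition, updates degrees and handles watermark-based state cleaning; the sketch keeps only the cache-hit / cache-miss / bounded-refill decision.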
@@ -224,12 +224,151 @@ pub struct JoinHashMap { metrics: JoinHashMapMetrics, } +impl JoinHashMap { + pub(crate) fn get_degree_state_mut_ref(&mut self) -> &mut Option> { + &mut self.degree_state + } + + /// NOTE(kwannoel): This allows us to concurrently stream records from the `state_table`, + /// and update the degree table, without using `unsafe` code. + /// + /// This is because we obtain separate references to separate parts of the `JoinHashMap`, + /// instead of reusing the same reference to `JoinHashMap` for concurrent read access to `state_table`, + /// and write access to the degree table. + pub(crate) async fn fetch_matched_rows_and_get_degree_table_ref<'a>( + &'a mut self, + key: &'a K, + ) -> StreamExecutorResult<( + impl Stream)>> + 'a, + &'a mut Option>, + )> { + let degree_state = &mut self.degree_state; + let (pk_indices, state_table) = (&self.state.pk_indices, &mut self.state.table); + let degrees = if let Some(ref degree_state) = degree_state { + Some(fetch_degrees(key, &self.join_key_data_types, °ree_state.table).await?) + } else { + None + }; + let stream = into_stream( + &self.join_key_data_types, + pk_indices, + &self.pk_serializer, + state_table, + key, + degrees, + ); + Ok((stream, &mut self.degree_state)) + } +} + +#[try_stream(ok = (PkType, JoinRow), error = StreamExecutorError)] +pub(crate) async fn into_stream<'a, K: HashKey, S: StateStore>( + join_key_data_types: &'a [DataType], + pk_indices: &'a [usize], + pk_serializer: &'a OrderedRowSerde, + state_table: &'a StateTable, + key: &'a K, + degrees: Option>, +) { + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); + let decoded_key = key.deserialize(join_key_data_types)?; + let table_iter = state_table + .iter_with_prefix(&decoded_key, sub_range, PrefetchOptions::default()) + .await?; + + #[for_await] + for (i, entry) in table_iter.enumerate() { + let encoded_row = entry?; + let encoded_pk = encoded_row + .as_ref() + .project(pk_indices) + .memcmp_serialize(pk_serializer); + let join_row = JoinRow::new( + encoded_row.into_owned_row(), + degrees.as_ref().map_or(0, |d| d[i]), + ); + yield (encoded_pk, join_row); + } +} + +/// We use this to fetch ALL degrees into memory. +/// We use this instead of a streaming interface. +/// It is necessary because we must update the `degree_state_table` concurrently. +/// If we obtain the degrees in a stream, +/// we will need to hold an immutable reference to the state table for the entire lifetime, +/// preventing us from concurrently updating the state table. +/// +/// The cost of fetching all degrees upfront is acceptable. We currently already do so +/// in `fetch_cached_state`. +/// The memory use should be limited since we only store a u64. +/// +/// Let's say we have amplification of 1B, we will have 1B * 8 bytes ~= 8GB +/// +/// We can also have further optimization, to permit breaking the streaming update, +/// to flush the in-memory degrees, if this is proven to have high memory consumption. +/// +/// TODO(kwannoel): Perhaps we can cache these separately from matched rows too. +/// Because matched rows may occupy a larger capacity. +/// +/// Argument for this: +/// We only hit this when cache miss. When cache miss, we will have this as one off cost. +/// Keeping this cached separately from matched rows is beneficial. +/// Then we can evict matched rows, without touching the degrees. 
+async fn fetch_degrees( + key: &K, + join_key_data_types: &[DataType], + degree_state_table: &StateTable, +) -> StreamExecutorResult> { + let key = key.deserialize(join_key_data_types)?; + let mut degrees = vec![]; + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); + let table_iter = degree_state_table + .iter_with_prefix(key, sub_range, PrefetchOptions::default()) + .await + .unwrap(); + #[for_await] + for entry in table_iter { + let degree_row = entry?; + let degree_i64 = degree_row + .datum_at(degree_row.len() - 1) + .expect("degree should not be NULL"); + degrees.push(degree_i64.into_int64() as u64); + } + Ok(degrees) +} + +// NOTE(kwannoel): This is not really specific to `TableInner`. +// A degree table is `TableInner`, a `TableInner` might not be a degree table. +// Hence we don't specify it in its impl block. +pub(crate) fn update_degree( + degree_state: &mut TableInner, + matched_row: &mut JoinRow, +) { + let old_degree_row = matched_row + .row + .as_ref() + .project(°ree_state.order_key_indices) + .chain(once(Some(ScalarImpl::Int64(matched_row.degree as i64)))); + if INCREMENT { + matched_row.degree += 1; + } else { + // DECREMENT + matched_row.degree -= 1; + } + let new_degree_row = matched_row + .row + .as_ref() + .project(°ree_state.order_key_indices) + .chain(once(Some(ScalarImpl::Int64(matched_row.degree as i64)))); + degree_state.table.update(old_degree_row, new_degree_row); +} + pub struct TableInner { /// Indices of the (cache) pk in a state row pk_indices: Vec, /// Indices of the join key in a state row join_key_indices: Vec, - // This should be identical to the pk in state table. + /// This should be identical to the pk in state table. order_key_indices: Vec, pub(crate) table: StateTable, } @@ -366,6 +505,28 @@ impl JoinHashMap { } } + /// Take the state for the given `key` out of the hash table and return it. One **MUST** call + /// `update_state` after some operations to put the state back. + /// + /// If the state does not exist in the cache, fetch the remote storage and return. If it still + /// does not exist in the remote storage, a [`JoinEntryState`] with empty cache will be + /// returned. + /// + /// Note: This will NOT remove anything from remote storage. + pub fn take_state_opt(&mut self, key: &K) -> Option { + self.metrics.total_lookup_count += 1; + if self.inner.contains(key) { + tracing::trace!("hit cache for join key: {:?}", key); + // Do not update the LRU statistics here with `peek_mut` since we will put the state + // back. + let mut state = self.inner.peek_mut(key).unwrap(); + Some(state.take()) + } else { + tracing::trace!("miss cache for join key: {:?}", key); + None + } + } + /// Take the state for the given `key` out of the hash table and return it. One **MUST** call /// `update_state` after some operations to put the state back. 
/// @@ -580,6 +741,18 @@ impl JoinHashMap { Ok(()) } + pub fn insert_handle_degree( + &mut self, + key: &K, + value: JoinRow, + ) -> StreamExecutorResult<()> { + if self.need_degree_table { + self.insert(key, value) + } else { + self.insert_row(key, value.row) + } + } + /// Insert a join row pub fn insert(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { let pk = self.serialize_pk_from_row(&value.row); @@ -626,6 +799,18 @@ impl JoinHashMap { Ok(()) } + pub fn delete_handle_degree( + &mut self, + key: &K, + value: JoinRow, + ) -> StreamExecutorResult<()> { + if self.need_degree_table { + self.delete(key, value) + } else { + self.delete_row(key, value.row) + } + } + /// Delete a join row pub fn delete(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { if let Some(mut entry) = self.inner.get_mut(key) { @@ -676,53 +861,6 @@ impl JoinHashMap { self.inner.put(key.clone(), HashValueWrapper(Some(state))); } - /// Manipulate the degree of the given [`JoinRow`] and [`EncodedJoinRow`] with `action`, both in - /// memory and in the degree table. - fn manipulate_degree( - &mut self, - join_row_ref: &mut StateValueType, - join_row: &mut JoinRow, - action: impl Fn(&mut DegreeType), - ) { - // TODO: no need to `into_owned_row` here due to partial borrow. - let old_degree = join_row - .to_table_rows(&self.state.order_key_indices) - .1 - .into_owned_row(); - - action(&mut join_row_ref.degree); - action(&mut join_row.degree); - - let new_degree = join_row.to_table_rows(&self.state.order_key_indices).1; - let degree_state = self.degree_state.as_mut().unwrap(); - degree_state.table.update(old_degree, new_degree); - } - - /// Increment the degree of the given [`JoinRow`] and [`EncodedJoinRow`] with `action`, both in - /// memory and in the degree table. - pub fn inc_degree( - &mut self, - join_row_ref: &mut StateValueType, - join_row: &mut JoinRow, - ) { - self.manipulate_degree(join_row_ref, join_row, |d| *d += 1) - } - - /// Decrement the degree of the given [`JoinRow`] and [`EncodedJoinRow`] with `action`, both in - /// memory and in the degree table. - pub fn dec_degree( - &mut self, - join_row_ref: &mut StateValueType, - join_row: &mut JoinRow, - ) { - self.manipulate_degree(join_row_ref, join_row, |d| { - *d = d.checked_sub(1).unwrap_or_else(|| { - consistency_panic!("Tried to decrement zero join row degree"); - 0 - }); - }) - } - /// Evict the cache. pub fn evict(&mut self) { self.inner.evict(); @@ -773,6 +911,7 @@ use risingwave_common_estimate_size::KvSize; use thiserror::Error; use super::*; +use crate::executor::prelude::{try_stream, Stream}; /// We manages a `HashMap` in memory for all entries belonging to a join key. /// When evicted, `cached` does not hold any entries. diff --git a/src/stream/src/executor/join/mod.rs b/src/stream/src/executor/join/mod.rs index ea53a7992f265..01fb35c97f278 100644 --- a/src/stream/src/executor/join/mod.rs +++ b/src/stream/src/executor/join/mod.rs @@ -22,6 +22,16 @@ pub mod hash_join; pub mod join_row_set; pub mod row; +pub(crate) type JoinOpPrimitive = bool; + +#[allow(non_snake_case, non_upper_case_globals)] +pub(crate) mod JoinOp { + use super::JoinOpPrimitive; + + pub const Insert: JoinOpPrimitive = true; + pub const Delete: JoinOpPrimitive = false; +} + /// The `JoinType` and `SideType` are to mimic a enum, because currently /// enum is not supported in const generic. // TODO: Use enum to replace this once [feature(adt_const_params)](https://github.com/rust-lang/rust/issues/95174) get completed. 
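
The `JoinOp` module added in `mod.rs` mimics an enum with `bool` constants because enums cannot yet be used as const generic parameters. A self-contained sketch of how such a constant drives compile-time dispatch follows; `apply_degree_change` is a hypothetical stand-in for the executor's `match JOIN_OP` branches, not code from this PR.

```rust
// The operation is encoded as a `bool` with named constants so it can be used as a
// const generic parameter; a real enum cannot be one until `adt_const_params` lands.
#[allow(non_snake_case, non_upper_case_globals)]
mod JoinOp {
    pub type JoinOpPrimitive = bool;
    pub const Insert: JoinOpPrimitive = true;
    pub const Delete: JoinOpPrimitive = false;
}

/// Monomorphized once per operation; the `match` on the const parameter is resolved
/// at compile time, mirroring how `handle_match_rows` dispatches on `JOIN_OP`.
fn apply_degree_change<const JOIN_OP: JoinOp::JoinOpPrimitive>(degree: &mut i64) {
    match JOIN_OP {
        JoinOp::Insert => *degree += 1,
        JoinOp::Delete => *degree -= 1,
    }
}

fn main() {
    let mut degree = 0;
    apply_degree_change::<{ JoinOp::Insert }>(&mut degree);
    apply_degree_change::<{ JoinOp::Insert }>(&mut degree);
    apply_degree_change::<{ JoinOp::Delete }>(&mut degree);
    assert_eq!(degree, 1);
}
```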
diff --git a/src/stream/src/executor/join/row.rs b/src/stream/src/executor/join/row.rs
index 6dfafc2b91df1..f6a8ec671744c 100644
--- a/src/stream/src/executor/join/row.rs
+++ b/src/stream/src/executor/join/row.rs
@@ -47,6 +47,10 @@ impl JoinRow {
         (&self.row, degree)
     }
 
+    pub fn to_degree_row<'a>(&'a self, state_order_key_indices: &'a [usize]) -> impl Row + 'a {
+        self.to_table_rows(state_order_key_indices).1
+    }
+
     pub fn encode(&self) -> EncodedJoinRow {
         EncodedJoinRow {
             compacted_row: (&self.row).into(),
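
To round off the degree handling: `to_degree_row` above and `update_degree` in `hash_join.rs` both build the same shape of row, namely the matched row projected onto the order-key columns with the degree appended as a trailing Int64. A minimal sketch of that layout, using plain `Option<i64>` datums and a hypothetical `degree_row` helper in place of the engine's row types:

```rust
type Datum = Option<i64>;

/// Project the matched row onto the order-key columns and append the degree
/// as the trailing Int64 column, as the degree table expects.
fn degree_row(matched_row: &[Datum], order_key_indices: &[usize], degree: u64) -> Vec<Datum> {
    let mut out: Vec<Datum> = order_key_indices.iter().map(|&i| matched_row[i]).collect();
    out.push(Some(degree as i64)); // the degree lives in the last column
    out
}

fn main() {
    let matched = [Some(7), Some(42), Some(3)];
    // An increment builds the old and the new degree row under the same
    // order-key prefix; only the trailing degree column differs.
    let old = degree_row(&matched, &[0, 2], 5);
    let new = degree_row(&matched, &[0, 2], 6);
    assert_eq!(old, vec![Some(7), Some(3), Some(5)]);
    assert_eq!(new, vec![Some(7), Some(3), Some(6)]);
}
```

An increment or decrement then writes the old and the new degree row against the same order-key prefix, which is what `update_degree` does via `degree_state.table.update(old_degree_row, new_degree_row)`.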