diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index 74db1d267b1ac..ed14a6124749e 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -11,24 +11,28 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::collections::{BTreeMap, HashSet};
+use std::collections::{BTreeMap, Bound, HashSet};
 use std::time::Duration;
 
+use anyhow::Context;
 use itertools::Itertools;
 use multimap::MultiMap;
 use risingwave_common::array::Op;
 use risingwave_common::hash::{HashKey, NullBitmap};
+use risingwave_common::row::RowExt;
 use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::util::iter_util::ZipEqDebug;
+use risingwave_common::util::row_serde::OrderedRowSerde;
 use risingwave_expr::expr::NonStrictExpression;
 use risingwave_expr::ExprError;
+use risingwave_storage::store::PrefetchOptions;
 use tokio::time::Instant;
 
 use self::builder::JoinChunkBuilder;
 use super::barrier_align::*;
 use super::join::hash_join::*;
-use super::join::row::JoinRow;
+use super::join::row::{DegreeType, JoinRow};
 use super::join::*;
 use super::watermark::*;
 use crate::executor::join::builder::JoinStreamChunkBuilder;
@@ -118,6 +122,19 @@ impl<K: HashKey, S: StateStore> JoinSide<K, S> {
     pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
         self.ht.init(epoch).await
     }
+
+    fn get_table_mut_refs(
+        &mut self,
+    ) -> (
+        &[usize],
+        &[DataType],
+        &[usize],
+        &OrderedRowSerde,
+        &mut StateTable<S>,
+        Option<&mut StateTable<S>>,
+    ) {
+        self.ht.get_table_mut_refs()
+    }
 }
 
 /// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
@@ -197,6 +214,11 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
     high_join_amplification_threshold: usize,
 }
 
+enum CacheResult {
+    Miss,                       // Cache miss: the entry must be fetched from the state table.
+    Hit(Option<HashValueType>), // Cache hit: `Some` carries the matches; `None` means the key can never match.
+}
+
 impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T> {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
@@ -733,10 +755,22 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
+    fn hash_eq_match_opt(key: &K, ht: &mut JoinHashMap<K, S>) -> CacheResult {
+        if !key.null_bitmap().is_subset(ht.null_matched()) {
+            CacheResult::Hit(None)
+        } else if let Some(state) = ht.take_state_opt(key) {
+            CacheResult::Hit(Some(state))
+        } else {
+            CacheResult::Miss
+        }
+    }
+
     fn row_concat(
-        row_update: &RowRef<'_>,
+        row_update: impl Row,
         update_start_pos: usize,
-        row_matched: &OwnedRow,
+        row_matched: impl Row,
         matched_start_pos: usize,
     ) -> OwnedRow {
         let mut new_row = vec![None; row_update.len() + row_matched.len()];
@@ -744,8 +778,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
     /// Used to forward `eq_join_oneside` to show join side in stack.
     fn eq_join_left(
         args: EqJoinArgs<'_, K, S>,
     ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
-        Self::eq_join_oneside::<{ SideType::Left }>(args)
+        Self::eq_join_oneside_opt::<{ SideType::Left }>(args)
     }
 
     /// Used to forward `eq_join_oneside` to show join side in stack.
     fn eq_join_right(
         args: EqJoinArgs<'_, K, S>,
     ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
-        Self::eq_join_oneside::<{ SideType::Right }>(args)
+        Self::eq_join_oneside_opt::<{ SideType::Right }>(args)
+    }
+
+    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
+    async fn eq_join_oneside_opt<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S>) {
+        let EqJoinArgs {
+            ctx,
+            side_l,
+            side_r,
+            actual_output_data_types,
+            cond,
+            inequality_watermarks,
+            chunk,
+            append_only_optimize,
+            chunk_size,
+            cnt_rows_received,
+            high_join_amplification_threshold,
+            ..
+        } = args;
+
+        let (side_update, side_match) = if SIDE == SideType::Left {
+            (side_l, side_r)
+        } else {
+            (side_r, side_l)
+        };
+
+        let useful_state_clean_columns = side_match
+            .state_clean_columns
+            .iter()
+            .filter_map(|(column_idx, inequality_index)| {
+                inequality_watermarks[*inequality_index]
+                    .as_ref()
+                    .map(|watermark| (*column_idx, watermark))
+            })
+            .collect_vec();
+
+        let mut hashjoin_chunk_builder =
+            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
+                chunk_size,
+                actual_output_data_types.to_vec(),
+                side_update.i2o_mapping.clone(),
+                side_match.i2o_mapping.clone(),
+            ));
+
+        let join_matched_join_keys = ctx
+            .streaming_metrics
+            .join_matched_join_keys
+            .with_guarded_label_values(&[
+                &ctx.id.to_string(),
+                &ctx.fragment_id.to_string(),
+                &side_update.ht.table_id().to_string(),
+            ]);
+
+        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
+        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
+            let Some((op, row)) = r else {
+                continue;
+            };
+            Self::evict_cache(side_update, side_match, cnt_rows_received);
+
+            let key_satisfies_non_null_requirement = side_update
+                .non_null_fields
+                .iter()
+                .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
+
+            let matched_rows = if key_satisfies_non_null_requirement {
+                Self::hash_eq_match_opt(key, &mut side_match.ht)
+            } else {
+                CacheResult::Hit(None)
+            };
+
+            let mut total_matches = 0;
+
+            match matched_rows {
+                CacheResult::Hit(rows) => {
+                    #[for_await]
+                    for chunk in Self::handle_cached_matched_rows(
+                        rows,
+                        row,
+                        op,
+                        key,
+                        &mut hashjoin_chunk_builder,
+                        side_match,
+                        side_update,
+                        &useful_state_clean_columns,
+                        cond,
+                        append_only_optimize,
+                    ) {
+                        let chunk = chunk?;
+                        total_matches += chunk.cardinality();
+                        yield chunk;
+                    }
+                }
+                CacheResult::Miss => {
+                    #[for_await]
+                    for chunk in Self::handle_fetch_matched_rows(
+                        row,
+                        op,
+                        key,
+                        &mut hashjoin_chunk_builder,
+                        side_match,
+                        side_update,
+                        &useful_state_clean_columns,
+                        cond,
+                        append_only_optimize,
+                    ) {
+                        let chunk = chunk?;
+                        total_matches += chunk.cardinality();
+                        yield chunk;
+                    }
+                }
+            }
+
+            join_matched_join_keys.observe(total_matches as _);
+            if total_matches > high_join_amplification_threshold {
+                let join_key_data_types = side_update.ht.join_key_data_types();
+                let key = key.deserialize(join_key_data_types)?;
+                tracing::warn!(target: "high_join_amplification",
+                    matched_rows_len = total_matches,
+                    update_table_id = side_update.ht.table_id(),
+                    match_table_id = side_match.ht.table_id(),
+                    join_key = ?key,
+                    actor_id = ctx.id,
+                    fragment_id = ctx.fragment_id,
+                    "large number of rows matched for join key"
+                );
+            }
+        }
+        // NOTE(kwannoel): We don't track metrics for this last chunk.
+        if let Some(chunk) = hashjoin_chunk_builder.take() {
+            yield chunk;
+        }
+    }
 
     #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
@@ -889,7 +1054,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
+    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
+    async fn handle_cached_matched_rows<'a, const SIDE: SideTypePrimitive>(
+        matched_rows: Option<HashValueType>,
+        row: RowRef<'a>,
+        op: Op,
+        key: &'a K,
+        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
+        side_match: &'a mut JoinSide<K, S>,
+        side_update: &'a mut JoinSide<K, S>,
+        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
+        cond: &'a mut Option<NonStrictExpression>,
+        append_only_optimize: bool,
+    ) {
+        match op {
+            Op::Insert | Op::UpdateInsert => {
+                let mut degree = 0;
+                let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
+                if let Some(mut matched_rows) = matched_rows {
+                    let mut matched_rows_to_clean = vec![];
+                    for (matched_row_ref, matched_row) in
+                        matched_rows.values_mut(&side_match.all_data_types)
+                    {
+                        let mut matched_row = matched_row?;
+
+                        let join_condition_satisfied = Self::check_join_condition(
+                            row,
+                            side_update.start_pos,
+                            &matched_row.row,
+                            side_match.start_pos,
+                            cond,
+                        )
+                        .await;
+                        let mut need_state_clean = false;
+                        if join_condition_satisfied {
+                            degree += 1;
+                            if side_match.need_degree_table {
+                                side_match.ht.inc_degree(&mut matched_row);
+                                matched_row_ref.degree += 1;
+                            }
+                            if !forward_exactly_once(T, SIDE) {
+                                if let Some(chunk) =
+                                    hashjoin_chunk_builder.with_match_on_insert(&row, &matched_row)
+                                {
+                                    yield chunk;
+                                }
+                            }
+                        } else {
+                            for (column_idx, watermark) in useful_state_clean_columns {
+                                if matched_row
+                                    .row
+                                    .datum_at(*column_idx)
+                                    .map_or(false, |scalar| {
+                                        scalar
+                                            .default_cmp(&watermark.val.as_scalar_ref_impl())
+                                            .is_lt()
+                                    })
+                                {
+                                    need_state_clean = true;
+                                    break;
+                                }
+                            }
+                        }
+                        // If the stream is append-only and the join key covers the pk of both
+                        // sides, then we can remove matched rows, since the pk is unique and
+                        // will not be inserted again.
+                        if append_only_optimize {
+                            // Since the join key contains the pk and the pk is unique, there
+                            // should be at most one matched row.
+                            assert!(append_only_matched_row.is_none());
+                            append_only_matched_row = Some(matched_row);
+                        } else if need_state_clean {
+                            // `append_only_optimize` and `need_state_clean` won't both be true.
+                            // 'else' here is only to suppress a compiler error.
+                            matched_rows_to_clean.push(matched_row);
+                        }
+                    }
+                    if degree == 0 {
+                        if let Some(chunk) =
+                            hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
+                        {
+                            yield chunk;
+                        }
+                    } else if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Insert, row)
+                    {
+                        yield chunk;
+                    }
+                    // Insert back the state taken from ht.
+                    side_match.ht.update_state(key, matched_rows);
+                    for matched_row in matched_rows_to_clean {
+                        if side_match.need_degree_table {
+                            side_match.ht.delete(key, matched_row)?;
+                        } else {
+                            side_match.ht.delete_row(key, matched_row.row)?;
+                        }
+                    }
+
+                    if append_only_optimize && let Some(row) = append_only_matched_row {
+                        if side_match.need_degree_table {
+                            side_match.ht.delete(key, row)?;
+                        } else {
+                            side_match.ht.delete_row(key, row.row)?;
+                        }
+                    } else if side_update.need_degree_table {
+                        side_update.ht.insert(key, JoinRow::new(row, degree))?;
+                    } else {
+                        side_update.ht.insert_row(key, row)?;
+                    }
+                } else {
+                    // A row that violates the null-safe bitmap will never be matched, so we
+                    // need not store it.
+                    if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
+                    {
+                        yield chunk;
+                    }
+                }
+            }
+            Op::Delete | Op::UpdateDelete => {
+                let mut degree = 0;
+                if let Some(mut matched_rows) = matched_rows {
+                    let mut matched_rows_to_clean = vec![];
+                    for (matched_row_ref, matched_row) in
+                        matched_rows.values_mut(&side_match.all_data_types)
+                    {
+                        let mut matched_row = matched_row?;
+                        // TODO(yuhao-su): We should find a better way to evaluate the
+                        // expression (when there are non-equi expressions) without
+                        // concatenating the two rows.
+                        let join_condition_satisfied = Self::check_join_condition(
+                            row,
+                            side_update.start_pos,
+                            &matched_row.row,
+                            side_match.start_pos,
+                            cond,
+                        )
+                        .await;
+                        let mut need_state_clean = false;
+                        if join_condition_satisfied {
+                            degree += 1;
+                            if side_match.need_degree_table {
+                                side_match.ht.dec_degree(&mut matched_row);
+                                matched_row_ref.degree -= 1;
+                            }
+                            if !forward_exactly_once(T, SIDE) {
+                                if let Some(chunk) =
+                                    hashjoin_chunk_builder.with_match_on_delete(&row, &matched_row)
+                                {
+                                    yield chunk;
+                                }
+                            }
+                        } else {
+                            for (column_idx, watermark) in useful_state_clean_columns {
+                                if matched_row
+                                    .row
+                                    .datum_at(*column_idx)
+                                    .map_or(false, |scalar| {
+                                        scalar
+                                            .default_cmp(&watermark.val.as_scalar_ref_impl())
+                                            .is_lt()
+                                    })
+                                {
+                                    need_state_clean = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if need_state_clean {
+                            matched_rows_to_clean.push(matched_row);
+                        }
+                    }
+                    if degree == 0 {
+                        if let Some(chunk) =
+                            hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
+                        {
+                            yield chunk;
+                        }
+                    } else if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Delete, row)
+                    {
+                        yield chunk;
+                    }
+                    // Insert back the state taken from ht.
+                    side_match.ht.update_state(key, matched_rows);
+                    for matched_row in matched_rows_to_clean {
+                        if side_match.need_degree_table {
+                            side_match.ht.delete(key, matched_row)?;
+                        } else {
+                            side_match.ht.delete_row(key, matched_row.row)?;
+                        }
+                    }
+
+                    if append_only_optimize {
+                        unreachable!();
+                    } else if side_update.need_degree_table {
+                        side_update.ht.delete(key, JoinRow::new(row, degree))?;
+                    } else {
+                        side_update.ht.delete_row(key, row)?;
+                    };
+                } else {
+                    // We do not store rows that violate the null-safe bitmap.
+                    if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
+                    {
+                        yield chunk;
+                    }
+                }
+            }
+        }
+    }
+
+    /// For a probe-side row, check whether its matches are already cached; if not, fetch
+    /// the matched rows from the state table.
+    ///
+    /// Every matched build-side row being processed needs to go through the following phases:
+    /// 1. Handle join condition evaluation.
+    /// 2. Always do cache refill, while the cached row count stays within the bound.
+    /// 3. Handle state cleaning.
+    /// 4. Handle degree table update.
+    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
+    async fn handle_fetch_matched_rows<'a, const SIDE: SideTypePrimitive>(
+        row: RowRef<'a>,
+        op: Op,
+        key: &'a K,
+        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
+        side_match: &'a mut JoinSide<K, S>,
+        side_update: &'a mut JoinSide<K, S>,
+        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
+        cond: &'a mut Option<NonStrictExpression>,
+        append_only_optimize: bool,
+    ) {
+        let mut entry_state = JoinEntryState::default();
+        let mut entry_state_count = 0;
+        let entry_state_max_rows = 30000;
+        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
+        match op {
+            Op::Insert | Op::UpdateInsert => {
+                let mut degree = 0;
+                let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
+                let mut matched_rows_to_clean = vec![];
+                let (
+                    order_key_indices,
+                    join_key_data_types,
+                    pk_indices,
+                    pk_serializer,
+                    state_table,
+                    mut degree_table,
+                ) = side_match.ht.get_table_mut_refs();
+                let decoded_key = key.deserialize(join_key_data_types)?;
+                let degrees = if side_match.need_degree_table {
+                    let degrees = Self::handle_fetch_degrees(
+                        key,
+                        join_key_data_types,
+                        degree_table.as_ref().unwrap(),
+                    )
+                    .await?;
+                    Some(degrees)
+                } else {
+                    None
+                };
+                let table_iter = state_table
+                    .iter_with_prefix(&decoded_key, sub_range, PrefetchOptions::default())
+                    .await?;
+
+                #[for_await]
+                for (i, entry) in table_iter.enumerate() {
+                    let encoded_row = entry?;
+                    let encoded_pk = encoded_row
+                        .as_ref()
+                        .project(pk_indices)
+                        .memcmp_serialize(pk_serializer);
+                    let mut matched_row = JoinRow::new(
+                        encoded_row.into_owned_row(),
+                        degrees.as_ref().map_or(0, |d| d[i]),
+                    );
+                    if entry_state_count <= entry_state_max_rows {
+                        entry_state
+                            .insert(encoded_pk, matched_row.encode(), None) // TODO(kwannoel): handle ineq key for asof join.
+                            .with_context(|| {
+                                let pk = row.project(pk_indices);
+                                format!(
+                                    "pk: {}, row: {}, state_table_id: {}",
+                                    pk.display(),
+                                    row.display(),
+                                    state_table.table_id()
+                                )
+                            })?;
+                        entry_state_count += 1;
+                    }
+
+                    let join_condition_satisfied = Self::check_join_condition(
+                        row,
+                        side_update.start_pos,
+                        &matched_row.row,
+                        side_match.start_pos,
+                        cond,
+                    )
+                    .await;
+                    let mut need_state_clean = false;
+                    if join_condition_satisfied {
+                        degree += 1;
+                        if side_match.need_degree_table {
+                            // TODO: with partial borrows, the `into_owned_row` here would be
+                            // unnecessary.
+                            let old_degree = matched_row
+                                .to_table_rows(order_key_indices)
+                                .1
+                                .into_owned_row();
+
+                            matched_row.degree += 1;
+
+                            let new_degree = matched_row.to_table_rows(order_key_indices).1;
+                            degree_table
+                                .as_mut()
+                                .unwrap()
+                                .update(old_degree, new_degree);
+                        }
+                        if !forward_exactly_once(T, SIDE) {
+                            if let Some(chunk) =
+                                hashjoin_chunk_builder.with_match_on_insert(&row, &matched_row)
+                            {
+                                yield chunk;
+                            }
+                        }
+                    } else {
+                        for (column_idx, watermark) in useful_state_clean_columns {
+                            if matched_row
+                                .row
+                                .datum_at(*column_idx)
+                                .map_or(false, |scalar| {
+                                    scalar
+                                        .default_cmp(&watermark.val.as_scalar_ref_impl())
+                                        .is_lt()
+                                })
+                            {
+                                need_state_clean = true;
+                                break;
+                            }
+                        }
+                    }
+                    // If the stream is append-only and the join key covers the pk of both
+                    // sides, then we can remove matched rows, since the pk is unique and
+                    // will not be inserted again.
+                    if append_only_optimize {
+                        // Since the join key contains the pk and the pk is unique, there
+                        // should be at most one matched row.
+                        assert!(append_only_matched_row.is_none());
+                        append_only_matched_row = Some(matched_row);
+                    } else if need_state_clean {
+                        // `append_only_optimize` and `need_state_clean` won't both be true.
+                        // 'else' here is only to suppress a compiler error.
+                        matched_rows_to_clean.push(matched_row);
+                    }
+                }
+                if degree == 0 {
+                    if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
+                    {
+                        yield chunk;
+                    }
+                } else if let Some(chunk) =
+                    hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Insert, row)
+                {
+                    yield chunk;
+                }
+                // Cache refill: only install the entry if it fit within the row bound.
+                if entry_state_count <= entry_state_max_rows {
+                    side_match.ht.update_state(key, Box::new(entry_state));
+                }
+                for matched_row in matched_rows_to_clean {
+                    if side_match.need_degree_table {
+                        side_match.ht.delete(key, matched_row)?;
+                    } else {
+                        side_match.ht.delete_row(key, matched_row.row)?;
+                    }
+                }
+
+                if append_only_optimize && let Some(row) = append_only_matched_row {
+                    if side_match.need_degree_table {
+                        side_match.ht.delete(key, row)?;
+                    } else {
+                        side_match.ht.delete_row(key, row.row)?;
+                    }
+                } else if side_update.need_degree_table {
+                    side_update.ht.insert(key, JoinRow::new(row, degree))?;
+                } else {
+                    side_update.ht.insert_row(key, row)?;
+                }
+            }
+            Op::Delete | Op::UpdateDelete => {
+                let mut degree = 0;
+                let mut matched_rows_to_clean = vec![];
+                let (
+                    order_key_indices,
+                    join_key_data_types,
+                    pk_indices,
+                    pk_serializer,
+                    state_table,
+                    mut degree_table,
+                ) = side_match.ht.get_table_mut_refs();
+                let decoded_key = key.deserialize(join_key_data_types)?;
+                let degrees = if side_match.need_degree_table {
+                    let degrees = Self::handle_fetch_degrees(
+                        key,
+                        join_key_data_types,
+                        degree_table.as_ref().unwrap(),
+                    )
+                    .await?;
+                    Some(degrees)
+                } else {
+                    None
+                };
+                let table_iter = state_table
+                    .iter_with_prefix(&decoded_key, sub_range, PrefetchOptions::default())
+                    .await?;
+
+                #[for_await]
+                for (i, entry) in table_iter.enumerate() {
+                    let encoded_row = entry?;
+                    let encoded_pk = encoded_row
+                        .as_ref()
+                        .project(pk_indices)
+                        .memcmp_serialize(pk_serializer);
+                    let mut matched_row = JoinRow::new(
+                        encoded_row.into_owned_row(),
+                        degrees.as_ref().map_or(0, |d| d[i]),
+                    );
+                    if entry_state_count <= entry_state_max_rows {
+                        entry_state
+                            .insert(encoded_pk, matched_row.encode(), None) // TODO(kwannoel): handle ineq key for asof join.
+                            .with_context(|| {
+                                let pk = row.project(pk_indices);
+                                format!(
+                                    "pk: {}, row: {}, state_table_id: {}",
+                                    pk.display(),
+                                    row.display(),
+                                    state_table.table_id()
+                                )
+                            })?;
+                        entry_state_count += 1;
+                    }
+                    let join_condition_satisfied = Self::check_join_condition(
+                        row,
+                        side_update.start_pos,
+                        &matched_row.row,
+                        side_match.start_pos,
+                        cond,
+                    )
+                    .await;
+                    let mut need_state_clean = false;
+                    if join_condition_satisfied {
+                        degree += 1;
+                        if side_match.need_degree_table {
+                            // TODO: with partial borrows, the `into_owned_row` here would be
+                            // unnecessary.
+                            let old_degree = matched_row
+                                .to_table_rows(order_key_indices)
+                                .1
+                                .into_owned_row();
+
+                            matched_row.degree -= 1;
+
+                            let new_degree = matched_row.to_table_rows(order_key_indices).1;
+                            degree_table
+                                .as_mut()
+                                .unwrap()
+                                .update(old_degree, new_degree);
+                        }
+                        if !forward_exactly_once(T, SIDE) {
+                            if let Some(chunk) =
+                                hashjoin_chunk_builder.with_match_on_delete(&row, &matched_row)
+                            {
+                                yield chunk;
+                            }
+                        }
+                    } else {
+                        for (column_idx, watermark) in useful_state_clean_columns {
+                            if matched_row
+                                .row
+                                .datum_at(*column_idx)
+                                .map_or(false, |scalar| {
+                                    scalar
+                                        .default_cmp(&watermark.val.as_scalar_ref_impl())
+                                        .is_lt()
+                                })
+                            {
+                                need_state_clean = true;
+                                break;
+                            }
+                        }
+                    }
+                    if need_state_clean {
+                        matched_rows_to_clean.push(matched_row);
+                    }
+                }
+                if degree == 0 {
+                    if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
+                    {
+                        yield chunk;
+                    }
+                } else if let Some(chunk) =
+                    hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Delete, row)
+                {
+                    yield chunk;
+                }
+                // Cache refill: only install the entry if it fit within the row bound.
+                if entry_state_count <= entry_state_max_rows {
+                    side_match.ht.update_state(key, Box::new(entry_state));
+                }
+                for matched_row in matched_rows_to_clean {
+                    if side_match.need_degree_table {
+                        side_match.ht.delete(key, matched_row)?;
+                    } else {
+                        side_match.ht.delete_row(key, matched_row.row)?;
+                    }
+                }
+
+                if append_only_optimize {
+                    unreachable!();
+                } else if side_update.need_degree_table {
+                    side_update.ht.delete(key, JoinRow::new(row, degree))?;
+                } else {
+                    side_update.ht.delete_row(key, row)?;
+                };
+            }
+        }
+    }
+
+    /// Fetch ALL degrees for the key into memory, instead of using a streaming interface.
+    /// This is necessary because we must update the degree state table concurrently:
+    /// obtaining the degrees through a stream would hold an immutable reference to the
+    /// state table for its entire lifetime, preventing us from updating the state table
+    /// at the same time.
+    ///
+    /// The cost of fetching all degrees upfront is acceptable; we already do so in
+    /// `fetch_cached_state`. Memory use should be limited, since we only store a `u64`
+    /// per row: with an amplification of 1B rows, that is 1B * 8 bytes ~= 8GB.
+    ///
+    /// If this proves to have high memory consumption, a further optimization could
+    /// break up the streaming update to flush the in-memory degrees periodically.
+    async fn handle_fetch_degrees(
+        key: &K,
+        join_key_data_types: &[DataType],
+        degree_state_table: &StateTable<S>,
+    ) -> StreamExecutorResult<Vec<DegreeType>> {
+        let key = key.deserialize(join_key_data_types)?;
+        let mut degrees = vec![];
+        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
+        let table_iter = degree_state_table
+            .iter_with_prefix(key, sub_range, PrefetchOptions::default())
+            .await?;
+        #[for_await]
+        for entry in table_iter {
+            let degree_row = entry?;
+            let degree_i64 = degree_row
+                .datum_at(degree_row.len() - 1)
+                .expect("degree should not be NULL");
+            degrees.push(degree_i64.into_int64() as u64);
+        }
+        Ok(degrees)
+    }
+
+    // TODO(yuhao-su): We should find a better way to evaluate the expression
+    // without concatenating the two rows
+    // when there are non-equi expressions.
+    async fn check_join_condition(
+        row: impl Row,
+        side_update_start_pos: usize,
+        matched_row: impl Row,
+        side_match_start_pos: usize,
+        join_condition: &Option<NonStrictExpression>,
+    ) -> bool {
+        if let Some(ref join_condition) = join_condition {
+            let new_row = Self::row_concat(
+                row,
+                side_update_start_pos,
+                matched_row,
+                side_match_start_pos,
+            );
+            join_condition
+                .eval_row_infallible(&new_row)
+                .await
+                .map(|s| *s.as_bool())
+                .unwrap_or(false)
+        } else {
+            true
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs
index 287931076108e..7b55fa223c111 100644
--- a/src/stream/src/executor/join/hash_join.rs
+++ b/src/stream/src/executor/join/hash_join.rs
@@ -11,7 +11,6 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
 use std::alloc::Global;
 use std::cmp::Ordering;
 use std::ops::{Bound, Deref, DerefMut, RangeBounds};
@@ -45,6 +44,7 @@ use crate::consistency::{consistency_error, consistency_panic, enable_strict_con
 use crate::executor::error::StreamExecutorResult;
 use crate::executor::join::row::JoinRow;
 use crate::executor::monitor::StreamingMetrics;
+use crate::executor::StreamExecutorError;
 use crate::task::{ActorId, AtomicU64Ref, FragmentId};
 
 /// Memcomparable encoding.
@@ -224,6 +224,34 @@ pub struct JoinHashMap<K: HashKey, S: StateStore> {
     metrics: JoinHashMapMetrics,
 }
 
+impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
+    pub(crate) fn get_table_mut_refs(
+        &mut self,
+    ) -> (
+        &[usize],
+        &[DataType],
+        &[usize],
+        &OrderedRowSerde,
+        &mut StateTable<S>,
+        Option<&mut StateTable<S>>,
+    ) {
+        let degree_state = self.degree_state.as_mut();
+        let (order_key_indices, pk_indices, state_table) = (
+            &self.state.order_key_indices,
+            &self.state.pk_indices,
+            &mut self.state.table,
+        );
+        (
+            order_key_indices,                  // for degree table updates
+            &self.join_key_data_types,          // to decode the key from the state table
+            pk_indices,                         // to decode the pk from the state table
+            &self.pk_serializer,                // to serialize the pk for `entry_state`
+            state_table,                        // the state table itself
+            degree_state.map(|d| &mut d.table), // the degree table, if any
+        )
+    }
+}
+
 pub struct TableInner<S: StateStore> {
     /// Indices of the (cache) pk in a state row
     pk_indices: Vec<usize>,
@@ -366,6 +394,28 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
         }
     }
 
+    /// Take the state for the given `key` out of the hash table and return it, if it is
+    /// cached. One **MUST** call `update_state` after some operations to put the state back.
+    ///
+    /// Returns `None` on a cache miss; unlike `take_state`, this never falls back to
+    /// fetching the entry from remote storage, so the caller decides how to handle the
+    /// miss (e.g. by streaming matched rows directly from the state table).
+    ///
+    /// Note: This will NOT remove anything from remote storage.
+    pub fn take_state_opt(&mut self, key: &K) -> Option<HashValueType> {
+        self.metrics.total_lookup_count += 1;
+        if self.inner.contains(key) {
+            tracing::debug!("hit cache for join key: {:?}", key);
+            // Do not update the LRU statistics here with `peek_mut` since we will put the state
+            // back.
+            let mut state = self.inner.peek_mut(key).unwrap();
+            Some(state.take())
+        } else {
+            tracing::debug!("miss cache for join key: {:?}", key);
+            None
+        }
+    }
+
     /// Take the state for the given `key` out of the hash table and return it. One **MUST** call
     /// `update_state` after some operations to put the state back.
 ///
@@ -561,6 +611,10 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
         Ok(entry_state)
     }
 
+    pub fn error_context(&self, row: &impl Row) -> String {
+        self.state.error_context(row)
+    }
+
     pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
         self.metrics.flush();
         self.state.table.commit(epoch).await?;
@@ -678,7 +732,6 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
     /// memory and in the degree table.
     fn manipulate_degree(
         &mut self,
-        join_row_ref: &mut StateValueType,
         join_row: &mut JoinRow<OwnedRow>,
         action: impl Fn(&mut DegreeType),
     ) {
@@ -688,7 +741,6 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
             .1
             .into_owned_row();
 
-        action(&mut join_row_ref.degree);
         action(&mut join_row.degree);
 
         let new_degree = join_row.to_table_rows(&self.state.order_key_indices).1;
@@ -698,22 +750,14 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
 
     /// Increment the degree of the given [`JoinRow`] and [`EncodedJoinRow`] with `action`, both in
     /// memory and in the degree table.
-    pub fn inc_degree(
-        &mut self,
-        join_row_ref: &mut StateValueType,
-        join_row: &mut JoinRow<OwnedRow>,
-    ) {
-        self.manipulate_degree(join_row_ref, join_row, |d| *d += 1)
+    pub fn inc_degree(&mut self, join_row: &mut JoinRow<OwnedRow>) {
+        self.manipulate_degree(join_row, |d| *d += 1)
     }
 
     /// Decrement the degree of the given [`JoinRow`] and [`EncodedJoinRow`] with `action`, both in
     /// memory and in the degree table.
-    pub fn dec_degree(
-        &mut self,
-        join_row_ref: &mut StateValueType,
-        join_row: &mut JoinRow<OwnedRow>,
-    ) {
-        self.manipulate_degree(join_row_ref, join_row, |d| {
+    pub fn dec_degree(&mut self, join_row: &mut JoinRow<OwnedRow>) {
+        self.manipulate_degree(join_row, |d| {
             *d = d.checked_sub(1).unwrap_or_else(|| {
                 consistency_panic!("Tried to decrement zero join row degree");
                 0
@@ -767,10 +811,15 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
     }
 }
 
+use risingwave_common::array::{Op, RowRef};
 use risingwave_common_estimate_size::KvSize;
+use risingwave_expr::expr::NonStrictExpression;
 use thiserror::Error;
 
 use super::*;
+use crate::executor::join::builder::JoinChunkBuilder;
+use crate::executor::prelude::try_stream;
+use crate::executor::Watermark;
 
 /// We manage a `HashMap` in memory for all entries belonging to a join key.
 /// When evicted, `cached` does not hold any entries.
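
The heart of the change, in miniature: `hash_eq_match_opt` classifies every probe into one of three outcomes before any storage I/O happens, and the two `handle_*_matched_rows` paths consume those outcomes. Below is a minimal, self-contained sketch of that dispatch discipline. `JoinCache`, the `u64` keys, and the `Vec<&str>` entries are illustrative stand-ins, not the real `JoinHashMap`/`HashValueType` types; only the `CacheResult` shape and the take/put-back contract mirror the diff.

```rust
use std::collections::HashMap;

enum CacheResult<V> {
    /// The key may match, but its entry is not cached: stream rows from storage.
    Miss,
    /// The key is resolved from cache alone: `Some` carries the entry,
    /// `None` means the key can never match (e.g. NULLs under non-null-safe equality).
    Hit(Option<V>),
}

struct JoinCache<V> {
    inner: HashMap<u64, V>,
}

impl<V> JoinCache<V> {
    /// Move the entry out of the cache so it can be mutated without borrowing the
    /// map; the caller must hand it back via `update_state`, as the diff's docs insist.
    fn take_state_opt(&mut self, key: u64) -> Option<V> {
        self.inner.remove(&key)
    }

    fn update_state(&mut self, key: u64, state: V) {
        self.inner.insert(key, state);
    }

    fn lookup(&mut self, key: u64, key_can_match: bool) -> CacheResult<V> {
        if !key_can_match {
            CacheResult::Hit(None)
        } else if let Some(state) = self.take_state_opt(key) {
            CacheResult::Hit(Some(state))
        } else {
            CacheResult::Miss
        }
    }
}

fn main() {
    let mut cache = JoinCache {
        inner: HashMap::from([(1u64, vec!["row-a"])]),
    };
    match cache.lookup(1, true) {
        CacheResult::Hit(Some(rows)) => {
            // ... join against `rows`, then hand the entry back.
            cache.update_state(1, rows);
        }
        CacheResult::Hit(None) => { /* forward the row unmatched */ }
        CacheResult::Miss => { /* fall back to streaming from the state table */ }
    }
}
```

The design point, judging by `take_state`'s doc comment: previously a cache miss forced the whole entry to be fetched and materialized before joining, while the split `Hit`/`Miss` outcome lets the miss path join against rows as they stream in.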
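On the miss path, `handle_fetch_matched_rows` rebuilds the cache entry as a side effect, but only commits it when the key's entire entry fit within `entry_state_max_rows` (30,000 rows): installing a truncated entry would make later cache hits silently drop matches. Here is a sketch of just that policy, assuming nothing beyond what the diff shows; `refill_policy` and `ENTRY_STATE_MAX_ROWS` are illustrative names, not identifiers from the codebase.

```rust
const ENTRY_STATE_MAX_ROWS: usize = 30_000;

/// Collect streamed rows into a candidate cache entry, and return it only if every
/// row fit within the budget (mirroring the diff's repeated
/// `entry_state_count <= entry_state_max_rows` checks).
fn refill_policy<R>(rows_from_storage: impl Iterator<Item = R>) -> Option<Vec<R>> {
    let mut entry = Vec::new();
    let mut count = 0usize;
    for row in rows_from_storage {
        if count <= ENTRY_STATE_MAX_ROWS {
            entry.push(row); // within budget: keep the row for the in-memory entry
        }
        // Either way the row is still joined against; only caching is skipped.
        count += 1;
    }
    // Install only a complete entry; a partial one would hide matches later.
    (count <= ENTRY_STATE_MAX_ROWS).then_some(entry)
}

fn main() {
    assert!(refill_policy(0..10u32).is_some()); // small entry: cached
    assert!(refill_policy(0..100_000u32).is_none()); // oversized entry: not cached
}
```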
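Finally, the back-of-envelope figure in the `handle_fetch_degrees` doc comment is easy to verify: degrees are prefetched as one `u64` per matched row, so even a pathological amplification of one billion rows stays around the cited 8GB.

```rust
fn main() {
    let amplification: u64 = 1_000_000_000; // 1B matched rows, the worst case cited
    let bytes = amplification * std::mem::size_of::<u64>() as u64; // one u64 degree per row
    // Prints ~7.45 GiB, i.e. the "~8GB" figure from the doc comment.
    println!("{:.2} GiB", bytes as f64 / f64::from(1u32 << 30));
}
```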