diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 7f509fd293d46..e77df8b8ecca6 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -991,227 +991,6 @@ impl HashJoinExecutor( - matched_rows: Option, - row: RowRef<'a>, - op: Op, - key: &'a K, - hashjoin_chunk_builder: &'a mut JoinChunkBuilder, - side_match: &'a mut JoinSide, - side_update: &'a mut JoinSide, - useful_state_clean_columns: &'a [(usize, &'a Watermark)], - cond: &'a mut Option, - append_only_optimize: bool, - ) { - match op { - Op::Insert | Op::UpdateInsert => { - let mut degree = 0; - let mut append_only_matched_row: Option> = None; - if let Some(mut matched_rows) = matched_rows { - let mut matched_rows_to_clean = vec![]; - for (matched_row_ref, matched_row) in - matched_rows.values_mut(&side_match.all_data_types) - { - let mut matched_row = matched_row?; - - let join_condition_satisfied = Self::check_join_condition( - row, - side_update.start_pos, - &matched_row.row, - side_match.start_pos, - cond, - ) - .await; - let mut need_state_clean = false; - if join_condition_satisfied { - degree += 1; - if side_match.need_degree_table { - side_match.ht.inc_degree(&mut matched_row); - // Specific to matched row. - matched_row_ref.degree += 1; - } - if !forward_exactly_once(T, SIDE) { - if let Some(chunk) = - hashjoin_chunk_builder.with_match_on_insert(&row, &matched_row) - { - yield chunk; - } - } - } else { - for (column_idx, watermark) in useful_state_clean_columns { - if matched_row - .row - .datum_at(*column_idx) - .map_or(false, |scalar| { - scalar - .default_cmp(&watermark.val.as_scalar_ref_impl()) - .is_lt() - }) - { - need_state_clean = true; - break; - } - } - } - // If the stream is append-only and the join key covers pk in both side, - // then we can remove matched rows since pk is unique and will not be - // inserted again - if append_only_optimize { - // Since join key contains pk and pk is unique, there should be only - // one row if matched. - assert!(append_only_matched_row.is_none()); - append_only_matched_row = Some(matched_row); - } else if need_state_clean { - // `append_only_optimize` and `need_state_clean` won't both be true. - // 'else' here is only to suppress compiler error. - matched_rows_to_clean.push(matched_row); - } - } - if degree == 0 { - if let Some(chunk) = - hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row) - { - yield chunk; - } - } else if let Some(chunk) = - hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Insert, row) - { - yield chunk; - } - // Insert back the state taken from ht. - side_match.ht.update_state(key, matched_rows); - for matched_row in matched_rows_to_clean { - if side_match.need_degree_table { - side_match.ht.delete(key, matched_row)?; - } else { - side_match.ht.delete_row(key, matched_row.row)?; - } - } - - if append_only_optimize && let Some(row) = append_only_matched_row { - if side_match.need_degree_table { - side_match.ht.delete(key, row)?; - } else { - side_match.ht.delete_row(key, row.row)?; - } - } else if side_update.need_degree_table { - side_update.ht.insert(key, JoinRow::new(row, degree))?; - } else { - side_update.ht.insert_row(key, row)?; - } - } else { - // Row which violates null-safe bitmap will never be matched so we need not - // store. - if let Some(chunk) = - hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row) - { - yield chunk; - } - } - } - Op::Delete | Op::UpdateDelete => { - let mut degree = 0; - if let Some(mut matched_rows) = matched_rows { - let mut matched_rows_to_clean = vec![]; - for (matched_row_ref, matched_row) in - matched_rows.values_mut(&side_match.all_data_types) - { - let mut matched_row = matched_row?; - // TODO(yuhao-su): We should find a better way to eval the expression - // without concat two rows. - // if there are non-equi expressions - let join_condition_satisfied = Self::check_join_condition( - row, - side_update.start_pos, - &matched_row.row, - side_match.start_pos, - cond, - ) - .await; - let mut need_state_clean = false; - if join_condition_satisfied { - degree += 1; - if side_match.need_degree_table { - side_match.ht.dec_degree(&mut matched_row); - /// Specific to matched_row. - matched_row_ref.degree -= 1; - } - if !forward_exactly_once(T, SIDE) { - if let Some(chunk) = - hashjoin_chunk_builder.with_match_on_delete(&row, &matched_row) - { - yield chunk; - } - } - } else { - for (column_idx, watermark) in useful_state_clean_columns { - if matched_row - .row - .datum_at(*column_idx) - .map_or(false, |scalar| { - scalar - .default_cmp(&watermark.val.as_scalar_ref_impl()) - .is_lt() - }) - { - need_state_clean = true; - break; - } - } - } - if need_state_clean { - matched_rows_to_clean.push(matched_row); - } - } - if degree == 0 { - if let Some(chunk) = - hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row) - { - yield chunk; - } - } else if let Some(chunk) = - hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Delete, row) - { - yield chunk; - } - // Insert back the state taken from ht. - side_match.ht.update_state(key, matched_rows); - for matched_row in matched_rows_to_clean { - if side_match.need_degree_table { - side_match.ht.delete(key, matched_row)?; - } else { - side_match.ht.delete_row(key, matched_row.row)?; - } - } - - if append_only_optimize { - unreachable!(); - } else if side_update.need_degree_table { - side_update.ht.delete(key, JoinRow::new(row, degree))?; - } else { - side_update.ht.delete_row(key, row)?; - }; - } else { - // We do not store row which violates null-safe bitmap. - if let Some(chunk) = - hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row) - { - yield chunk; - } - } - } - } - } - /// For the probe-side row, we need to check if it has values in cache, if not, we need to /// fetch the matched rows from the state table. ///