Skip to content

Commit

Permalink
remove old cache handling fn
Browse files · Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Dec 12, 2024
1 parent add4f59 commit 08ac933
Showing 1 changed file with 0 additions and 221 deletions.
221 changes: 0 additions & 221 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,227 +991,6 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
}
}

    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// Parameters:
    /// - `matched_rows`: build-side state for `key`, taken out of the hash table by the caller.
    ///   `None` means the probe row violates the null-safe bitmap and can never match; such rows
    ///   are forwarded as unmatched and not stored. When `Some`, the state is always put back
    ///   via `update_state` before any deletions are applied.
    /// - `row`: the probe-side row being processed.
    /// - `op`: stream operation for `row`; insert-like and delete-like ops are handled by the
    ///   two symmetric match arms below.
    /// - `key`: join key of `row`, used for all hash-table reads/writes.
    /// - `useful_state_clean_columns`: `(column_idx, watermark)` pairs; a matched row whose
    ///   datum at `column_idx` compares below the watermark is scheduled for state cleaning.
    /// - `cond`: optional non-equi join condition, evaluated per matched row.
    /// - `append_only_optimize`: when the stream is append-only and the join key covers the pk
    ///   on both sides, a matched row can be deleted eagerly since the pk is unique and will
    ///   not be inserted again.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_cached_matched_rows<'a, const SIDE: SideTypePrimitive>(
        matched_rows: Option<HashValueType>,
        row: RowRef<'a>,
        op: Op,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S>,
        side_update: &'a mut JoinSide<K, S>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
    ) {
        match op {
            Op::Insert | Op::UpdateInsert => {
                // Count of matched build-side rows that satisfy the join condition;
                // decides between the matched / not-matched output paths below.
                let mut degree = 0;
                let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
                if let Some(mut matched_rows) = matched_rows {
                    // Rows falling below a state-clean watermark; deleted after the
                    // (possibly mutated) state has been put back into the hash table.
                    let mut matched_rows_to_clean = vec![];
                    for (matched_row_ref, matched_row) in
                        matched_rows.values_mut(&side_match.all_data_types)
                    {
                        let mut matched_row = matched_row?;

                        // Phase 1: evaluate the (non-equi) join condition against the
                        // concatenation of the probe row and this matched row.
                        let join_condition_satisfied = Self::check_join_condition(
                            row,
                            side_update.start_pos,
                            &matched_row.row,
                            side_match.start_pos,
                            cond,
                        )
                        .await;
                        let mut need_state_clean = false;
                        if join_condition_satisfied {
                            degree += 1;
                            // Phase 4: bump the degree both in persistent state and in the
                            // in-cache row ref, so the two stay consistent.
                            if side_match.need_degree_table {
                                side_match.ht.inc_degree(&mut matched_row);
                                // Specific to matched row.
                                matched_row_ref.degree += 1;
                            }
                            if !forward_exactly_once(T, SIDE) {
                                if let Some(chunk) =
                                    hashjoin_chunk_builder.with_match_on_insert(&row, &matched_row)
                                {
                                    yield chunk;
                                }
                            }
                        } else {
                            // Phase 3: a non-matching row that has fallen below any clean
                            // watermark can be removed from state.
                            for (column_idx, watermark) in useful_state_clean_columns {
                                if matched_row
                                    .row
                                    .datum_at(*column_idx)
                                    .map_or(false, |scalar| {
                                        scalar
                                            .default_cmp(&watermark.val.as_scalar_ref_impl())
                                            .is_lt()
                                    })
                                {
                                    need_state_clean = true;
                                    break;
                                }
                            }
                        }
                        // If the stream is append-only and the join key covers pk in both side,
                        // then we can remove matched rows since pk is unique and will not be
                        // inserted again
                        if append_only_optimize {
                            // Since join key contains pk and pk is unique, there should be only
                            // one row if matched.
                            assert!(append_only_matched_row.is_none());
                            append_only_matched_row = Some(matched_row);
                        } else if need_state_clean {
                            // `append_only_optimize` and `need_state_clean` won't both be true.
                            // 'else' here is only to suppress compiler error.
                            matched_rows_to_clean.push(matched_row);
                        }
                    }
                    if degree == 0 {
                        if let Some(chunk) =
                            hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
                        {
                            yield chunk;
                        }
                    } else if let Some(chunk) =
                        hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Insert, row)
                    {
                        yield chunk;
                    }
                    // Insert back the state taken from ht.
                    side_match.ht.update_state(key, matched_rows);
                    for matched_row in matched_rows_to_clean {
                        if side_match.need_degree_table {
                            side_match.ht.delete(key, matched_row)?;
                        } else {
                            side_match.ht.delete_row(key, matched_row.row)?;
                        }
                    }

                    if append_only_optimize && let Some(row) = append_only_matched_row {
                        // Matched under append-only optimize: drop the matched row instead of
                        // inserting the probe row (pk is unique, it won't be inserted again).
                        if side_match.need_degree_table {
                            side_match.ht.delete(key, row)?;
                        } else {
                            side_match.ht.delete_row(key, row.row)?;
                        }
                    } else if side_update.need_degree_table {
                        side_update.ht.insert(key, JoinRow::new(row, degree))?;
                    } else {
                        side_update.ht.insert_row(key, row)?;
                    }
                } else {
                    // Row which violates null-safe bitmap will never be matched so we need not
                    // store.
                    if let Some(chunk) =
                        hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
                    {
                        yield chunk;
                    }
                }
            }
            Op::Delete | Op::UpdateDelete => {
                // Mirror of the insert arm: count satisfied matches, then decrement degrees
                // and delete (rather than insert) the probe-side state.
                let mut degree = 0;
                if let Some(mut matched_rows) = matched_rows {
                    let mut matched_rows_to_clean = vec![];
                    for (matched_row_ref, matched_row) in
                        matched_rows.values_mut(&side_match.all_data_types)
                    {
                        let mut matched_row = matched_row?;
                        // TODO(yuhao-su): We should find a better way to eval the expression
                        // without concat two rows.
                        // if there are non-equi expressions
                        let join_condition_satisfied = Self::check_join_condition(
                            row,
                            side_update.start_pos,
                            &matched_row.row,
                            side_match.start_pos,
                            cond,
                        )
                        .await;
                        let mut need_state_clean = false;
                        if join_condition_satisfied {
                            degree += 1;
                            if side_match.need_degree_table {
                                side_match.ht.dec_degree(&mut matched_row);
                                // Specific to matched_row.
                                // (NOTE: this was a `///` doc comment, which is invalid on a
                                // statement and trips the `unused_doc_comments` lint.)
                                matched_row_ref.degree -= 1;
                            }
                            if !forward_exactly_once(T, SIDE) {
                                if let Some(chunk) =
                                    hashjoin_chunk_builder.with_match_on_delete(&row, &matched_row)
                                {
                                    yield chunk;
                                }
                            }
                        } else {
                            for (column_idx, watermark) in useful_state_clean_columns {
                                if matched_row
                                    .row
                                    .datum_at(*column_idx)
                                    .map_or(false, |scalar| {
                                        scalar
                                            .default_cmp(&watermark.val.as_scalar_ref_impl())
                                            .is_lt()
                                    })
                                {
                                    need_state_clean = true;
                                    break;
                                }
                            }
                        }
                        if need_state_clean {
                            matched_rows_to_clean.push(matched_row);
                        }
                    }
                    if degree == 0 {
                        if let Some(chunk) =
                            hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
                        {
                            yield chunk;
                        }
                    } else if let Some(chunk) =
                        hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Delete, row)
                    {
                        yield chunk;
                    }
                    // Insert back the state taken from ht.
                    side_match.ht.update_state(key, matched_rows);
                    for matched_row in matched_rows_to_clean {
                        if side_match.need_degree_table {
                            side_match.ht.delete(key, matched_row)?;
                        } else {
                            side_match.ht.delete_row(key, matched_row.row)?;
                        }
                    }

                    if append_only_optimize {
                        // An append-only stream never carries deletes.
                        unreachable!();
                    } else if side_update.need_degree_table {
                        side_update.ht.delete(key, JoinRow::new(row, degree))?;
                    } else {
                        side_update.ht.delete_row(key, row)?;
                    };
                } else {
                    // We do not store row which violates null-safe bitmap.
                    if let Some(chunk) =
                        hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
                    {
                        yield chunk;
                    }
                }
            }
        }
    }

/// For the probe-side row, we need to check if it has values in cache, if not, we need to
/// fetch the matched rows from the state table.
///
Expand Down

0 comments on commit 08ac933

Please sign in to comment.