diff --git a/src/stream/benches/stream_hash_join.py b/src/stream/benches/stream_hash_join.py
new file mode 100755
index 0000000000000..a152416f71b47
--- /dev/null
+++ b/src/stream/benches/stream_hash_join.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python3
+# Copyright 2024 RisingWave Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Runs the full stream_hash_join benchmark matrix (runtime and memory
+# consumption) and prints the results as CSV to stdout.
+
+import json
+import re
+import subprocess
+import sys
+
+# CSV header
+results = ["Amp,Workload,JoinType,Max Blocks,Max Bytes,Runtime (ns)"]
+
+# Run benchmarks and capture results
+for amp in [20_000, 40_000, 200_000, 400_000]:
+    for workload in ["NotInCache", "InCache"]:
+        for join_type in ["Inner", "LeftOuter"]:
+            # Construct the commands
+            cmd_mem = f'ARGS={amp},{workload},{join_type} cargo bench --features dhat-heap --bench stream_hash_join_mem'
+            cmd_rt = f'cargo criterion --message-format json --bench stream_hash_join_rt -- hash_join_rt_{amp}_{workload}_{join_type}'
+
+            s = ""
+
+            try:
+                # Run cmd_mem and capture output
+                output = subprocess.check_output(cmd_mem, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
+
+                # Extract the peak block and byte counts reported by dhat
+                max_blocks_match = re.search(r'max_blocks:\s*(\d+)', output)
+                max_bytes_match = re.search(r'max_bytes:\s*(\d+)', output)
+
+                if max_blocks_match and max_bytes_match:
+                    max_blocks = max_blocks_match.group(1)
+                    max_bytes = max_bytes_match.group(1)
+
+                    s += f"{amp},{workload},{join_type},{max_blocks},{max_bytes}"
+                else:
+                    print(f"No max_blocks or max_bytes found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
+
+                # Run cmd_rt and capture output
+                json_output = subprocess.check_output(cmd_rt, shell=True, universal_newlines=True)
+                json_lines = json_output.split('\n')
+                try:
+                    time_ns = json.loads(json_lines[0])["typical"]["estimate"]
+                except Exception as e:
+                    print(f"could not parse {json_lines[0]} due to {e}", file=sys.stderr)
+                    sys.exit(1)
+                if time_ns:
+                    s += f",{time_ns}"
+                else:
+                    print(f"No runtime found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
+
+                results.append(s)
+
+            except subprocess.CalledProcessError as e:
+                print(f"Error running benchmark for Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
+                print(f"Error output: {e.output}", file=sys.stderr)
+
+for result in results:
+    print(result)
diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs
index 7f4275162ad99..b97dfc992a5ca 100644
--- a/src/stream/benches/stream_hash_join_mem.rs
+++ b/src/stream/benches/stream_hash_join_mem.rs
@@ -14,18 +14,14 @@
 
 #![feature(let_chains)]
 
-//! To run this benchmark you can use the following command:
+//! Specify the amplification size, workload, and join type via `ARGS`, e.g.:
 //! ```sh
-//! cargo bench --features dhat-heap --bench stream_hash_join
-//! ```
-//!
-//! You may also specify the amplification size, e.g. 40000
-//! ```sh
-//! cargo bench --features dhat-heap --bench stream_hash_join -- 40000
+//! ARGS=40000,NotInCache,Inner cargo bench --features dhat-heap --bench stream_hash_join_mem
 //! ```
 
 use std::env;
 
+use risingwave_pb::plan_common::JoinType;
 use risingwave_stream::executor::test_utils::hash_join_executor::*;
 
 risingwave_expr_impl::enable!();
@@ -36,20 +32,39 @@ static ALLOC: dhat::Alloc = dhat::Alloc;
 
 #[tokio::main]
 async fn main() {
-    let args: Vec<_> = env::args().collect();
-    let amp = if let Some(raw_arg) = args.get(1)
-        && let Ok(arg) = raw_arg.parse()
-    {
-        arg
+    let arg = env::var("ARGS");
+    let (amp, workload, join_type) = if let Ok(raw_arg) = &arg {
+        let parts = raw_arg.split(',').collect::<Vec<_>>();
+        let amp = parts[0]
+            .parse::<usize>()
+            .unwrap_or_else(|_| panic!("invalid amplification_size: {}", parts[0]));
+        let workload = match parts[1] {
+            "NotInCache" => HashJoinWorkload::NotInCache,
+            "InCache" => HashJoinWorkload::InCache,
+            _ => panic!("Invalid workload: {}", parts[1]),
+        };
+        let join_type = match parts[2] {
+            "Inner" => JoinType::Inner,
+            "LeftOuter" => JoinType::LeftOuter,
+            _ => panic!("Invalid join type: {}", parts[2]),
+        };
+        (amp, workload, join_type)
     } else {
-        100_000
+        panic!("invalid ARGS: {:?}", arg);
     };
-    let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp).await;
+
+    let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, workload, join_type).await;
 
     {
         // Start the profiler later, after we have ingested the data for hash join build-side.
        #[cfg(feature = "dhat-heap")]
         let _profiler = dhat::Profiler::new_heap();
-        handle_streams(amp, tx_l, tx_r, out).await;
+        handle_streams(workload, join_type, amp, tx_l, tx_r, out).await;
+        #[cfg(feature = "dhat-heap")]
+        {
+            let stats = dhat::HeapStats::get();
+            println!("max_blocks: {}", stats.max_blocks);
+            println!("max_bytes: {}", stats.max_bytes);
+        }
     }
 }
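For reference, this is the dhat pattern the memory bench relies on, as a minimal self-contained sketch (it assumes only the `dhat` crate; the allocation sizes are arbitrary). Only allocations made while the profiler guard is alive are tracked, which is why the bench starts the profiler after ingesting the build side, and why the Python driver scrapes the `max_blocks:` / `max_bytes:` lines from stdout.

```rust
// Sketch only: peak-heap measurement with dhat, mirroring the bench above.
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

fn main() {
    // Allocated before the profiler starts, so excluded from the peaks.
    let setup = vec![0u8; 1 << 20];

    let _profiler = dhat::Profiler::new_heap();

    // Counted: allocated while the profiler guard is alive.
    let tracked = vec![0u8; 1 << 22];
    drop(tracked);

    // The same two lines the Python driver parses with its regexes.
    let stats = dhat::HeapStats::get();
    println!("max_blocks: {}", stats.max_blocks);
    println!("max_bytes: {}", stats.max_bytes);

    drop(setup);
}
```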
diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs
index 5beed531575e6..3ec107294c2ec 100644
--- a/src/stream/benches/stream_hash_join_rt.rs
+++ b/src/stream/benches/stream_hash_join_rt.rs
@@ -18,9 +18,15 @@
 //! ```sh
 //! cargo bench --bench stream_hash_join_rt
 //! ```
+//!
+//! Generate a flamegraph for a specific benchmark, e.g.:
+//! ```sh
+//! sudo cargo flamegraph --bench stream_hash_join_rt -- hash_join_rt_40000_InCache_Inner
+//! ```
 
 use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
 use futures::executor::block_on;
+use risingwave_pb::plan_common::JoinType;
 use risingwave_stream::executor::test_utils::hash_join_executor::*;
 use tokio::runtime::Runtime;
 
@@ -32,14 +38,20 @@ fn bench_hash_join(c: &mut Criterion) {
     let rt = Runtime::new().unwrap();
 
     for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000] {
-        let name = format!("hash_join_rt_{}", amp);
-        group.bench_function(&name, |b| {
-            b.to_async(&rt).iter_batched(
-                || block_on(setup_bench_stream_hash_join(amp)),
-                |(tx_l, tx_r, out)| handle_streams(amp, tx_l, tx_r, out),
-                BatchSize::SmallInput,
-            )
-        });
+        for workload in [HashJoinWorkload::NotInCache, HashJoinWorkload::InCache] {
+            for join_type in [JoinType::Inner, JoinType::LeftOuter] {
+                let name = format!("hash_join_rt_{}_{}_{:#?}", amp, workload, join_type);
+                group.bench_function(&name, |b| {
+                    b.to_async(&rt).iter_batched(
+                        || block_on(setup_bench_stream_hash_join(amp, workload, join_type)),
+                        |(tx_l, tx_r, out)| {
+                            handle_streams(workload, join_type, amp, tx_l, tx_r, out)
+                        },
+                        BatchSize::SmallInput,
+                    )
+                });
+            }
+        }
     }
 }
 
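A note on the harness: `iter_batched` rebuilds the executor pipeline in an untimed setup closure on every batch, so only `handle_streams` itself is measured. A minimal sketch of that pattern (it assumes criterion with the `async_tokio` feature; `process` is a hypothetical stand-in for the join):

```rust
// Sketch only: async criterion bench with per-batch, untimed setup.
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use tokio::runtime::Runtime;

async fn process(input: Vec<u64>) -> u64 {
    input.into_iter().sum()
}

fn bench(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    c.bench_function("sum_async", |b| {
        b.to_async(&rt).iter_batched(
            || (0..1024).collect::<Vec<u64>>(), // untimed setup, re-run per batch
            |input| process(input),             // timed async routine
            BatchSize::SmallInput,
        )
    });
}

criterion_group!(benches, bench);
criterion_main!(benches);
```

`BatchSize::SmallInput` keeps many pre-built inputs resident at once, which is acceptable here because the setup value is only a trio of channel handles.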
diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs
index 2b02c5882eace..256f0bbc6f620 100644
--- a/src/stream/src/executor/test_utils.rs
+++ b/src/stream/src/executor/test_utils.rs
@@ -630,18 +630,26 @@ pub mod hash_join_executor {
     use std::sync::Arc;
 
     use itertools::Itertools;
-    use risingwave_common::array::{I64Array, Op, StreamChunkTestExt};
+    use risingwave_common::array::{I64Array, Op};
     use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
     use risingwave_common::hash::Key128;
     use risingwave_common::util::sort_util::OrderType;
+    use risingwave_pb::plan_common::JoinType;
     use risingwave_storage::memory::MemoryStateStore;
+    use strum_macros::Display;
 
     use super::*;
     use crate::common::table::test_utils::gen_pbtable;
     use crate::executor::monitor::StreamingMetrics;
     use crate::executor::prelude::StateTable;
     use crate::executor::test_utils::{MessageSender, MockSource};
-    use crate::executor::{ActorContext, HashJoinExecutor, JoinParams, JoinType};
+    use crate::executor::{ActorContext, HashJoinExecutor, JoinParams, JoinType as ConstJoinType};
+
+    #[derive(Clone, Copy, Debug, Display)]
+    pub enum HashJoinWorkload {
+        InCache,
+        NotInCache,
+    }
 
     pub async fn create_in_memory_state_table(
         mem_state: MemoryStateStore,
@@ -701,6 +709,8 @@ pub mod hash_join_executor {
     /// 4. Check memory utilization.
     pub async fn setup_bench_stream_hash_join(
         amp: usize,
+        workload: HashJoinWorkload,
+        join_type: JoinType,
     ) -> (MessageSender, MessageSender, BoxedMessageStream) {
         let fields = vec![DataType::Int64, DataType::Int64, DataType::Int64];
         let orders = vec![OrderType::ascending(), OrderType::ascending()];
@@ -715,27 +725,8 @@ pub mod hash_join_executor {
             create_in_memory_state_table(state_store.clone(), &fields, &orders, &[0, 1], 2).await;
 
         // Insert 100K records into the build side.
-        {
-            // Create column [0]: join key. Each record has the same value, to trigger join amplification.
-            let mut int64_jk_builder = DataType::Int64.create_array_builder(amp);
-            int64_jk_builder
-                .append_array(&I64Array::from_iter(vec![Some(200_000); amp].into_iter()).into());
-            let jk = int64_jk_builder.finish();
-
-            // Create column [1]: pk. The original pk will be here, it will be unique.
-            let mut int64_pk_data_chunk_builder = DataType::Int64.create_array_builder(amp);
-            let seq = I64Array::from_iter((0..amp as i64).map(Some));
-            int64_pk_data_chunk_builder.append_array(&I64Array::from(seq).into());
-            let pk = int64_pk_data_chunk_builder.finish();
-
-            // Create column [2]: value. This can be an arbitrary value, so just clone the pk column.
-            let values = pk.clone();
-
-            // Build the stream chunk.
-            let columns = vec![jk.into(), pk.into(), values.into()];
-            let ops = vec![Op::Insert; amp];
-            let stream_chunk = StreamChunk::new(ops, columns);
-
+        if matches!(workload, HashJoinWorkload::NotInCache) {
+            let stream_chunk = build_chunk(amp, 200_000);
             // Write to state table.
             rhs_state_table.write_chunk(stream_chunk);
         }
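The `HashJoinWorkload` split above drives how the build side is populated: `NotInCache` writes the rows straight into the state table, so they sit on "storage" with a cold join cache, while `InCache` instead streams them through the executor in `handle_streams` further below, so they are resident in the join cache before probing. A stand-in sketch of that distinction, using plain `HashMap`s rather than RisingWave types (all names here are illustrative):

```rust
// Sketch only: why a cold cache forces a storage fetch on first probe.
use std::collections::HashMap;

struct JoinSide {
    state_store: HashMap<i64, Vec<i64>>, // durable state (stand-in)
    cache: HashMap<i64, Vec<i64>>,       // in-memory join cache (stand-in)
}

impl JoinSide {
    fn probe(&mut self, key: i64) -> usize {
        // On a cache miss, rows are first fetched from the state store,
        // which is the extra cost the NotInCache workload measures.
        if !self.cache.contains_key(&key) {
            if let Some(rows) = self.state_store.get(&key) {
                self.cache.insert(key, rows.clone());
            }
        }
        self.cache.get(&key).map_or(0, |rows| rows.len())
    }
}

fn main() {
    // NotInCache: rows pre-loaded into the state store only.
    let mut side = JoinSide {
        state_store: HashMap::from([(200_000, vec![1, 2, 3])]),
        cache: HashMap::new(),
    };
    assert_eq!(side.probe(200_000), 3); // first probe populates the cache
}
```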
@@ -765,31 +756,89 @@ pub mod hash_join_executor {
         let params_l = JoinParams::new(vec![0], vec![1]);
         let params_r = JoinParams::new(vec![0], vec![1]);
 
-        let executor = HashJoinExecutor::<Key128, MemoryStateStore, { JoinType::Inner }>::new(
-            ActorContext::for_test(123),
-            info,
-            source_l,
-            source_r,
-            params_l,
-            params_r,
-            vec![false], // null-safe
-            (0..schema_len).collect_vec(),
-            None, // condition, it is an eq join, we have no condition
-            vec![], // ineq pairs
-            lhs_state_table,
-            lhs_degree_state_table,
-            rhs_state_table,
-            rhs_degree_state_table,
-            Arc::new(AtomicU64::new(0)), // watermark epoch
-            false, // is_append_only
-            Arc::new(StreamingMetrics::unused()),
-            1024, // chunk_size
-            2048, // high_join_amplification_threshold
-        );
-        (tx_l, tx_r, executor.boxed().execute())
+        match join_type {
+            JoinType::Inner => {
+                let executor =
+                    HashJoinExecutor::<Key128, MemoryStateStore, { ConstJoinType::Inner }>::new(
+                        ActorContext::for_test(123),
+                        info,
+                        source_l,
+                        source_r,
+                        params_l,
+                        params_r,
+                        vec![false], // null-safe
+                        (0..schema_len).collect_vec(),
+                        None, // condition, it is an eq join, we have no condition
+                        vec![], // ineq pairs
+                        lhs_state_table,
+                        lhs_degree_state_table,
+                        rhs_state_table,
+                        rhs_degree_state_table,
+                        Arc::new(AtomicU64::new(0)), // watermark epoch
+                        false, // is_append_only
+                        Arc::new(StreamingMetrics::unused()),
+                        1024, // chunk_size
+                        2048, // high_join_amplification_threshold
+                    );
+                (tx_l, tx_r, executor.boxed().execute())
+            }
+            JoinType::LeftOuter => {
+                let executor = HashJoinExecutor::<
+                    Key128,
+                    MemoryStateStore,
+                    { ConstJoinType::LeftOuter },
+                >::new(
+                    ActorContext::for_test(123),
+                    info,
+                    source_l,
+                    source_r,
+                    params_l,
+                    params_r,
+                    vec![false], // null-safe
+                    (0..schema_len).collect_vec(),
+                    None, // condition, it is an eq join, we have no condition
+                    vec![], // ineq pairs
+                    lhs_state_table,
+                    lhs_degree_state_table,
+                    rhs_state_table,
+                    rhs_degree_state_table,
+                    Arc::new(AtomicU64::new(0)), // watermark epoch
+                    false, // is_append_only
+                    Arc::new(StreamingMetrics::unused()),
+                    1024, // chunk_size
+                    2048, // high_join_amplification_threshold
+                );
+                (tx_l, tx_r, executor.boxed().execute())
+            }
+            _ => panic!("Unsupported join type"),
+        }
+    }
+
+    fn build_chunk(size: usize, join_key_value: i64) -> StreamChunk {
+        // Create column [0]: join key. Each record has the same value, to trigger join amplification.
+        let mut int64_jk_builder = DataType::Int64.create_array_builder(size);
+        int64_jk_builder
+            .append_array(&I64Array::from_iter(vec![Some(join_key_value); size]).into());
+        let jk = int64_jk_builder.finish();
+
+        // Create column [1]: pk. The original pk is kept here; it is unique.
+        let mut int64_pk_data_chunk_builder = DataType::Int64.create_array_builder(size);
+        let seq = I64Array::from_iter((0..size as i64).map(Some));
+        int64_pk_data_chunk_builder.append_array(&I64Array::from(seq).into());
+        let pk = int64_pk_data_chunk_builder.finish();
+
+        // Create column [2]: value. This can be an arbitrary value, so just clone the pk column.
+        let values = pk.clone();
+
+        // Build the stream chunk.
+        let columns = vec![jk.into(), pk.into(), values.into()];
+        let ops = vec![Op::Insert; size];
+        StreamChunk::new(ops, columns)
     }
 
     pub async fn handle_streams(
+        hash_join_workload: HashJoinWorkload,
+        join_type: JoinType,
         amp: usize,
         mut tx_l: MessageSender,
         mut tx_r: MessageSender,
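The duplicated constructor calls in the `match join_type` above are forced by `HashJoinExecutor`'s const-generic join-type parameter: the join type must be known at compile time, so each runtime variant has to be mapped onto its own monomorphized instantiation. A minimal sketch of this dispatch pattern (illustrative names only; `u8` stands in for the join-type primitive):

```rust
// Sketch only: dispatching a runtime enum to a const-generic parameter.
#[derive(Clone, Copy, Debug)]
enum JoinType {
    Inner,
    LeftOuter,
}

const INNER: u8 = 0;
const LEFT_OUTER: u8 = 1;

struct Executor<const T: u8>;

impl<const T: u8> Executor<T> {
    fn run(&self) -> u8 {
        // Each T is a distinct compile-time type; no runtime branching here.
        T
    }
}

fn dispatch(join_type: JoinType) -> u8 {
    // The match is the only place the runtime value is consulted.
    match join_type {
        JoinType::Inner => Executor::<{ INNER }>.run(),
        JoinType::LeftOuter => Executor::<{ LEFT_OUTER }>.run(),
    }
}

fn main() {
    assert_eq!(dispatch(JoinType::Inner), INNER);
    assert_eq!(dispatch(JoinType::LeftOuter), LEFT_OUTER);
}
```

The price of the pattern is one fully spelled-out constructor per variant, which is why the two arms differ only in the const argument.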
@@ -798,11 +847,28 @@ pub mod hash_join_executor {
         // Init executors
         tx_l.push_barrier(test_epoch(1), false);
         tx_r.push_barrier(test_epoch(1), false);
-        // Push a single record into tx_l, matches 100K records in the build side.
-        let chunk = StreamChunk::from_pretty(
-            "  I I I
-            + 200000 0 1",
-        );
+
+        if matches!(hash_join_workload, HashJoinWorkload::InCache) {
+            // Push the `amp` build-side records through tx_r, so the records to be matched are cached.
+            let chunk = build_chunk(amp, 200_000);
+            tx_r.push_chunk(chunk);
+        }
+
+        tx_l.push_barrier(test_epoch(2), false);
+        tx_r.push_barrier(test_epoch(2), false);
+
+        // Push a chunk of records into tx_l. For `Inner`, each one matches the `amp` build-side records.
+        let chunk_size = match hash_join_workload {
+            HashJoinWorkload::InCache => 64,
+            HashJoinWorkload::NotInCache => 1,
+        };
+        let chunk = match join_type {
+            // Make sure all records match.
+            JoinType::Inner => build_chunk(chunk_size, 200_000),
+            // Make sure no record matches.
+            JoinType::LeftOuter => build_chunk(chunk_size, 300_000),
+            _ => panic!("Unsupported join type"),
+        };
         tx_l.push_chunk(chunk);
 
         match stream.next().await {
@@ -814,27 +880,30 @@ pub mod hash_join_executor {
             }
         }
 
-        let chunks = amp / 1024;
-        let remainder = amp % 1024;
+        match stream.next().await {
+            Some(Ok(Message::Barrier(b))) => {
+                assert_eq!(b.epoch.curr, test_epoch(2));
+            }
+            other => {
+                panic!("Expected a barrier, got {:?}", other);
+            }
+        }
 
-        for _ in 0..chunks {
+        let expected_count = match join_type {
+            JoinType::LeftOuter => chunk_size,
+            JoinType::Inner => amp * chunk_size,
+            _ => panic!("Unsupported join type"),
+        };
+        let mut current_count = 0;
+        while current_count < expected_count {
             match stream.next().await {
                 Some(Ok(Message::Chunk(c))) => {
-                    assert_eq!(c.cardinality(), 1024);
+                    current_count += c.cardinality();
                 }
                 other => {
                     panic!("Expected a barrier, got {:?}", other);
                 }
             }
         }
-
-        match stream.next().await {
-            Some(Ok(Message::Chunk(c))) => {
-                assert_eq!(c.cardinality(), remainder);
-            }
-            other => {
-                panic!("Expected a barrier, got {:?}", other);
-            }
-        }
     }
 }
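As a worked check of the new assertion arithmetic, take one illustrative configuration (amp = 40_000, `InCache`, `Inner`, with the 1024-row `chunk_size` the executor is constructed with): each of the 64 probe rows matches all 40_000 build rows, so the drain loop must consume 2_560_000 output rows in total.

```rust
// Sketch only: the expected-count bookkeeping from handle_streams, on paper.
fn main() {
    let (amp, probe_rows, max_chunk) = (40_000usize, 64usize, 1024usize);

    // Inner join: every probe row matches all `amp` build rows.
    let inner_expected = amp * probe_rows;
    assert_eq!(inner_expected, 2_560_000);

    // LeftOuter probes use a non-matching key (300_000), so each probe row
    // yields exactly one NULL-padded output row.
    let left_outer_expected = probe_rows;
    assert_eq!(left_outer_expected, 64);

    // Number of output chunks if every emitted chunk is full.
    assert_eq!(inner_expected.div_ceil(max_chunk), 2_500);
}
```

Counting cardinality across chunks, rather than asserting a fixed per-chunk size as the old code did, keeps the check valid even when the executor emits partially filled chunks.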