From 6a8e6f70787cf2d2488438e73d4d386dd031b4e9 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 00:07:17 +0800 Subject: [PATCH 01/13] add bench with join type and cache workload --- src/stream/benches/stream_hash_join_mem.rs | 7 +- src/stream/benches/stream_hash_join_rt.rs | 21 ++- src/stream/src/executor/test_utils.rs | 193 ++++++++++++++------- 3 files changed, 143 insertions(+), 78 deletions(-) diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs index 7f4275162ad99..c0b319f943213 100644 --- a/src/stream/benches/stream_hash_join_mem.rs +++ b/src/stream/benches/stream_hash_join_mem.rs @@ -16,15 +16,16 @@ //! To run this benchmark you can use the following command: //! ```sh -//! cargo bench --features dhat-heap --bench stream_hash_join +//! cargo bench --features dhat-heap --bench stream_hash_join_mem //! ``` //! //! You may also specify the amplification size, e.g. 40000 //! ```sh -//! cargo bench --features dhat-heap --bench stream_hash_join -- 40000 +//! cargo bench --features dhat-heap --bench stream_hash_join_mem -- 40000 //! ``` use std::env; +use risingwave_stream::executor::JoinType; use risingwave_stream::executor::test_utils::hash_join_executor::*; @@ -44,7 +45,7 @@ async fn main() { } else { 100_000 }; - let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp).await; + let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, HashJoinWorkload::NotInCache, JoinType::Inner).await; { // Start the profiler later, after we have ingested the data for hash join build-side. #[cfg(feature = "dhat-heap")] diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs index 5beed531575e6..0e3a6bea62588 100644 --- a/src/stream/benches/stream_hash_join_rt.rs +++ b/src/stream/benches/stream_hash_join_rt.rs @@ -23,6 +23,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::executor::block_on; use risingwave_stream::executor::test_utils::hash_join_executor::*; use tokio::runtime::Runtime; +use risingwave_stream::executor::JoinType; risingwave_expr_impl::enable!(); @@ -32,14 +33,18 @@ fn bench_hash_join(c: &mut Criterion) { let rt = Runtime::new().unwrap(); for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000] { - let name = format!("hash_join_rt_{}", amp); - group.bench_function(&name, |b| { - b.to_async(&rt).iter_batched( - || block_on(setup_bench_stream_hash_join(amp)), - |(tx_l, tx_r, out)| handle_streams(amp, tx_l, tx_r, out), - BatchSize::SmallInput, - ) - }); + for workload in [HashJoinWorkload::NotInCache, HashJoinWorkload::InCache] { + for join_type in [JoinType::Inner, JoinType::LeftOuter] { + let name = format!("hash_join_rt_{}_{}_{}", amp, workload, join_type); + group.bench_function(&name, |b| { + b.to_async(&rt).iter_batched( + || block_on(setup_bench_stream_hash_join(amp, workload, join_type)), + |(tx_l, tx_r, out)| handle_streams(workload, join_type, amp, tx_l, tx_r, out), + BatchSize::SmallInput, + ) + }); + } + } } } diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 2b02c5882eace..5b2c75266b53f 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -630,7 +630,8 @@ pub mod hash_join_executor { use std::sync::Arc; use itertools::Itertools; - use risingwave_common::array::{I64Array, Op, StreamChunkTestExt}; + use strum_macros::Display; + use risingwave_common::array::{I64Array, Op}; use risingwave_common::catalog::{ColumnDesc, ColumnId, 
Field, TableId}; use risingwave_common::hash::Key128; use risingwave_common::util::sort_util::OrderType; @@ -642,6 +643,13 @@ pub mod hash_join_executor { use crate::executor::prelude::StateTable; use crate::executor::test_utils::{MessageSender, MockSource}; use crate::executor::{ActorContext, HashJoinExecutor, JoinParams, JoinType}; + use crate::executor::join::JoinTypePrimitive; + + #[derive(Clone, Copy, Debug, Display)] + pub enum HashJoinWorkload { + InCache, + NotInCache, + } pub async fn create_in_memory_state_table( mem_state: MemoryStateStore, @@ -701,6 +709,8 @@ pub mod hash_join_executor { /// 4. Check memory utilization. pub async fn setup_bench_stream_hash_join( amp: usize, + workload: HashJoinWorkload, + join_type_primitive: JoinTypePrimitive, ) -> (MessageSender, MessageSender, BoxedMessageStream) { let fields = vec![DataType::Int64, DataType::Int64, DataType::Int64]; let orders = vec![OrderType::ascending(), OrderType::ascending()]; @@ -715,27 +725,8 @@ pub mod hash_join_executor { create_in_memory_state_table(state_store.clone(), &fields, &orders, &[0, 1], 2).await; // Insert 100K records into the build side. - { - // Create column [0]: join key. Each record has the same value, to trigger join amplification. - let mut int64_jk_builder = DataType::Int64.create_array_builder(amp); - int64_jk_builder - .append_array(&I64Array::from_iter(vec![Some(200_000); amp].into_iter()).into()); - let jk = int64_jk_builder.finish(); - - // Create column [1]: pk. The original pk will be here, it will be unique. - let mut int64_pk_data_chunk_builder = DataType::Int64.create_array_builder(amp); - let seq = I64Array::from_iter((0..amp as i64).map(Some)); - int64_pk_data_chunk_builder.append_array(&I64Array::from(seq).into()); - let pk = int64_pk_data_chunk_builder.finish(); - - // Create column [2]: value. This can be an arbitrary value, so just clone the pk column. - let values = pk.clone(); - - // Build the stream chunk. - let columns = vec![jk.into(), pk.into(), values.into()]; - let ops = vec![Op::Insert; amp]; - let stream_chunk = StreamChunk::new(ops, columns); - + if matches!(workload, HashJoinWorkload::NotInCache) { + let stream_chunk = build_chunk(amp, 200_000); // Write to state table. 
rhs_state_table.write_chunk(stream_chunk);
         }
 
@@ -765,31 +756,85 @@ pub mod hash_join_executor {
         let params_l = JoinParams::new(vec![0], vec![1]);
         let params_r = JoinParams::new(vec![0], vec![1]);
 
-        let executor = HashJoinExecutor::<Key128, MemoryStateStore, { JoinType::Inner }>::new(
-            ActorContext::for_test(123),
-            info,
-            source_l,
-            source_r,
-            params_l,
-            params_r,
-            vec![false], // null-safe
-            (0..schema_len).collect_vec(),
-            None, // condition, it is an eq join, we have no condition
-            vec![], // ineq pairs
-            lhs_state_table,
-            lhs_degree_state_table,
-            rhs_state_table,
-            rhs_degree_state_table,
-            Arc::new(AtomicU64::new(0)), // watermark epoch
-            false, // is_append_only
-            Arc::new(StreamingMetrics::unused()),
-            1024, // chunk_size
-            2048, // high_join_amplification_threshold
-        );
-        (tx_l, tx_r, executor.boxed().execute())
+        match join_type_primitive {
+            JoinType::Inner => {
+                let executor = HashJoinExecutor::<Key128, MemoryStateStore, { JoinType::Inner }>::new(
+                    ActorContext::for_test(123),
+                    info,
+                    source_l,
+                    source_r,
+                    params_l,
+                    params_r,
+                    vec![false], // null-safe
+                    (0..schema_len).collect_vec(),
+                    None, // condition, it is an eq join, we have no condition
+                    vec![], // ineq pairs
+                    lhs_state_table,
+                    lhs_degree_state_table,
+                    rhs_state_table,
+                    rhs_degree_state_table,
+                    Arc::new(AtomicU64::new(0)), // watermark epoch
+                    false, // is_append_only
+                    Arc::new(StreamingMetrics::unused()),
+                    1024, // chunk_size
+                    2048, // high_join_amplification_threshold
+                );
+                (tx_l, tx_r, executor.boxed().execute())
+            }
+            JoinType::LeftOuter => {
+                let executor = HashJoinExecutor::<Key128, MemoryStateStore, { JoinType::LeftOuter }>::new(
+                    ActorContext::for_test(123),
+                    info,
+                    source_l,
+                    source_r,
+                    params_l,
+                    params_r,
+                    vec![false], // null-safe
+                    (0..schema_len).collect_vec(),
+                    None, // condition, it is an eq join, we have no condition
+                    vec![], // ineq pairs
+                    lhs_state_table,
+                    lhs_degree_state_table,
+                    rhs_state_table,
+                    rhs_degree_state_table,
+                    Arc::new(AtomicU64::new(0)), // watermark epoch
+                    false, // is_append_only
+                    Arc::new(StreamingMetrics::unused()),
+                    1024, // chunk_size
+                    2048, // high_join_amplification_threshold
+                );
+                (tx_l, tx_r, executor.boxed().execute())
+            }
+            _ => panic!("Unsupported join type"),
+        }
+
+    }
+
+    fn build_chunk(size: usize, join_key_value: i64) -> StreamChunk {
+        // Create column [0]: join key. Each record has the same value, to trigger join amplification.
+        let mut int64_jk_builder = DataType::Int64.create_array_builder(size);
+        int64_jk_builder
+            .append_array(&I64Array::from_iter(vec![Some(join_key_value); size].into_iter()).into());
+        let jk = int64_jk_builder.finish();
+
+        // Create column [1]: pk. The original pk will be here, it will be unique.
+        let mut int64_pk_data_chunk_builder = DataType::Int64.create_array_builder(size);
+        let seq = I64Array::from_iter((0..size as i64).map(Some));
+        int64_pk_data_chunk_builder.append_array(&I64Array::from(seq).into());
+        let pk = int64_pk_data_chunk_builder.finish();
+
+        // Create column [2]: value. This can be an arbitrary value, so just clone the pk column.
+        let values = pk.clone();
+
+        // Build the stream chunk.
+        let columns = vec![jk.into(), pk.into(), values.into()];
+        let ops = vec![Op::Insert; size];
+        StreamChunk::new(ops, columns)
     }
 
     pub async fn handle_streams(
+        hash_join_workload: HashJoinWorkload,
+        join_type_primitive: JoinTypePrimitive,
         amp: usize,
         mut tx_l: MessageSender,
         mut tx_r: MessageSender,
@@ -798,11 +843,22 @@ pub mod hash_join_executor {
         // Init executors
         tx_l.push_barrier(test_epoch(1), false);
         tx_r.push_barrier(test_epoch(1), false);
-        // Push a single record into tx_l, matches 100K records in the build side.
-        let chunk = StreamChunk::from_pretty(
-            " I I I
-            + 200000 0 1",
-        );
+
+        if matches!(hash_join_workload, HashJoinWorkload::InCache) {
+            // Push `amp` records into tx_r, so the records to be matched are cached.
+            let chunk = build_chunk(amp, 200_000);
+            tx_r.push_chunk(chunk);
+        }
+
+        // Push a chunk of records into tx_l, matches 100K records in the build side.
+        let chunk_size = 1024;
+        let chunk = match join_type_primitive {
+            // Make sure all match
+            JoinType::Inner => build_chunk(chunk_size, 200_000),
+            // Make sure no match is found.
+            JoinType::LeftOuter => build_chunk(chunk_size, 300_000),
+            _ => panic!("Unsupported join type"),
+        };
         tx_l.push_chunk(chunk);
 
         match stream.next().await {
@@ -814,27 +870,30 @@ pub mod hash_join_executor {
             }
         }
 
-        let chunks = amp / 1024;
-        let remainder = amp % 1024;
-
-        for _ in 0..chunks {
-            match stream.next().await {
-                Some(Ok(Message::Chunk(c))) => {
-                    assert_eq!(c.cardinality(), 1024);
-                }
-                other => {
-                    panic!("Expected a barrier, got {:?}", other);
+        match join_type_primitive {
+            JoinType::LeftOuter => {
+                match stream.next().await {
+                    Some(Ok(Message::Chunk(c))) => {
+                        assert_eq!(c.cardinality(), 1024);
+                    }
+                    other => {
+                        panic!("Expected a chunk, got {:?}", other);
+                    }
                 }
             }
-        }
-
-        match stream.next().await {
-            Some(Ok(Message::Chunk(c))) => {
-                assert_eq!(c.cardinality(), remainder);
-            }
-            other => {
-                panic!("Expected a barrier, got {:?}", other);
+            JoinType::Inner => {
+                for _ in 0..amp {
+                    match stream.next().await {
+                        Some(Ok(Message::Chunk(c))) => {
+                            assert_eq!(c.cardinality(), chunk_size);
+                        }
+                        other => {
+                            panic!("Expected a chunk, got {:?}", other);
+                        }
+                    }
+                }
             }
+            _ => panic!("Unsupported join type"),
         }
     }
 }

From 70a602b4aec14ca4165e9771c954b67d55ce640c Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 00:21:02 +0800
Subject: [PATCH 02/13] loosen constraints

---
 src/stream/src/executor/test_utils.rs | 33 ++++++++++++---------------------
 1 file changed, 12 insertions(+), 21 deletions(-)

diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs
index 5b2c75266b53f..9ec6deed66c3b 100644
--- a/src/stream/src/executor/test_utils.rs
+++ b/src/stream/src/executor/test_utils.rs
@@ -870,30 +870,21 @@ pub mod hash_join_executor {
             }
         }
 
-        match join_type_primitive {
-            JoinType::LeftOuter => {
-                match stream.next().await {
-                    Some(Ok(Message::Chunk(c))) => {
-                        assert_eq!(c.cardinality(), 1024);
-                    }
-                    other => {
-                        panic!("Expected a chunk, got {:?}", other);
-                    }
+        let expected_count = match join_type_primitive {
+            JoinType::LeftOuter => amp * chunk_size,
+            JoinType::Inner => chunk_size,
+            _ => panic!("Unsupported join type"),
+        };
+        let mut current_count = 0;
+        while current_count < expected_count {
+            match stream.next().await {
+                Some(Ok(Message::Chunk(c))) => {
+                    current_count += c.cardinality();
                 }
-            }
-            JoinType::Inner => {
-                for _ in 0..amp {
-                    match stream.next().await {
-                        Some(Ok(Message::Chunk(c))) => {
-                            assert_eq!(c.cardinality(), chunk_size);
-                        }
-                        other => {
-                            panic!("Expected a chunk, got {:?}", other);
-                        }
-                    }
+                other => {
+                    panic!("Expected a chunk, got {:?}", other);
                 }
             }
-            _ => panic!("Unsupported join type"),
         }
     }
 }

From e69eb12f7906fdf53fc6e8a701ee12f3ccdbdef9 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 00:28:19 +0800
Subject: [PATCH 03/13] fix

---
 src/stream/src/executor/test_utils.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs
index 9ec6deed66c3b..3951f2a2fc2ac 100644
--- a/src/stream/src/executor/test_utils.rs
+++ b/src/stream/src/executor/test_utils.rs
@@ -871,8 +871,8 @@ pub mod hash_join_executor {
         }
 
         let expected_count = match join_type_primitive {
-            JoinType::LeftOuter => amp * chunk_size,
-            JoinType::Inner => chunk_size,
+            JoinType::LeftOuter => chunk_size,
+            JoinType::Inner => amp * chunk_size,
             _ => panic!("Unsupported join type"),
         };
         let mut current_count = 0;

From 800f211cc27a0a15f403d4745ead5dbffecabab9 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 09:14:03 +0800
Subject: [PATCH 04/13] use 64 chunk_size

---
 src/stream/src/executor/test_utils.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs
index 3951f2a2fc2ac..b03b3bc7456b0 100644
--- a/src/stream/src/executor/test_utils.rs
+++ b/src/stream/src/executor/test_utils.rs
@@ -851,7 +851,7 @@ pub mod hash_join_executor {
         }
 
         // Push a chunk of records into tx_l, matches 100K records in the build side.
-        let chunk_size = 1024;
+        let chunk_size = 64;
         let chunk = match join_type_primitive {
             // Make sure all match
             JoinType::Inner => build_chunk(chunk_size, 200_000),

From ebc5d99ba3b4dece3915968debb556ad0794126f Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 09:18:33 +0800
Subject: [PATCH 05/13] control chunk size based on workload

---
 src/stream/src/executor/test_utils.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs
index b03b3bc7456b0..25299dc4dca91 100644
--- a/src/stream/src/executor/test_utils.rs
+++ b/src/stream/src/executor/test_utils.rs
@@ -851,7 +851,10 @@ pub mod hash_join_executor {
         }
 
         // Push a chunk of records into tx_l, matches 100K records in the build side.
-        let chunk_size = 64;
+        let chunk_size = match hash_join_workload {
+            HashJoinWorkload::InCache => 64,
+            HashJoinWorkload::NotInCache => 1,
+        };
         let chunk = match join_type_primitive {
             // Make sure all match
             JoinType::Inner => build_chunk(chunk_size, 200_000),

From f5cd053a06223aa8fa8a0426bb00c9b77fffc25b Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 09:23:55 +0800
Subject: [PATCH 06/13] use pb join type to get name instead of number

---
 src/stream/benches/stream_hash_join_mem.rs |  2 +-
 src/stream/benches/stream_hash_join_rt.rs  |  2 +-
 src/stream/src/executor/test_utils.rs      | 18 +++++++++---------
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs
index c0b319f943213..ddbecfd503351 100644
--- a/src/stream/benches/stream_hash_join_mem.rs
+++ b/src/stream/benches/stream_hash_join_mem.rs
@@ -25,7 +25,7 @@
 //! ```
 
 use std::env;
-use risingwave_stream::executor::JoinType;
+use risingwave_pb::plan_common::JoinType;
 
 use risingwave_stream::executor::test_utils::hash_join_executor::*;

diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs
index 0e3a6bea62588..58d61b82b550b 100644
--- a/src/stream/benches/stream_hash_join_rt.rs
+++ b/src/stream/benches/stream_hash_join_rt.rs
@@ -23,7 +23,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
 use futures::executor::block_on;
 use risingwave_stream::executor::test_utils::hash_join_executor::*;
 use tokio::runtime::Runtime;
-use risingwave_stream::executor::JoinType;
+use risingwave_pb::plan_common::JoinType;
 
 risingwave_expr_impl::enable!();

diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs
index 25299dc4dca91..ddd9a45a5ebe1 100644
--- a/src/stream/src/executor/test_utils.rs
+++ b/src/stream/src/executor/test_utils.rs
@@ -635,6 +635,7 @@ pub mod hash_join_executor {
     use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
     use risingwave_common::hash::Key128;
     use risingwave_common::util::sort_util::OrderType;
+    use risingwave_pb::plan_common::JoinType;
     use risingwave_storage::memory::MemoryStateStore;
 
     use super::*;
@@ -646,8 +647,7 @@ pub mod hash_join_executor {
     use crate::executor::monitor::StreamingMetrics;
     use crate::executor::prelude::StateTable;
     use crate::executor::test_utils::{MessageSender, MockSource};
-    use crate::executor::{ActorContext, HashJoinExecutor, JoinParams, JoinType};
-    use crate::executor::join::JoinTypePrimitive;
+    use crate::executor::{ActorContext, HashJoinExecutor, JoinParams, JoinType as ConstJoinType};
 
     #[derive(Clone, Copy, Debug, Display)]
     pub enum HashJoinWorkload {
@@ -710,7 +710,7 @@ pub mod hash_join_executor {
     pub async fn setup_bench_stream_hash_join(
         amp: usize,
         workload: HashJoinWorkload,
-        join_type_primitive: JoinTypePrimitive,
+        join_type: JoinType,
     ) -> (MessageSender, MessageSender, BoxedMessageStream) {
         let fields = vec![DataType::Int64, DataType::Int64, DataType::Int64];
         let orders = vec![OrderType::ascending(), OrderType::ascending()];
@@ -756,9 +756,9 @@ pub mod hash_join_executor {
         let params_l = JoinParams::new(vec![0], vec![1]);
         let params_r = JoinParams::new(vec![0], vec![1]);
 
-        match join_type_primitive {
+        match join_type {
             JoinType::Inner => {
-                let executor = HashJoinExecutor::<Key128, MemoryStateStore, { JoinType::Inner }>::new(
+                let executor = HashJoinExecutor::<Key128, MemoryStateStore, { ConstJoinType::Inner }>::new(
                     ActorContext::for_test(123),
                     info,
                     source_l,
@@ -782,7 +782,7 @@ pub mod hash_join_executor {
                 (tx_l, tx_r, executor.boxed().execute())
             }
             JoinType::LeftOuter => {
-                let executor = HashJoinExecutor::<Key128, MemoryStateStore, { JoinType::LeftOuter }>::new(
+                let executor = HashJoinExecutor::<Key128, MemoryStateStore, { ConstJoinType::LeftOuter }>::new(
                     ActorContext::for_test(123),
                     info,
                     source_l,
@@ -834,7 +834,7 @@ pub mod hash_join_executor {
 
     pub async fn handle_streams(
         hash_join_workload: HashJoinWorkload,
-        join_type_primitive: JoinTypePrimitive,
+        join_type: JoinType,
         amp: usize,
         mut tx_l: MessageSender,
         mut tx_r: MessageSender,
@@ -855,7 +855,7 @@ pub mod hash_join_executor {
             HashJoinWorkload::InCache => 64,
             HashJoinWorkload::NotInCache => 1,
         };
-        let chunk = match join_type_primitive {
+        let chunk = match join_type {
             // Make sure all match
             JoinType::Inner => build_chunk(chunk_size, 200_000),
             // Make sure no match is found.
@@ -873,7 +873,7 @@ pub mod hash_join_executor {
             }
         }
 
-        let expected_count = match join_type_primitive {
+        let expected_count = match join_type {
             JoinType::LeftOuter => chunk_size,
             JoinType::Inner => amp * chunk_size,
             _ => panic!("Unsupported join type"),

From 6490d0405c10d7615e5a8bed4d7583da060aeaa3 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 09:27:49 +0800
Subject: [PATCH 07/13] fix format string

---
 src/stream/benches/stream_hash_join_mem.rs | 6 ++++--
 src/stream/benches/stream_hash_join_rt.rs  | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs
index ddbecfd503351..5a47703c3be4b 100644
--- a/src/stream/benches/stream_hash_join_mem.rs
+++ b/src/stream/benches/stream_hash_join_mem.rs
@@ -45,12 +45,14 @@ async fn main() {
         arg
     } else {
         100_000
     };
-    let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, HashJoinWorkload::NotInCache, JoinType::Inner).await;
+    let workload = HashJoinWorkload::NotInCache;
+    let join_type = JoinType::Inner;
+    let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, workload, join_type).await;
     {
         // Start the profiler later, after we have ingested the data for hash join build-side.
         #[cfg(feature = "dhat-heap")]
         let _profiler = dhat::Profiler::new_heap();
-        handle_streams(amp, tx_l, tx_r, out).await;
+        handle_streams(workload, join_type, amp, tx_l, tx_r, out).await;
     }
 }

diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs
index 58d61b82b550b..0eb458c980534 100644
--- a/src/stream/benches/stream_hash_join_rt.rs
+++ b/src/stream/benches/stream_hash_join_rt.rs
@@ -38,7 +38,7 @@ fn bench_hash_join(c: &mut Criterion) {
     for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000] {
         for workload in [HashJoinWorkload::NotInCache, HashJoinWorkload::InCache] {
             for join_type in [JoinType::Inner, JoinType::LeftOuter] {
-                let name = format!("hash_join_rt_{}_{}_{}", amp, workload, join_type);
+                let name = format!("hash_join_rt_{}_{}_{:#?}", amp, workload, join_type);
                 group.bench_function(&name, |b| {
                     b.to_async(&rt).iter_batched(
                         || block_on(setup_bench_stream_hash_join(amp, workload, join_type)),

From 9818310aa5c020339cdfaea312c4791a4af3ed0e Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 10:49:04 +0800
Subject: [PATCH 08/13] add program to monitor mem use

---
 src/stream/benches/stream_hash_join.py     | 39 ++++++++++++++++++++++
 src/stream/benches/stream_hash_join_mem.rs | 36 ++++++++++++--------
 2 files changed, 61 insertions(+), 14 deletions(-)
 create mode 100755 src/stream/benches/stream_hash_join.py

diff --git a/src/stream/benches/stream_hash_join.py b/src/stream/benches/stream_hash_join.py
new file mode 100755
index 0000000000000..d5ca9994051fa
--- /dev/null
+++ b/src/stream/benches/stream_hash_join.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+
+# Executes full benchmark for stream_hash_join runtime and memory consumption
+# Outputs CSV results to stdout.
+
+import subprocess
+import re
+import sys
+
+# Print header
+print("Amp,Workload,JoinType,Total Blocks,Total Bytes")
+
+# Run benchmarks and capture results
+for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000]:
+    for workload in ["NotInCache", "InCache"]:
+        for join_type in ["Inner", "LeftOuter"]:
+            # Construct the command
+            cmd = f'ARGS={amp},{workload},{join_type} cargo bench --features dhat-heap --bench stream_hash_join_mem'
+
+            # Run the command and capture output
+            try:
+                output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
+
+                # Extract total blocks and bytes
+                total_blocks_match = re.search(r'max_blocks:\s*(\d+)', output)
+                total_bytes_match = re.search(r'max_bytes:\s*(\d+)', output)
+
+                if total_blocks_match and total_bytes_match:
+                    total_blocks = total_blocks_match.group(1)
+                    total_bytes = total_bytes_match.group(1)
+
+                    # Print results immediately
+                    print(f"{amp},{workload},{join_type},{total_blocks},{total_bytes}")
+                else:
+                    print(f"No total_blocks or total_bytes found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
+
+            except subprocess.CalledProcessError as e:
+                print(f"Error running benchmark for Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
+                print(f"Error output: {e.output}", file=sys.stderr)
\ No newline at end of file
diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs
index 5a47703c3be4b..50df173bfe7d0 100644
--- a/src/stream/benches/stream_hash_join_mem.rs
+++ b/src/stream/benches/stream_hash_join_mem.rs
@@ -14,14 +14,9 @@
 
 #![feature(let_chains)]
 
-//! To run this benchmark you can use the following command:
-//! ```sh
-//! cargo bench --features dhat-heap --bench stream_hash_join_mem
-//! ```
-//!
-//! You may also specify the amplification size, e.g. 40000
-//! ```sh
-//! cargo bench --features dhat-heap --bench stream_hash_join_mem -- 40000
+//! Specify the amplification_size,workload,join_type e.g. 40000
+//! ```sh
+//! ARGS=40000,NotInCache,Inner cargo bench --features dhat-heap --bench stream_hash_join_mem
 //! ```
 
 use std::env;
@@ -37,16 +32,26 @@ static ALLOC: dhat::Alloc = dhat::Alloc;
 
 #[tokio::main]
 async fn main() {
-    let args: Vec<_> = env::args().collect();
-    let amp = if let Some(raw_arg) = args.get(1)
-        && let Ok(arg) = raw_arg.parse()
-    {
-        arg
+    let arg = env::var("ARGS");
+    let (amp, workload, join_type) = if let Ok(raw_arg) = arg
+    {
+        let parts = raw_arg.split(',').collect::<Vec<_>>();
+        let amp = parts[0].parse::<usize>().expect(format!("invalid amplification_size: {}", parts[0]).as_str());
+        let workload = match parts[1] {
+            "NotInCache" => HashJoinWorkload::NotInCache,
+            "InCache" => HashJoinWorkload::InCache,
+            _ => panic!("Invalid workload: {}", parts[1]),
+        };
+        let join_type = match parts[2] {
+            "Inner" => JoinType::Inner,
+            "LeftOuter" => JoinType::LeftOuter,
+            _ => panic!("Invalid join type: {}", parts[2]),
+        };
+        (amp, workload, join_type)
     } else {
-        100_000
+        panic!("invalid ARGS: {:?}", arg);
     };
-    let workload = HashJoinWorkload::NotInCache;
-    let join_type = JoinType::Inner;
+
     let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, workload, join_type).await;
     {
         // Start the profiler later, after we have ingested the data for hash join build-side.
@@ -54,5 +59,8 @@ async fn main() {
         let _profiler = dhat::Profiler::new_heap();
 
         handle_streams(workload, join_type, amp, tx_l, tx_r, out).await;
+        let stats= dhat::HeapStats::get();
+        println!("max_blocks: {}", stats.max_blocks);
+        println!("max_bytes: {}", stats.max_bytes);
     }
 }

From 35d4ad937848e77f7417e57a1afe2005e43adf67 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 12:04:28 +0800
Subject: [PATCH 09/13] update script to add rt

---
 src/stream/benches/stream_hash_join.py | 38 ++++++++++++++++++++------
 1 file changed, 29 insertions(+), 9 deletions(-)

diff --git a/src/stream/benches/stream_hash_join.py b/src/stream/benches/stream_hash_join.py
index d5ca9994051fa..07deebb631211 100755
--- a/src/stream/benches/stream_hash_join.py
+++ b/src/stream/benches/stream_hash_join.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python3
-
+import json
 # Executes full benchmark for stream_hash_join runtime and memory consumption
 # Outputs CSV results to stdout.
 
@@ -8,18 +8,21 @@ import re
 import sys
 
 # Print header
-print("Amp,Workload,JoinType,Total Blocks,Total Bytes")
+results = ["Amp,Workload,JoinType,Total Blocks,Total Bytes,Runtime (ns)"]
 
 # Run benchmarks and capture results
-for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000]:
+for amp in [20_000, 40_000, 200_000, 400_000]:
     for workload in ["NotInCache", "InCache"]:
         for join_type in ["Inner", "LeftOuter"]:
             # Construct the command
-            cmd = f'ARGS={amp},{workload},{join_type} cargo bench --features dhat-heap --bench stream_hash_join_mem'
+            cmd_mem = f'ARGS={amp},{workload},{join_type} cargo bench --features dhat-heap --bench stream_hash_join_mem'
+            cmd_rt = f'cargo criterion --message-format json --bench stream_hash_join_rt -- hash_join_rt_{amp}_{workload}_{join_type}'
+
+            s = ""
 
-            # Run the command and capture output
             try:
-                output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
+                # Run cmd_mem and capture output
+                output = subprocess.check_output(cmd_mem, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
 
                 # Extract total blocks and bytes
                 total_blocks_match = re.search(r'max_blocks:\s*(\d+)', output)
@@ -26,14 +29,31 @@ for amp in [20_000, 40_000, 200_000, 400_000]:
                 total_bytes_match = re.search(r'max_bytes:\s*(\d+)', output)
 
                 if total_blocks_match and total_bytes_match:
                     total_blocks = total_blocks_match.group(1)
                     total_bytes = total_bytes_match.group(1)
 
-                    # Print results immediately
-                    print(f"{amp},{workload},{join_type},{total_blocks},{total_bytes}")
+                    s+=f"{amp},{workload},{join_type},{total_blocks},{total_bytes}"
                 else:
                     print(f"No total_blocks or total_bytes found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
 
+                # Run cmd_rt and capture output
+                json_output = subprocess.check_output(cmd_rt, shell=True, universal_newlines=True)
+                json_output = json_output.split('\n')
+                try:
+                    time_ns = json.loads(json_output[0])["typical"]["estimate"]
+                except Exception as e:
+                    print(f"could not parse {json_output[0]} due to {e}")
+                    exit(1)
+                if time_ns:
+                    s+=f",{time_ns}"
+                else:
+                    print(f"No runtime found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
+
+                results.append(s)
+
             except subprocess.CalledProcessError as e:
                 print(f"Error running benchmark for Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
-                print(f"Error output: {e.output}", file=sys.stderr)
\ No newline at end of file
+                print(f"Error output: {e.output}", file=sys.stderr)
+
+for result in results:
+    print(result)
\ No newline at end of file

From 60f66a456593b0c157a587f0ecd5a1f00410d765 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 13 Dec 2024 14:42:44 +0800
Subject: [PATCH 10/13] docs

---
src/stream/benches/stream_hash_join_rt.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs index 0eb458c980534..418ecf17761f4 100644 --- a/src/stream/benches/stream_hash_join_rt.rs +++ b/src/stream/benches/stream_hash_join_rt.rs @@ -18,6 +18,11 @@ //! ```sh //! cargo bench --bench stream_hash_join_rt //! ``` +//! +//! Generate flamegraph: +//! ```sh +//! sudo cargo flamegraph --bench stream_hash_join_rt -- hash_join_rt_40000_InCache_Inner +//! ``` use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::executor::block_on; From 4a3afd2aca788451c3ab60207df90e4cb3e07faf Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sat, 14 Dec 2024 20:09:48 +0800 Subject: [PATCH 11/13] add barrier to synchronize --- src/stream/src/executor/test_utils.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index ddd9a45a5ebe1..d4cd228c0ca63 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -850,6 +850,9 @@ pub mod hash_join_executor { tx_r.push_chunk(chunk); } + tx_l.push_barrier(test_epoch(2), false); + tx_r.push_barrier(test_epoch(2), false); + // Push a chunk of records into tx_l, matches 100K records in the build side. let chunk_size = match hash_join_workload { HashJoinWorkload::InCache => 64, @@ -873,6 +876,15 @@ pub mod hash_join_executor { } } + match stream.next().await { + Some(Ok(Message::Barrier(b))) => { + assert_eq!(b.epoch.curr, test_epoch(2)); + } + other => { + panic!("Expected a barrier, got {:?}", other); + } + } + let expected_count = match join_type { JoinType::LeftOuter => chunk_size, JoinType::Inner => amp * chunk_size, From 4b1ba1134bee791a465dd77ddb32a02ef80531ab Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 27 Dec 2024 11:17:23 +0800 Subject: [PATCH 12/13] fmt --- src/stream/benches/stream_hash_join_mem.rs | 11 +++-- src/stream/benches/stream_hash_join_rt.rs | 6 ++- src/stream/src/executor/test_utils.rs | 54 ++++++++++++---------- 3 files changed, 39 insertions(+), 32 deletions(-) diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs index 50df173bfe7d0..b97dfc992a5ca 100644 --- a/src/stream/benches/stream_hash_join_mem.rs +++ b/src/stream/benches/stream_hash_join_mem.rs @@ -20,8 +20,8 @@ //! 
```
 
 use std::env;
-use risingwave_pb::plan_common::JoinType;
 
+use risingwave_pb::plan_common::JoinType;
 use risingwave_stream::executor::test_utils::hash_join_executor::*;
 
 risingwave_expr_impl::enable!();
@@ -33,10 +33,11 @@ static ALLOC: dhat::Alloc = dhat::Alloc;
 #[tokio::main]
 async fn main() {
     let arg = env::var("ARGS");
-    let (amp, workload, join_type) = if let Ok(raw_arg) = arg
-    {
+    let (amp, workload, join_type) = if let Ok(raw_arg) = arg {
         let parts = raw_arg.split(',').collect::<Vec<_>>();
-        let amp = parts[0].parse::<usize>().expect(format!("invalid amplification_size: {}", parts[0]).as_str());
+        let amp = parts[0]
+            .parse::<usize>()
+            .expect(format!("invalid amplification_size: {}", parts[0]).as_str());
         let workload = match parts[1] {
             "NotInCache" => HashJoinWorkload::NotInCache,
             "InCache" => HashJoinWorkload::InCache,
@@ -58,7 +59,7 @@ async fn main() {
         let _profiler = dhat::Profiler::new_heap();
 
         handle_streams(workload, join_type, amp, tx_l, tx_r, out).await;
-        let stats= dhat::HeapStats::get();
+        let stats = dhat::HeapStats::get();
         println!("max_blocks: {}", stats.max_blocks);
         println!("max_bytes: {}", stats.max_bytes);
     }

diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs
index 418ecf17761f4..3ec107294c2ec 100644
--- a/src/stream/benches/stream_hash_join_rt.rs
+++ b/src/stream/benches/stream_hash_join_rt.rs
@@ -26,9 +26,9 @@
 
 use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
 use futures::executor::block_on;
+use risingwave_pb::plan_common::JoinType;
 use risingwave_stream::executor::test_utils::hash_join_executor::*;
 use tokio::runtime::Runtime;
-use risingwave_pb::plan_common::JoinType;
 
 risingwave_expr_impl::enable!();
 
@@ -44,9 +44,11 @@ fn bench_hash_join(c: &mut Criterion) {
                 let name = format!("hash_join_rt_{}_{}_{:#?}", amp, workload, join_type);
                 group.bench_function(&name, |b| {
                     b.to_async(&rt).iter_batched(
                         || block_on(setup_bench_stream_hash_join(amp, workload, join_type)),
-                        |(tx_l, tx_r, out)| handle_streams(workload, join_type, amp, tx_l, tx_r, out),
+                        |(tx_l, tx_r, out)| {
+                            handle_streams(workload, join_type, amp, tx_l, tx_r, out)
+                        },
                         BatchSize::SmallInput,
                     )
                 });

diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs
index d4cd228c0ca63..256f0bbc6f620 100644
--- a/src/stream/src/executor/test_utils.rs
+++ b/src/stream/src/executor/test_utils.rs
@@ -630,13 +630,13 @@ pub mod hash_join_executor {
     use std::sync::Arc;
 
     use itertools::Itertools;
-    use strum_macros::Display;
     use risingwave_common::array::{I64Array, Op};
     use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
     use risingwave_common::hash::Key128;
     use risingwave_common::util::sort_util::OrderType;
     use risingwave_pb::plan_common::JoinType;
     use risingwave_storage::memory::MemoryStateStore;
+    use strum_macros::Display;
 
     use super::*;
     use crate::common::table::test_utils::gen_pbtable;
@@ -758,31 +758,36 @@ pub mod hash_join_executor {
 
         match join_type {
             JoinType::Inner => {
-                let executor = HashJoinExecutor::<Key128, MemoryStateStore, { ConstJoinType::Inner }>::new(
-                    ActorContext::for_test(123),
-                    info,
-                    source_l,
-                    source_r,
-                    params_l,
-                    params_r,
-                    vec![false], // null-safe
-                    (0..schema_len).collect_vec(),
-                    None, // condition, it is an eq join, we have no condition
-                    vec![], // ineq pairs
-                    lhs_state_table,
-                    lhs_degree_state_table,
-                    rhs_state_table,
-                    rhs_degree_state_table,
-                    Arc::new(AtomicU64::new(0)), // watermark epoch
-                    false, // is_append_only
-                    Arc::new(StreamingMetrics::unused()),
-                    1024, // chunk_size
-                    2048, // high_join_amplification_threshold
-                );
+                let executor =
+                    HashJoinExecutor::<Key128, MemoryStateStore, { ConstJoinType::Inner }>::new(
+                        ActorContext::for_test(123),
+                        info,
+                        source_l,
+                        source_r,
+                        params_l,
+                        params_r,
+                        vec![false], // null-safe
+                        (0..schema_len).collect_vec(),
+                        None, // condition, it is an eq join, we have no condition
+                        vec![], // ineq pairs
+                        lhs_state_table,
+                        lhs_degree_state_table,
+                        rhs_state_table,
+                        rhs_degree_state_table,
+                        Arc::new(AtomicU64::new(0)), // watermark epoch
+                        false, // is_append_only
+                        Arc::new(StreamingMetrics::unused()),
+                        1024, // chunk_size
+                        2048, // high_join_amplification_threshold
+                    );
                 (tx_l, tx_r, executor.boxed().execute())
             }
             JoinType::LeftOuter => {
-                let executor = HashJoinExecutor::<Key128, MemoryStateStore, { ConstJoinType::LeftOuter }>::new(
+                let executor = HashJoinExecutor::<
+                    Key128,
+                    MemoryStateStore,
+                    { ConstJoinType::LeftOuter },
+                >::new(
                     ActorContext::for_test(123),
                     info,
                     source_l,
@@ -807,12 +812,11 @@ pub mod hash_join_executor {
             }
             _ => panic!("Unsupported join type"),
         }
-
     }
 
     fn build_chunk(size: usize, join_key_value: i64) -> StreamChunk {
         // Create column [0]: join key. Each record has the same value, to trigger join amplification.
         let mut int64_jk_builder = DataType::Int64.create_array_builder(size);
         int64_jk_builder
-            .append_array(&I64Array::from_iter(vec![Some(join_key_value); size].into_iter()).into());
+            .append_array(&I64Array::from_iter(vec![Some(join_key_value); size]).into());
         let jk = int64_jk_builder.finish();

From f21aa13bff3353d5f832f7bf9d2fa5b7bc344494 Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Fri, 27 Dec 2024 14:33:59 +0800
Subject: [PATCH 13/13] Apply suggestions from code review

---
 src/stream/benches/stream_hash_join.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/src/stream/benches/stream_hash_join.py b/src/stream/benches/stream_hash_join.py
index 07deebb631211..a152416f71b47 100755
--- a/src/stream/benches/stream_hash_join.py
+++ b/src/stream/benches/stream_hash_join.py
@@ -1,3 +1,17 @@
 #!/usr/bin/env python3
+# Copyright 2024 RisingWave Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 import json
 # Executes full benchmark for stream_hash_join runtime and memory consumption