Skip to content

Commit

Permalink
perf(stream): add hash join memory benchmarking for cache refill (#19712
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kwannoel authored Dec 10, 2024
1 parent 5585f51 commit bd82fe3
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,5 @@ e2e_test/iceberg/metastore_db

# mdbook
book

dhat-heap.json
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ pub struct Field {
pub type_name: String,
}

impl Field {
    /// Builds a field from a name and a data type.
    ///
    /// The resulting field has no sub-fields and an empty `type_name`.
    pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
        // Convert the name up front so the struct literal below is plain data.
        let name: String = name.into();
        Self {
            data_type,
            name,
            sub_fields: Vec::new(),
            type_name: String::new(),
        }
    }
}

impl std::fmt::Debug for Field {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{:?}", self.name, self.data_type)
Expand Down
12 changes: 12 additions & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1"
criterion = { workspace = true, features = ["async_tokio", "async"] }
dhat = "0.3"
expect-test = "1"
risingwave_expr_impl = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
Expand All @@ -99,6 +100,9 @@ risingwave_hummock_test = { path = "../storage/hummock_test", features = [
serde_yaml = "0.9"
tracing-test = "0.2"

[features]
dhat-heap = []

[[bench]]
name = "stream_hash_agg"
harness = false
Expand All @@ -107,5 +111,13 @@ harness = false
name = "bench_state_table"
harness = false

[[bench]]
name = "stream_hash_join_rt"
harness = false

[[bench]]
name = "stream_hash_join_mem"
harness = false

[lints]
workspace = true
55 changes: 55 additions & 0 deletions src/stream/benches/stream_hash_join_mem.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

//! To run this benchmark you can use the following command:
//! ```sh
//! cargo bench --features dhat-heap --bench stream_hash_join_mem
//! ```
//!
//! You may also specify the amplification size, e.g. 40000
//! ```sh
//! cargo bench --features dhat-heap --bench stream_hash_join_mem -- 40000
//! ```
use std::env;

use risingwave_stream::executor::test_utils::hash_join_executor::*;

risingwave_expr_impl::enable!();

// Install dhat's allocator as the global allocator so that heap allocations
// made by this benchmark are visible to the dhat heap profiler.
// Only compiled in when the `dhat-heap` feature is enabled.
#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

/// Entry point of the hash join memory benchmark.
///
/// The first command-line argument, when it parses as a number, is used as the
/// amplification size; otherwise the default of `100_000` is used.
#[tokio::main]
async fn main() {
    let cli_args: Vec<String> = env::args().collect();
    // A missing or unparsable first argument silently selects the default.
    let amp = cli_args
        .get(1)
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(100_000);

    let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp).await;

    // The profiler is created only after setup has finished, i.e. after the data
    // for the hash join build-side has been ingested, so that setup work is not
    // part of the recorded heap profile.
    #[cfg(feature = "dhat-heap")]
    let _profiler = dhat::Profiler::new_heap();

    handle_streams(amp, tx_l, tx_r, out).await;
}
47 changes: 47 additions & 0 deletions src/stream/benches/stream_hash_join_rt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

//! To run this benchmark you can use the following command:
//! ```sh
//! cargo bench --bench stream_hash_join_rt
//! ```
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::executor::block_on;
use risingwave_stream::executor::test_utils::hash_join_executor::*;
use tokio::runtime::Runtime;

risingwave_expr_impl::enable!();

/// Benchmarks the runtime of the streaming hash join for several amplification sizes.
///
/// Each size is registered as `hash_join_rt_<amp>` inside the
/// `benchmark_hash_join` group. The group uses a sample size of 10.
fn bench_hash_join(c: &mut Criterion) {
    let mut group = c.benchmark_group("benchmark_hash_join");
    group.sample_size(10);

    let rt = Runtime::new().unwrap();
    for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000] {
        let bench_name = format!("hash_join_rt_{amp}");
        group.bench_function(&bench_name, |b| {
            b.to_async(&rt).iter_batched(
                // The setup closure is synchronous, so the async setup routine is
                // driven to completion with `block_on`; it is not part of the
                // timed routine.
                || block_on(setup_bench_stream_hash_join(amp)),
                // Timed routine: push the streams through the join.
                |(tx_l, tx_r, out)| handle_streams(amp, tx_l, tx_r, out),
                BatchSize::SmallInput,
            )
        });
    }

    // Finish the group explicitly instead of relying on drop, as Criterion recommends.
    group.finish();
}

criterion_group!(benches, bench_hash_join);
criterion_main!(benches);
Loading

0 comments on commit bd82fe3

Please sign in to comment.