-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(memory): use thread-local sequence-based memory eviction policy #16087
Merged
Merged
Changes from 3 commits
Commits
Show all changes
31 commits
Select commit
Hold shift + click to select a range
9eb816a
perf(memory): use thread-local squence-based memory eviction policy
MrCroxx d998a13
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx b87e911
test(bench): add sequencer benchmark
MrCroxx f9678c3
fix: fix license header
MrCroxx 3959a16
fix: do not init sequence when insert lru
MrCroxx 1ba5fd6
perf: add lru bench
MrCroxx 6483b58
fix: clear lru cache after drop
MrCroxx 7a9a7c8
refactor: simplify clear
MrCroxx f76da99
fix: drop inited field when clear
MrCroxx f12c141
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx 8eccfee
refactor: update metrics in rw
MrCroxx 594111d
chore: update grafana
MrCroxx dc0737a
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx 5d98779
refactor: make sequencer args configurable
MrCroxx f77d980
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx dce4999
chore: tiny refactors
MrCroxx 7ebc5d3
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx 2339eee
chore: make clippy happier
MrCroxx ebc27aa
fix: enable unstabl feature
MrCroxx d19b1ae
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx 8bc7ee1
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx 8e6140c
Merge branch 'main' into xx/thread-local-sequence
MrCroxx dc43b6a
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx 6ae11d5
chore: fill rust docs for Sequencer
MrCroxx c934b8e
chore: refine docs for controller
MrCroxx 3dfd2d6
fix: fix bench build
MrCroxx 207df01
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx 7b68659
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx 93858bb
fix: resolve grafana build
MrCroxx 3d5917c
refactor: remove `update_epoch`
MrCroxx a46061b
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
use std::cell::RefCell; | ||
MrCroxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
use std::hint::black_box; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
use std::sync::Arc; | ||
use std::time::{Duration, Instant}; | ||
|
||
use itertools::Itertools; | ||
use risingwave_common::sequence::*; | ||
|
||
thread_local! { | ||
pub static SEQUENCER_64_8: RefCell<Sequencer> = RefCell::new(Sequencer::new(64, 64 * 8)); | ||
pub static SEQUENCER_64_16: RefCell<Sequencer> = RefCell::new(Sequencer::new(64, 64 * 16)); | ||
pub static SEQUENCER_64_32: RefCell<Sequencer> = RefCell::new(Sequencer::new(64, 64 * 32)); | ||
pub static SEQUENCER_128_8: RefCell<Sequencer> = RefCell::new(Sequencer::new(128, 128 * 8)); | ||
pub static SEQUENCER_128_16: RefCell<Sequencer> = RefCell::new(Sequencer::new(128, 128 * 16)); | ||
pub static SEQUENCER_128_32: RefCell<Sequencer> = RefCell::new(Sequencer::new(128, 128 * 32)); | ||
} | ||
|
||
fn coarse(loops: usize) -> Duration { | ||
let now = Instant::now(); | ||
for _ in 0..loops { | ||
let _ = coarsetime::Instant::now(); | ||
} | ||
now.elapsed() | ||
} | ||
|
||
fn primitive(loops: usize) -> Duration { | ||
let mut cnt = 0usize; | ||
let now = Instant::now(); | ||
for _ in 0..loops { | ||
cnt += 1; | ||
let _ = cnt; | ||
} | ||
now.elapsed() | ||
} | ||
|
||
fn atomic(loops: usize, atomic: Arc<AtomicUsize>) -> Duration { | ||
let now = Instant::now(); | ||
for _ in 0..loops { | ||
let _ = atomic.fetch_add(1, Ordering::Relaxed); | ||
} | ||
now.elapsed() | ||
} | ||
|
||
fn atomic_skip(loops: usize, atomic: Arc<AtomicUsize>, skip: usize) -> Duration { | ||
let mut cnt = 0usize; | ||
let now = Instant::now(); | ||
for _ in 0..loops { | ||
cnt += 1; | ||
let _ = cnt; | ||
if cnt % skip == 0 { | ||
let _ = atomic.fetch_add(skip, Ordering::Relaxed); | ||
} else { | ||
let _ = atomic.load(Ordering::Relaxed); | ||
} | ||
} | ||
now.elapsed() | ||
} | ||
|
||
fn sequencer(loops: usize, step: Sequence, lag_amp: Sequence) -> Duration { | ||
let sequencer = match (step, lag_amp) { | ||
(64, 8) => &SEQUENCER_64_8, | ||
(64, 16) => &SEQUENCER_64_16, | ||
(64, 32) => &SEQUENCER_64_32, | ||
(128, 8) => &SEQUENCER_128_8, | ||
(128, 16) => &SEQUENCER_128_16, | ||
(128, 32) => &SEQUENCER_128_32, | ||
_ => unimplemented!(), | ||
}; | ||
let now = Instant::now(); | ||
for _ in 0..loops { | ||
let _ = sequencer.with(|s| s.borrow_mut().inc()); | ||
} | ||
now.elapsed() | ||
} | ||
|
||
fn benchmark<F>(name: &str, threads: usize, loops: usize, f: F) | ||
where | ||
F: Fn() -> Duration + Clone + Send + 'static, | ||
{ | ||
let handles = (0..threads) | ||
.map(|_| std::thread::spawn(black_box(f.clone()))) | ||
.collect_vec(); | ||
let mut dur = Duration::from_nanos(0); | ||
for handle in handles { | ||
dur += handle.join().unwrap(); | ||
} | ||
println!( | ||
"{:20} {} threads {} loops: {:?} per iter", | ||
name, | ||
threads, | ||
loops, | ||
Duration::from_nanos((dur.as_nanos() / threads as u128 / loops as u128) as u64) | ||
); | ||
} | ||
|
||
fn main() { | ||
for (threads, loops) in [ | ||
(1, 10_000_000), | ||
(4, 10_000_000), | ||
(8, 10_000_000), | ||
(16, 10_000_000), | ||
(32, 10_000_000), | ||
] { | ||
println!(); | ||
|
||
benchmark("primitive", threads, loops, move || primitive(loops)); | ||
|
||
let a = Arc::new(AtomicUsize::new(0)); | ||
benchmark("atomic", threads, loops, move || atomic(loops, a.clone())); | ||
|
||
let a = Arc::new(AtomicUsize::new(0)); | ||
benchmark("atomic skip 8", threads, loops, move || { | ||
atomic_skip(loops, a.clone(), 8) | ||
}); | ||
|
||
let a = Arc::new(AtomicUsize::new(0)); | ||
benchmark("atomic skip 16", threads, loops, move || { | ||
atomic_skip(loops, a.clone(), 16) | ||
}); | ||
|
||
let a = Arc::new(AtomicUsize::new(0)); | ||
benchmark("atomic skip 32", threads, loops, move || { | ||
atomic_skip(loops, a.clone(), 32) | ||
}); | ||
|
||
let a = Arc::new(AtomicUsize::new(0)); | ||
benchmark("atomic skip 64", threads, loops, move || { | ||
atomic_skip(loops, a.clone(), 64) | ||
}); | ||
|
||
benchmark("sequencer(64,8)", threads, loops, move || { | ||
sequencer(loops, 64, 8) | ||
}); | ||
benchmark("sequencer(64,16)", threads, loops, move || { | ||
sequencer(loops, 64, 16) | ||
}); | ||
benchmark("sequencer(64,32)", threads, loops, move || { | ||
sequencer(loops, 64, 32) | ||
}); | ||
benchmark("sequencer(128,8)", threads, loops, move || { | ||
sequencer(loops, 128, 8) | ||
}); | ||
benchmark("sequencer(128,16)", threads, loops, move || { | ||
sequencer(loops, 128, 16) | ||
}); | ||
benchmark("sequencer(128,32)", threads, loops, move || { | ||
sequencer(loops, 128, 32) | ||
}); | ||
|
||
benchmark("coarse", threads, loops, move || coarse(loops)); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I heard that some stateless queries in NexMark were negatively affected by this PR for some "unknown" cause. Have we found the reason now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still not. But the regression hasn't appear these weeks.