Skip to content

Commit

Permalink
File stats wrapped in result
Browse files Browse the repository at this point in the history
Instead of printing the I/O error within the crate, encapsulate the file
stats with a result that forwards the error. This avoids printing errors
for clients where it's expected the stats cannot be collected.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 5, 2024
1 parent 1508062 commit c9530d7
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 96 deletions.
23 changes: 14 additions & 9 deletions examples/disk_usage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Example that shows the disk usage for lgalloc.
fn main() {
let mut stats = lgalloc::LgAllocStats::default();
let buffer_size = 32 << 20;

let buffers = 32;
Expand All @@ -14,7 +14,7 @@ fn main() {
let mut regions: Vec<_> = (0..32)
.map(|_| lgalloc::allocate::<u8>(32 << 20).unwrap())
.collect();
print_stats(&mut stats);
print_stats();

for (ptr, cap, _handle) in &regions {
println!("Setting region at {ptr:?}...");
Expand All @@ -23,14 +23,14 @@ fn main() {
*i = 1;
}
}
print_stats(&mut stats);
print_stats();

let mut s = String::new();
let stdin = std::io::stdin();

println!("Enter to continue");
stdin.read_line(&mut s).unwrap();
print_stats(&mut stats);
print_stats();

println!("Dropping regions");
for (_ptr, _cap, handle) in regions.drain(..) {
Expand All @@ -39,13 +39,18 @@ fn main() {

println!("Enter to continue");
stdin.read_line(&mut s).unwrap();
print_stats(&mut stats);
print_stats();
}

fn print_stats(stats: &mut lgalloc::LgAllocStats) {
lgalloc::lgalloc_stats(stats);
fn print_stats() {
let stats = lgalloc::lgalloc_stats();

for file_stat in &stats.file_stats {
println!("{file_stat:?}");
match &stats.file_stats {
Ok(file_stats) => {
for file_stat in file_stats {
println!("{file_stat:?}");
}
}
Err(err) => eprintln!("Failed to get file stats: {err}"),
}
}
187 changes: 100 additions & 87 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use std::thread::{spawn, JoinHandle, ThreadId};
use std::thread::{JoinHandle, ThreadId};
use std::time::{Duration, Instant};

use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use memmap2::MmapMut;
use numa_maps::NumaMap;
use thiserror::Error;

mod readme {
Expand Down Expand Up @@ -319,7 +320,7 @@ impl ThreadLocalStealer {
///
/// Returns [`AllocError::Disabled`] if lgalloc is not enabled. Returns other error types
/// if out of memory, or an internal operation fails.
fn allocate(&mut self, size_class: SizeClass) -> Result<Handle, AllocError> {
fn allocate(&self, size_class: SizeClass) -> Result<Handle, AllocError> {
if !LGALLOC_ENABLED.load(Ordering::Relaxed) {
return Err(AllocError::Disabled);
}
Expand Down Expand Up @@ -559,8 +560,8 @@ impl Drop for LocalSizeClass {
}

/// Access the per-thread context.
fn thread_context<R, F: FnOnce(&mut ThreadLocalStealer) -> R>(f: F) -> R {
WORKER.with(|cell| f(&mut cell.borrow_mut()))
fn thread_context<R, F: FnOnce(&ThreadLocalStealer) -> R>(f: F) -> R {
WORKER.with(|cell| f(&cell.borrow()))
}

/// Allocate a memory area suitable to hold `capacity` consecutive elements of `T`.
Expand Down Expand Up @@ -685,7 +686,7 @@ impl BackgroundWorker {
size_class_state
.alloc_stats
.clear_slow
.fetch_add(count as u64, Ordering::Relaxed);
.fetch_add(count.try_into().expect("must fit"), Ordering::Relaxed);
}
}

Expand Down Expand Up @@ -760,7 +761,7 @@ pub fn lgalloc_set_config(config: &LgAlloc) {
if let Some(config) = config {
let (sender, receiver) = std::sync::mpsc::channel();
let mut worker = BackgroundWorker::new(receiver);
let join_handle = spawn(move || worker.run());
let join_handle = std::thread::spawn(move || worker.run());
sender.send(config).expect("Receiver exists");
*lock = Some((join_handle, sender));
}
Expand Down Expand Up @@ -834,26 +835,14 @@ impl LgAlloc {
/// # Panics
///
/// Panics if the internal state of lgalloc is corrupted.
pub fn lgalloc_stats(stats: &mut LgAllocStats) {
stats.size_class.clear();
stats.file_stats.clear();

// `numa_maps` only exists on Linux, don't spam errors on other OSs.
#[cfg(target_os = "linux")]
let mut numa_map = numa_maps::NumaMap::from_file("/proc/self/numa_maps")
.map_err(|e| eprintln!("Failed to parse numa_maps: {e}"))
.unwrap_or_default();
#[cfg(not(target_os = "linux"))]
let mut numa_map = numa_maps::NumaMap::default();

// Normalize numa_maps, and sort by address.
for entry in &mut numa_map.ranges {
entry.normalize();
}
numa_map.ranges.sort();
pub fn lgalloc_stats() -> LgAllocStats {
let mut numa_map = NumaMap::from_file("/proc/self/numa_maps");

let global = GlobalStealer::get_static();

let mut size_classes = Vec::with_capacity(global.size_classes.len());
let mut file_stats = Vec::new();

for (index, size_class_state) in global.size_classes.iter().enumerate() {
let size_class = SizeClass::from_index(index);

Expand Down Expand Up @@ -897,7 +886,7 @@ pub fn lgalloc_stats(stats: &mut LgAllocStats) {
clear_eager_total += global_stats.clear_eager.load(Ordering::Relaxed);
clear_slow_total += global_stats.clear_slow.load(Ordering::Relaxed);

stats.size_class.push(SizeClassStats {
size_classes.push(SizeClassStats {
size_class,
areas,
area_total_bytes,
Expand All @@ -913,70 +902,99 @@ pub fn lgalloc_stats(stats: &mut LgAllocStats) {
clear_slow_total,
});

for (file, mmap) in areas_lock.iter().map(ManuallyDrop::deref) {
let (mapped, active, dirty) = {
let base = mmap.as_ptr().cast::<()>() as usize;
let range = match numa_map
.ranges
.binary_search_by(|range| range.address.cmp(&base))
{
Ok(pos) => Some(&numa_map.ranges[pos]),
// `numa_maps` only updates periodically, so we might be missing some
// expected ranges.
Err(_pos) => None,
};

let mut mapped = 0;
let mut active = 0;
let mut dirty = 0;
for property in range.iter().flat_map(|e| e.properties.iter()) {
match property {
numa_maps::Property::Dirty(d) => dirty = *d,
numa_maps::Property::Mapped(m) => mapped = *m,
numa_maps::Property::Active(a) => active = *a,
_ => {}
}
}
(mapped, active, dirty)
};
if let Ok(numa_map) = numa_map.as_mut() {
let areas = &areas_lock[..];
file_stats.extend(extract_file_stats(
size_class,
numa_map,
areas.iter().map(Deref::deref),
));
}
}

let mut stat: MaybeUninit<libc::stat> = MaybeUninit::uninit();
// SAFETY: File descriptor valid, stat object valid.
let ret = unsafe { libc::fstat(file.as_raw_fd(), stat.as_mut_ptr()) };
let stat = if ret == -1 {
None
} else {
// SAFETY: `stat` is initialized in the fstat non-error case.
Some(unsafe { stat.assume_init_ref() })
LgAllocStats {
file_stats: match numa_map {
Ok(_) => Ok(file_stats),
Err(err) => Err(err),
},
size_class: size_classes,
}
}

/// Extract for a size class area stats.
fn extract_file_stats<'a>(
size_class: usize,
numa_map: &'a mut NumaMap,
areas: impl IntoIterator<Item = &'a (File, MmapMut)> + 'a,
) -> impl Iterator<Item = FileStats> + 'a {
// Normalize numa_maps, and sort by address.
for entry in &mut numa_map.ranges {
entry.normalize();
}
numa_map.ranges.sort();

areas.into_iter().map(move |(file, mmap)| {
let (mapped, active, dirty) = {
let base = mmap.as_ptr().cast::<()>() as usize;
let range = match numa_map
.ranges
.binary_search_by(|range| range.address.cmp(&base))
{
Ok(pos) => Some(&numa_map.ranges[pos]),
// `numa_maps` only updates periodically, so we might be missing some
// expected ranges.
Err(_pos) => None,
};

let (blocks, file_size) = stat.map_or((0, 0), |stat| {
(
stat.st_blocks.try_into().unwrap_or(0),
stat.st_size.try_into().unwrap_or(0),
)
});
let mut mapped = 0;
let mut active = 0;
let mut dirty = 0;
for property in range.iter().flat_map(|e| e.properties.iter()) {
match property {
numa_maps::Property::Dirty(d) => dirty = *d,
numa_maps::Property::Mapped(m) => mapped = *m,
numa_maps::Property::Active(a) => active = *a,
_ => {}
}
}
(mapped, active, dirty)
};

stats.file_stats.push(FileStats {
size_class,
file_size,
// Documented as multiples of 512
allocated_size: blocks * 512,
mapped,
active,
dirty,
});
let mut stat: MaybeUninit<libc::stat> = MaybeUninit::uninit();
// SAFETY: File descriptor valid, stat object valid.
let ret = unsafe { libc::fstat(file.as_raw_fd(), stat.as_mut_ptr()) };
let stat = if ret == -1 {
None
} else {
// SAFETY: `stat` is initialized in the fstat non-error case.
Some(unsafe { stat.assume_init_ref() })
};

let (blocks, file_size) = stat.map_or((0, 0), |stat| {
(
stat.st_blocks.try_into().unwrap_or(0),
stat.st_size.try_into().unwrap_or(0),
)
});
FileStats {
size_class,
file_size,
// Documented as multiples of 512
allocated_size: blocks * 512,
mapped,
active,
dirty,
}
}
})
}

/// Statistics about lgalloc's internal behavior.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct LgAllocStats {
/// Per size-class statistics.
pub size_class: Vec<SizeClassStats>,
/// Per size-class and backing file statistics.
pub file_stats: Vec<FileStats>,
pub file_stats: Result<Vec<FileStats>, std::io::Error>,
}

/// Statistics per size class.
Expand Down Expand Up @@ -1037,13 +1055,10 @@ mod test {

use serial_test::serial;

use crate::{
allocate, deallocate, lgalloc_stats, AllocError, BackgroundWorkerConfig, Handle, LgAlloc,
LgAllocStats,
};
use super::*;

fn initialize() {
crate::lgalloc_set_config(
lgalloc_set_config(
LgAlloc::new()
.enable()
.with_background_config(BackgroundWorkerConfig {
Expand Down Expand Up @@ -1181,16 +1196,15 @@ mod test {
handle.join().unwrap();
}
std::thread::sleep(Duration::from_secs(1));
let mut stats = Default::default();
crate::lgalloc_stats(&mut stats);
let stats = lgalloc_stats();
println!("stats: {stats:?}");
Ok(())
}

#[test]
#[serial]
fn leak() -> Result<(), AllocError> {
crate::lgalloc_set_config(&crate::LgAlloc {
lgalloc_set_config(&LgAlloc {
enabled: Some(true),
path: Some(std::env::temp_dir()),
background_config: None,
Expand Down Expand Up @@ -1231,8 +1245,7 @@ mod test {
let (_ptr, _cap, handle) = allocate::<usize>(1024)?;
deallocate(handle);

let mut stats = LgAllocStats::default();
lgalloc_stats(&mut stats);
let stats = lgalloc_stats();

assert!(!stats.size_class.is_empty());

Expand All @@ -1242,7 +1255,7 @@ mod test {
#[test]
#[serial]
fn test_disable() {
crate::lgalloc_set_config(&*LgAlloc::new().disable());
lgalloc_set_config(&*LgAlloc::new().disable());
assert!(matches!(allocate::<u8>(1024), Err(AllocError::Disabled)));
}
}

0 comments on commit c9530d7

Please sign in to comment.