Skip to content

Commit

Permalink
Optimize event serialization with pre-allocated buffer (#2794)
Browse files Browse the repository at this point in the history
* Optimize event serialization with pre-allocated buffer

- Added event_buffer field to LlmpEventManager
- Used to_slice instead of to_allocvec
- Pre-allocated buffer size is 4KB

Fixes #1082

* Fallback to to_allocvec in case of event_buffer overflow

Also combined the shared logic between compressed & uncompressed event
firing while keeping the same behavior

* Made the initial event_buffer size to a const

Also removed the unnecessary event_buffer.clear(), since we are already
resizing it
  • Loading branch information
mzfr authored Dec 30, 2024
1 parent 9309518 commit 8cd069c
Showing 1 changed file with 45 additions and 24 deletions.
69 changes: 45 additions & 24 deletions libafl/src/events/llmp/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ use crate::{
Error, HasMetadata,
};

/// Default initial capacity of the event buffer - 4KB
const INITIAL_EVENT_BUFFER_SIZE: usize = 1024 * 4;

/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp,
/// using low-level message passing, `llmp`.
pub struct LlmpEventManager<EMH, S, SP>
Expand Down Expand Up @@ -75,6 +78,7 @@ where
should_serialize_cnt: usize,
pub(crate) time_ref: Option<Handle<TimeObserver>>,
phantom: PhantomData<S>,
event_buffer: Vec<u8>,
}

impl LlmpEventManager<(), NopState<NopInput>, NopShMemProvider> {
Expand Down Expand Up @@ -165,6 +169,7 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
time_ref,
phantom: PhantomData,
custom_buf_handlers: vec![],
event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE),
})
}

Expand Down Expand Up @@ -199,6 +204,7 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
time_ref,
phantom: PhantomData,
custom_buf_handlers: vec![],
event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE),
})
}

Expand Down Expand Up @@ -233,6 +239,7 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
time_ref,
phantom: PhantomData,
custom_buf_handlers: vec![],
event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE),
})
}

Expand Down Expand Up @@ -265,6 +272,7 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
time_ref,
phantom: PhantomData,
custom_buf_handlers: vec![],
event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE),
})
}
}
Expand Down Expand Up @@ -409,6 +417,7 @@ where
+ EvaluatorObservers<E, Self, <S::Corpus as Corpus>::Input, S>
+ Evaluator<E, Self, <S::Corpus as Corpus>::Input, S>,
{
println!("Got event in client: {} from {:?}", event.name(), client_id);
if !self.hooks.pre_exec_all(state, client_id, &event)? {
return Ok(());
}
Expand Down Expand Up @@ -512,44 +521,56 @@ where
true
}
}

#[cfg(feature = "llmp_compression")]
fn fire(
&mut self,
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
#[cfg(feature = "llmp_compression")]
let flags = LLMP_FLAG_INITIALIZED;

match self.compressor.maybe_compress(&serialized) {
Some(comp_buf) => {
self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH,
flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?;
self.event_buffer.resize(self.event_buffer.capacity(), 0);

// Serialize the event, reallocating event_buffer if needed
let written_len = match postcard::to_slice(&event, &mut self.event_buffer) {
Ok(written) => written.len(),
Err(postcard::Error::SerializeBufferFull) => {
let serialized = postcard::to_allocvec(&event)?;
self.event_buffer = serialized;
self.event_buffer.len()
}
None => {
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
Err(e) => return Err(Error::from(e)),
};

#[cfg(feature = "llmp_compression")]
{
match self
.compressor
.maybe_compress(&self.event_buffer[..written_len])
{
Some(comp_buf) => {
self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH,
flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?;
}
None => {
self.llmp
.send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?;
}
}
}
self.last_sent = current_time();

Ok(())
}
#[cfg(not(feature = "llmp_compression"))]
{
self.llmp
.send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len]);
}

#[cfg(not(feature = "llmp_compression"))]
fn fire(
&mut self,
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
self.last_sent = current_time();
Ok(())
}

fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
where
OT: ObserversTuple<Self::Input, Self::State> + Serialize,
Expand Down

0 comments on commit 8cd069c

Please sign in to comment.