Skip to content

Commit

Permalink
feat(common): bytes::Buf wrapper that notifies subscribers on EOS
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkarw committed Aug 19, 2022
1 parent 9214294 commit d61db23
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
65 changes: 65 additions & 0 deletions src/common/buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use hyper::body::Buf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

#[derive(Clone)]
pub struct EosSignaler {
notifier: Arc<Notify>,
}

impl EosSignaler {
fn notify_eos(&self) {
self.notifier.notify_waiters();
}

pub async fn wait_till_eos(self) {
self.notifier.notified().await;
}
}

pub struct AlertOnEos<B> {
inner: B,
signaler: EosSignaler,
// I'd rather we consumed the signaler, but it would require something like AtomicOption,
// arc_swap::ArcSwapOption was tried, but it only returns an Arc, and the value cannot be consumed (swapped).
// One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/),
// but it requires both unsafe and additional heap allocation, which is not worth it.
has_already_signaled: AtomicBool,
}

impl<B> AlertOnEos<B> {
pub fn new(inner: B) -> (Self, EosSignaler) {
let signal = EosSignaler {
notifier: Arc::new(Notify::new()),
};
let this = Self {
inner,
signaler: signal.clone(),
has_already_signaled: AtomicBool::new(false),
};
(this, signal)
}
}

impl<B: Buf> Buf for AlertOnEos<B> {
fn remaining(&self) -> usize {
self.inner.remaining()
}

fn chunk(&self) -> &[u8] {
self.inner.chunk()
}

fn advance(&mut self, cnt: usize) {
self.inner.advance(cnt);
if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) {
self.signaler.notify_eos();
}
}
}

#[cfg(test)]
mod tests {

}
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ macro_rules! ready {
}

pub(crate) use ready;
pub mod buf;
pub(crate) mod exec;

0 comments on commit d61db23

Please sign in to comment.