diff --git a/src/common/buf.rs b/src/common/buf.rs new file mode 100644 index 0000000..3b42682 --- /dev/null +++ b/src/common/buf.rs @@ -0,0 +1,65 @@ +use hyper::body::Buf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; + +#[derive(Clone)] +pub struct EosSignaler { + notifier: Arc, +} + +impl EosSignaler { + fn notify_eos(&self) { + self.notifier.notify_waiters(); + } + + pub async fn wait_till_eos(self) { + self.notifier.notified().await; + } +} + +pub struct AlertOnEos { + inner: B, + signaler: EosSignaler, + // I'd rather we consumed the signaler, but it would require something like AtomicOption, + // arc_swap::ArcSwapOption was tried, but it only returns an Arc, and the value cannot be consumed (swapped). + // One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/), + // but it requires both unsafe and additional heap allocation, which is not worth it. + has_already_signaled: AtomicBool, +} + +impl AlertOnEos { + pub fn new(inner: B) -> (Self, EosSignaler) { + let signal = EosSignaler { + notifier: Arc::new(Notify::new()), + }; + let this = Self { + inner, + signaler: signal.clone(), + has_already_signaled: AtomicBool::new(false), + }; + (this, signal) + } +} + +impl Buf for AlertOnEos { + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt); + if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) { + self.signaler.notify_eos(); + } + } +} + +#[cfg(test)] +mod tests { + +} \ No newline at end of file diff --git a/src/common/mod.rs b/src/common/mod.rs index 294040e..b8acb98 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -10,4 +10,5 @@ macro_rules! ready { } pub(crate) use ready; +pub mod buf; pub(crate) mod exec;