Skip to content

Commit

Permalink
feat(futures-bounded): add support for streams
Browse files Browse the repository at this point in the history
This is the next logical extension to the `futures-bounded` crate by adding support for streams. For the moment, this isn't used in `rust-libp2p` but given that it is a general-purpose crate, putting this code here makes sense.

Related: firezone/firezone#2279.

Pull-Request: #4616.
  • Loading branch information
thomaseizinger authored Oct 25, 2023
1 parent 7bbca11 commit 8b3d4e4
Show file tree
Hide file tree
Showing 10 changed files with 427 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ resolver = "2"
rust-version = "1.73.0"

[workspace.dependencies]
futures-bounded = { version = "0.1.0", path = "misc/futures-bounded" }
futures-bounded = { version = "0.2.0", path = "misc/futures-bounded" }
libp2p = { version = "0.53.0", path = "libp2p" }
libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" }
Expand Down
5 changes: 5 additions & 0 deletions misc/futures-bounded/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.2.0

- Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`.
See [PR 4616](https://github.com/libp2p/rust-lib2pp/pulls/4616).

## 0.1.0

Initial release.
2 changes: 1 addition & 1 deletion misc/futures-bounded/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-bounded"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
rust-version.workspace = true
license = "MIT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures_util::future::BoxFuture;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};

use crate::Timeout;
use crate::{PushError, Timeout};

/// Represents a map of [`Future`]s.
///
Expand All @@ -23,15 +23,6 @@ pub struct FuturesMap<ID, O> {
full_waker: Option<Waker>,
}

/// Error of a future pushing
#[derive(PartialEq, Debug)]
pub enum PushError<F> {
/// The length of the set is equal to the capacity
BeyondCapacity(F),
/// The set already contains the given future's ID
ReplacedFuture(F),
}

impl<ID, O> FuturesMap<ID, O> {
pub fn new(timeout: Duration, capacity: usize) -> Self {
Self {
Expand All @@ -54,7 +45,7 @@ where
/// If the length of the map is equal to the capacity, this method returns [PushError::BeyondCapacity],
/// that contains the passed future. In that case, the future is not inserted to the map.
/// If a future with the given `future_id` already exists, then the old future will be replaced by a new one.
/// In that case, the returned error [PushError::ReplacedFuture] contains the old future.
/// In that case, the returned error [PushError::Replaced] contains the old future.
pub fn try_push<F>(&mut self, future_id: ID, future: F) -> Result<(), PushError<BoxFuture<O>>>
where
F: Future<Output = O> + Send + 'static,
Expand Down Expand Up @@ -88,7 +79,7 @@ where
},
);

Err(PushError::ReplacedFuture(old_future.inner))
Err(PushError::Replaced(old_future.inner))
}
}
}
Expand Down Expand Up @@ -187,7 +178,7 @@ mod tests {
assert!(futures.try_push("ID", ready(())).is_ok());
matches!(
futures.try_push("ID", ready(())),
Err(PushError::ReplacedFuture(_))
Err(PushError::Replaced(_))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<O> FuturesSet<O> {
match self.inner.try_push(self.id, future) {
Ok(()) => Ok(()),
Err(PushError::BeyondCapacity(w)) => Err(w),
Err(PushError::ReplacedFuture(_)) => unreachable!("we never reuse IDs"),
Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"),
}
}

Expand Down
24 changes: 20 additions & 4 deletions misc/futures-bounded/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
mod map;
mod set;
mod futures_map;
mod futures_set;
mod stream_map;
mod stream_set;

pub use futures_map::FuturesMap;
pub use futures_set::FuturesSet;
pub use stream_map::StreamMap;
pub use stream_set::StreamSet;

pub use map::{FuturesMap, PushError};
pub use set::FuturesSet;
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;
Expand All @@ -25,4 +30,15 @@ impl fmt::Display for Timeout {
}
}

/// Error of a future pushing
#[derive(PartialEq, Debug)]
pub enum PushError<T> {
/// The length of the set is equal to the capacity
BeyondCapacity(T),
/// The map already contained an item with this key.
///
/// The old item is returned.
Replaced(T),
}

impl std::error::Error for Timeout {}
Loading

0 comments on commit 8b3d4e4

Please sign in to comment.