Skip to content
This repository has been archived by the owner on Apr 16, 2020. It is now read-only.

Introduce client::Background #46

Merged
merged 5 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/client/background.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::body::LiftBody;
use futures::{Future, Poll};
use http_body::Body as HttpBody;
use hyper::client::conn::Connection as HyperConnection;
use log::error;
use std::fmt::{self, Debug};
use tokio_io::{AsyncRead, AsyncWrite};

/// Background task for a client connection.
///
/// This type is not used directly by a user of this library,
/// but it can show up in trait bounds of generic types.
pub struct Background<T, B>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<crate::Error>,
{
connection: HyperConnection<T, LiftBody<B>>,
}

impl<T, B> Background<T, B>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<crate::Error>,
{
pub(super) fn new(connection: HyperConnection<T, LiftBody<B>>) -> Self {
Background { connection }
}
}

impl<T, B> Future for Background<T, B>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<crate::Error>,
{
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
self.connection.poll().map_err(|e| {
error!("error with hyper: {}", e);
})
}
}

impl<T, B> Debug for Background<T, B>
where
T: Debug + AsyncRead + AsyncWrite + Send + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<crate::Error>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Background")
.field("connection", &self.connection)
.finish()
}
}
27 changes: 14 additions & 13 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use super::Connection;
use super::{Background, Connection};
use crate::body::LiftBody;
use futures::{future::MapErr, try_ready, Async, Future, Poll};
use futures::{try_ready, Async, Future, Poll};
use http::Version;
use http_body::Body as HttpBody;
use http_connection::HttpConnection;
use hyper::body::Payload;
use hyper::client::conn::{Builder, Connection as HyperConnection, Handshake};
use hyper::client::conn::{Builder, Handshake};
use hyper::Error;
use log::error;
use std::fmt;
use std::marker::PhantomData;
use tokio_executor::{DefaultExecutor, TypedExecutor};
Expand All @@ -29,11 +27,12 @@ pub struct Connect<A, B, C, E> {
}

/// Executor that will spawn the background connection task.
pub trait ConnectExecutor<T, B>:
mzabaluev marked this conversation as resolved.
Show resolved Hide resolved
TypedExecutor<MapErr<HyperConnection<T, B>, fn(hyper::Error) -> ()>>
pub trait ConnectExecutor<T, B>: TypedExecutor<Background<T, B>>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<crate::Error>,
{
}

Expand Down Expand Up @@ -75,8 +74,10 @@ pub enum ConnectError<T> {
impl<E, T, B> ConnectExecutor<T, B> for E
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
E: TypedExecutor<MapErr<HyperConnection<T, B>, fn(hyper::Error) -> ()>>,
B: HttpBody + Send + 'static,
mzabaluev marked this conversation as resolved.
Show resolved Hide resolved
B::Data: Send,
B::Error: Into<crate::Error>,
E: TypedExecutor<Background<T, B>>,
{
}

Expand Down Expand Up @@ -136,7 +137,7 @@ where
B::Data: Send,
B::Error: Into<crate::Error>,
C::Connection: Send + 'static,
E: ConnectExecutor<C::Connection, LiftBody<B>> + Clone,
E: ConnectExecutor<C::Connection, B> + Clone,
{
type Response = Connection<B>;
type Error = ConnectError<C::Error>;
Expand Down Expand Up @@ -170,7 +171,7 @@ where
B::Data: Send,
B::Error: Into<crate::Error>,
C::Connection: Send + 'static,
E: ConnectExecutor<C::Connection, LiftBody<B>>,
E: ConnectExecutor<C::Connection, B>,
{
type Item = Connection<B>;
type Error = ConnectError<C::Error>;
Expand All @@ -187,7 +188,7 @@ where
let (sender, conn) = try_ready!(fut.poll().map_err(ConnectError::Handshake));

self.exec
.spawn(conn.map_err(|e| error!("error with hyper: {}", e)))
.spawn(Background::new(conn))
.map_err(|_| ConnectError::SpawnError)?;

let connection = Connection::new(sender);
Expand Down
4 changes: 3 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
//! [`Client`]: ./struct.Client.html
//! [`hyper::Client`]: ../../hyper/struct.Client.html

mod background;
mod connect;
mod connection;
mod future;

pub use self::connect::{Connect, ConnectError};
pub use self::background::Background;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe the type needs to be re-exported. It can be "sealed".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the public ConnectExecutor, the only reason to have Background public is for completeness of documentation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The downside is it is now part of the public API, meaning changes to the type == breaking change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was not published in a crates.io release, so it's not too late for #49 to fix this 😄

pub use self::connect::{Connect, ConnectError, ConnectExecutor};
pub use self::connection::Connection;
use self::future::ResponseFuture;
pub use hyper::client::conn::Builder;
Expand Down