Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #20 from stevenroose/roundtripper
Browse files Browse the repository at this point in the history
Make jsonrpc client-agnostic
  • Loading branch information
apoelstra authored Sep 26, 2020
2 parents 49d64af + cbd6876 commit e651798
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 66 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ name = "jsonrpc"
path = "src/lib.rs"

[dependencies]
base64-compat = "1.0.0"
http = "0.1"

serde = "1"
serde_derive = "1"
serde_json = "1"
hyper = "0.10"

111 changes: 60 additions & 51 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,58 @@
//! and parsing responses
//!
use std::{error, io};
use std::collections::HashMap;
use std::io;
use std::io::Read;
use std::sync::{Arc, Mutex};

use hyper;
use hyper::client::Client as HyperClient;
use hyper::header::{Authorization, Basic, ContentType, Headers};
use serde;
use base64;
use http;
use serde_json;

use super::{Request, Response};
use util::HashableValue;
use error::Error;

/// An interface for an HTTP roundtripper that handles HTTP requests.
pub trait HttpRoundTripper {
/// The type of the http::Response body.
type ResponseBody: io::Read;
/// The type for errors generated by the roundtripper.
type Err: error::Error;

/// Make an HTTP request. In practice only POST request will be made.
fn request(
&self,
http::Request<&[u8]>,
) -> Result<http::Response<Self::ResponseBody>, Self::Err>;
}

/// A handle to a remote JSONRPC server
pub struct Client {
pub struct Client<R: HttpRoundTripper> {
url: String,
user: Option<String>,
pass: Option<String>,
client: HyperClient,
roundtripper: R,
nonce: Arc<Mutex<u64>>,
}

impl Client {
impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
/// Creates a new client
pub fn new(url: String, user: Option<String>, pass: Option<String>) -> Client {
pub fn new(
roundtripper: Rt,
url: String,
user: Option<String>,
pass: Option<String>,
) -> Client<Rt> {
// Check that if we have a password, we have a username; other way around is ok
debug_assert!(pass.is_none() || user.is_some());

Client {
url: url,
user: user,
pass: pass,
client: HyperClient::new(),
roundtripper: roundtripper,
nonce: Arc::new(Mutex::new(0)),
}
}
Expand All @@ -78,52 +95,30 @@ impl Client {
// Build request
let request_raw = serde_json::to_vec(body)?;

// Setup connection
let mut headers = Headers::new();
headers.set(ContentType::json());
// Send request
let mut request_builder = http::Request::post(&self.url);
request_builder.header("Content-Type", "application/json-rpc");

// Set Authorization header
if let Some(ref user) = self.user {
headers.set(Authorization(Basic {
username: user.clone(),
password: self.pass.clone(),
}));
let mut auth = user.clone();
auth.push(':');
if let Some(ref pass) = self.pass {
auth.push_str(&pass[..]);
}
let value = format!("Basic {}", &base64::encode(auth.as_bytes()));
request_builder.header("Authorization", value);
}

// Send request
let retry_headers = headers.clone();
let hyper_request = self.client.post(&self.url).headers(headers).body(&request_raw[..]);
let mut stream = match hyper_request.send() {
Ok(s) => s,
// Hyper maintains a pool of TCP connections to its various clients,
// and when one drops it cannot tell until it tries sending. In this
// case the appropriate thing is to re-send, which will cause hyper
// to open a new connection. Jonathan Reem explained this to me on
// IRC, citing vague technical reasons that the library itself cannot
// do the retry transparently.
Err(hyper::error::Error::Io(e)) => {
if e.kind() == io::ErrorKind::BrokenPipe
|| e.kind() == io::ErrorKind::ConnectionAborted
{
try!(self
.client
.post(&self.url)
.headers(retry_headers)
.body(&request_raw[..])
.send()
.map_err(Error::Hyper))
} else {
return Err(Error::Hyper(hyper::error::Error::Io(e)));
}
}
Err(e) => {
return Err(Error::Hyper(e));
}
};
// Errors only on invalid header or builder reuse.
let http_request = request_builder.body(&request_raw[..]).unwrap();

let http_response =
self.roundtripper.request(http_request).map_err(|e| Error::Http(Box::new(e)))?;

// nb we ignore stream.status since we expect the body
// to contain information about any error
let response: R = serde_json::from_reader(&mut stream)?;
stream.bytes().count(); // Drain the stream so it can be reused
Ok(response)
Ok(serde_json::from_reader(http_response.into_body())?)
}

/// Sends a request to a client
Expand Down Expand Up @@ -204,10 +199,24 @@ impl Client {
#[cfg(test)]
mod tests {
use super::*;
use std::io;

struct RT();
impl HttpRoundTripper for RT {
type ResponseBody = io::Empty;
type Err = io::Error;

fn request(
&self,
_: http::Request<&[u8]>,
) -> Result<http::Response<Self::ResponseBody>, Self::Err> {
Err(io::ErrorKind::Other.into())
}
}

#[test]
fn sanity() {
let client = Client::new("localhost".to_owned(), None, None);
let client = Client::new(RT(), "localhost".to_owned(), None, None);
assert_eq!(client.last_nonce(), 0);
let req1 = client.build_request("test", &[]);
assert_eq!(client.last_nonce(), 1);
Expand Down
17 changes: 5 additions & 12 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use std::{error, fmt};

use hyper;
use serde_json;

use Response;
Expand All @@ -29,8 +28,8 @@ use Response;
pub enum Error {
/// Json error
Json(serde_json::Error),
/// Client error
Hyper(hyper::error::Error),
/// HTTP client error
Http(Box<error::Error>),
/// Error response
Rpc(RpcError),
/// Response to a request did not have the expected nonce
Expand All @@ -53,12 +52,6 @@ impl From<serde_json::Error> for Error {
}
}

impl From<hyper::error::Error> for Error {
fn from(e: hyper::error::Error) -> Error {
Error::Hyper(e)
}
}

impl From<RpcError> for Error {
fn from(e: RpcError) -> Error {
Error::Rpc(e)
Expand All @@ -69,7 +62,7 @@ impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Json(ref e) => write!(f, "JSON decode error: {}", e),
Error::Hyper(ref e) => write!(f, "Hyper error: {}", e),
Error::Http(ref e) => write!(f, "HTTP error: {}", e),
Error::Rpc(ref r) => write!(f, "RPC error response: {:?}", r),
Error::BatchDuplicateResponseId(ref v) => {
write!(f, "duplicate RPC batch response ID: {}", v)
Expand All @@ -84,7 +77,7 @@ impl error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::Json(_) => "JSON decode error",
Error::Hyper(_) => "Hyper error",
Error::Http(_) => "HTTP error",
Error::Rpc(_) => "RPC error response",
Error::NonceMismatch => "Nonce of response did not match nonce of request",
Error::VersionMismatch => "`jsonrpc` field set to non-\"2.0\"",
Expand All @@ -100,7 +93,7 @@ impl error::Error for Error {
fn cause(&self) -> Option<&error::Error> {
match *self {
Error::Json(ref e) => Some(e),
Error::Hyper(ref e) => Some(e),
Error::Http(ref e) => Some(&**e),
_ => None,
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
#![deny(unused_mut)]
#![warn(missing_docs)]

extern crate hyper;

extern crate base64;
extern crate http;
extern crate serde;
#[macro_use]
extern crate serde_derive;
Expand Down

0 comments on commit e651798

Please sign in to comment.