Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

conditional compilation for async roundtripper #35

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ documentation = "https://docs.rs/jsonrpc/"
description = "Rust support for the JSON-RPC 2.0 protocol"
keywords = [ "protocol", "json", "http", "jsonrpc" ]
readme = "README.md"
edition = "2018"

[features]
async = []

[lib]
name = "jsonrpc"
Expand Down
267 changes: 191 additions & 76 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
//! and parsing responses
//!

use std::{error, io};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::{error, io};

use serde;
use base64;
use http;
use serde;
use serde_json;

use super::{Request, Response};
use util::HashableValue;
use error::Error;
use crate::error::Error;
use crate::util::HashableValue;

/// An interface for an HTTP roundtripper that handles HTTP requests.
pub trait HttpRoundTripper {
Expand All @@ -38,30 +38,53 @@ pub trait HttpRoundTripper {
/// The type for errors generated by the roundtripper.
type Err: error::Error;

/// Make an HTTP request. In practice only POST request will be made.
/// Make a synchronous HTTP request. In practice only POST request will be made.
fn request(
&self,
http::Request<&[u8]>,
_request: http::Request<Vec<u8>>,
) -> Result<http::Response<Self::ResponseBody>, Self::Err>;
}

/// An interface for an asynchronous HTTP roundtripper that handles HTTP requests.
#[cfg(feature = "async")]
pub trait AsyncHttpRoundTripper {
/// The type of the http::Response body.
type ResponseBody: io::Read;
/// The type for errors generated by the roundtripper.
type Err: error::Error;

/// Make an asynchronous HTTP request. In practice only POST request will be made.
fn request<'life>(
&'life self,
_request: http::Request<Vec<u8>>,
) -> std::pin::Pin<
Box<
dyn std::future::Future<Output = Result<http::Response<Self::ResponseBody>, Self::Err>>
+ Send
+ 'life,
>,
>
where
Self: Sync + 'life;
}

/// A handle to a remote JSONRPC server
pub struct Client<R: HttpRoundTripper> {
pub struct Client<R> {
url: String,
user: Option<String>,
pass: Option<String>,
roundtripper: R,
nonce: Arc<Mutex<u64>>,
}

impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
impl<R> Client<R> {
/// Creates a new client
pub fn new(
roundtripper: Rt,
roundtripper: R,
url: String,
user: Option<String>,
pass: Option<String>,
) -> Client<Rt> {
) -> Client<R> {
// Check that if we have a password, we have a username; other way around is ok
debug_assert!(pass.is_none() || user.is_some());

Expand All @@ -74,23 +97,25 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
}
}

/// Make a request and deserialize the response
pub fn do_rpc<T: for<'a> serde::de::Deserialize<'a>>(
/// Builds a request
pub fn build_request<'a, 'b>(
&self,
rpc_name: &str,
args: &[serde_json::value::Value],
) -> Result<T, Error> {
let request = self.build_request(rpc_name, args);
let response = self.send_request(&request)?;

Ok(response.into_result()?)
name: &'a str,
params: &'b [serde_json::Value],
) -> Request<'a, 'b> {
let mut nonce = self.nonce.lock().unwrap();
*nonce += 1;
Request {
method: name,
params: params,
id: From::from(*nonce),
jsonrpc: Some("2.0"),
}
}

/// The actual send logic used by both [send_request] and [send_batch].
fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
fn build_http_request<B>(&self, body: &B) -> Result<http::Request<Vec<u8>>, Error>
where
B: serde::ser::Serialize,
R: for<'de> serde::de::Deserialize<'de>,
{
// Build request
let request_raw = serde_json::to_vec(body)?;
Expand All @@ -111,89 +136,179 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
}

// Errors only on invalid header or builder reuse.
let http_request = request_builder.body(&request_raw[..]).unwrap();
Ok(request_builder.body(request_raw).unwrap())
}

let http_response =
self.roundtripper.request(http_request).map_err(|e| Error::Http(Box::new(e)))?;
/// Accessor for the last-used nonce
pub fn last_nonce(&self) -> u64 {
*self.nonce.lock().unwrap()
}
}

impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
/// Make a request and deserialize the response
pub fn do_rpc<T: for<'a> serde::de::Deserialize<'a>>(
&self,
rpc_name: &str,
args: &[serde_json::value::Value],
) -> Result<T, Error> {
let request = self.build_request(rpc_name, args);
let response = self.send_request(&request)?;
Ok(response.into_result()?)
}

/// The actual send logic used by both [send_request] and [send_batch].
fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
where
B: serde::ser::Serialize,
R: for<'de> serde::de::Deserialize<'de>,
{
let http_request = self.build_http_request(body)?;

let http_response = self
.roundtripper
.request(http_request)
.map_err(|e| Error::Http(Box::new(e)))?;

// nb we ignore stream.status since we expect the body
// to contain information about any error
Ok(serde_json::from_reader(http_response.into_body())?)
}

/// Sends a request to a client
pub fn send_request(&self, request: &Request) -> Result<Response, Error> {
pub fn send_request<'a, 'b>(&self, request: &Request<'a, 'b>) -> Result<Response, Error> {
let response: Response = self.send_raw(&request)?;
if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
return Err(Error::VersionMismatch);
}
if response.id != request.id {
return Err(Error::NonceMismatch);
}
Ok(response)
validate_response(request, response)
}

/// Sends a batch of requests to the client. The return vector holds the response
/// for the request at the corresponding index. If no response was provided, it's [None].
/// Sends a batch of requests to the client. The return vector holds the response
/// for the request at the corresponding index. If no response was provided, it's [None].
///
/// Note that the requests need to have valid IDs, so it is advised to create the requests
/// with [build_request].
pub fn send_batch(&self, requests: &[Request]) -> Result<Vec<Option<Response>>, Error> {
pub fn send_batch<'a, 'b>(
&self,
requests: &[Request<'a, 'b>],
) -> Result<Vec<Option<Response>>, Error> {
if requests.len() < 1 {
return Err(Error::EmptyBatch);
}

// If the request body is invalid JSON, the response is a single response object.
// We ignore this case since we are confident we are producing valid JSON.
let responses: Vec<Response> = self.send_raw(&requests)?;
if responses.len() > requests.len() {
return Err(Error::WrongBatchResponseSize);
}

// To prevent having to clone responses, we first copy all the IDs so we can reference
// them easily. IDs can only be of JSON type String or Number (or Null), so cloning
// should be inexpensive and require no allocations as Numbers are more common.
let ids: Vec<serde_json::Value> = responses.iter().map(|r| r.id.clone()).collect();
// First index responses by ID and catch duplicate IDs.
let mut resp_by_id = HashMap::new();
for (id, resp) in ids.iter().zip(responses.into_iter()) {
if let Some(dup) = resp_by_id.insert(HashableValue(&id), resp) {
return Err(Error::BatchDuplicateResponseId(dup.id));
}
}
// Match responses to the requests.
let results =
requests.into_iter().map(|r| resp_by_id.remove(&HashableValue(&r.id))).collect();

// Since we're also just producing the first duplicate ID, we can also just produce the
// first incorrect ID in case there are multiple.
if let Some(incorrect) = resp_by_id.into_iter().nth(0) {
return Err(Error::WrongBatchResponseId(incorrect.1.id));
}
validate_batch_response(requests, responses)
}
}

Ok(results)
#[cfg(feature = "async")]
impl<Rt: AsyncHttpRoundTripper + 'static + Sync> Client<Rt> {
/// Make a request and deserialize the response
pub async fn do_rpc_async<T: for<'a> serde::de::Deserialize<'a>>(
&self,
rpc_name: &str,
args: &[serde_json::value::Value],
) -> Result<T, Error> {
let request = self.build_request(rpc_name, args);
let response = self.send_request_async(&request).await?;
Ok(response.into_result()?)
}

/// Builds a request
pub fn build_request<'a, 'b>(
/// The actual send logic used by both [send_request] and [send_batch].
async fn send_raw_async<B, R>(&self, body: &B) -> Result<R, Error>
where
B: serde::ser::Serialize,
R: for<'de> serde::de::Deserialize<'de>,
{
let http_request = self.build_http_request(body)?;

let http_response = self
.roundtripper
.request(http_request)
.await
.map_err(|e| Error::Http(Box::new(e)))?;

// nb we ignore stream.status since we expect the body
// to contain information about any error
Ok(serde_json::from_reader(http_response.into_body())?)
}

/// Sends a request to a client
pub async fn send_request_async<'a, 'b>(
&self,
name: &'a str,
params: &'b [serde_json::Value],
) -> Request<'a, 'b> {
let mut nonce = self.nonce.lock().unwrap();
*nonce += 1;
Request {
method: name,
params: params,
id: From::from(*nonce),
jsonrpc: Some("2.0"),
request: &Request<'a, 'b>,
) -> Result<Response, Error> {
let response: Response = self.send_raw_async(&request).await?;
validate_response(request, response)
}

/// Sends a batch of requests to the client. The return vector holds the response
/// for the request at the corresponding index. If no response was provided, it's [None].
///
/// Note that the requests need to have valid IDs, so it is advised to create the requests
/// with [build_request].
pub async fn send_batch_async<'a, 'b>(
&self,
requests: &[Request<'a, 'b>],
) -> Result<Vec<Option<Response>>, Error> {
if requests.len() < 1 {
return Err(Error::EmptyBatch);
}

// If the request body is invalid JSON, the response is a single response object.
// We ignore this case since we are confident we are producing valid JSON.
let responses: Vec<Response> = self.send_raw_async(&requests).await?;

validate_batch_response(requests, responses)
}
}

fn validate_response<'a, 'b>(
request: &Request<'a, 'b>,
response: Response,
) -> Result<Response, Error> {
if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
return Err(Error::VersionMismatch);
}
if response.id != request.id {
return Err(Error::NonceMismatch);
}
Ok(response)
}

fn validate_batch_response<'a, 'b>(
requests: &[Request<'a, 'b>],
responses: Vec<Response>,
) -> Result<Vec<Option<Response>>, Error> {
if responses.len() > requests.len() {
return Err(Error::WrongBatchResponseSize);
}

// To prevent having to clone responses, we first copy all the IDs so we can reference
// them easily. IDs can only be of JSON type String or Number (or Null), so cloning
// should be inexpensive and require no allocations as Numbers are more common.
let ids: Vec<serde_json::Value> = responses.iter().map(|r| r.id.clone()).collect();
// First index responses by ID and catch duplicate IDs.
let mut resp_by_id = HashMap::new();
for (id, resp) in ids.iter().zip(responses.into_iter()) {
if let Some(dup) = resp_by_id.insert(HashableValue(&id), resp) {
return Err(Error::BatchDuplicateResponseId(dup.id));
}
}
// Match responses to the requests.
let results = requests
.into_iter()
.map(|r| resp_by_id.remove(&HashableValue(&r.id)))
.collect();

/// Accessor for the last-used nonce
pub fn last_nonce(&self) -> u64 {
*self.nonce.lock().unwrap()
// Since we're also just producing the first duplicate ID, we can also just produce the
// first incorrect ID in case there are multiple.
if let Some(incorrect) = resp_by_id.into_iter().nth(0) {
return Err(Error::WrongBatchResponseId(incorrect.1.id));
}

Ok(results)
}

#[cfg(test)]
Expand All @@ -208,7 +323,7 @@ mod tests {

fn request(
&self,
_: http::Request<&[u8]>,
_: http::Request<Vec<u8>>,
) -> Result<http::Response<Self::ResponseBody>, Self::Err> {
Err(io::ErrorKind::Other.into())
}
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{error, fmt};

use serde_json;

use Response;
use crate::Response;

/// A library error
#[derive(Debug)]
Expand Down
Loading