Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zstd support #1866

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,23 @@ jobs:
- name: windows / stable-x86_64-msvc
os: windows-latest
target: x86_64-pc-windows-msvc
features: "--features blocking,gzip,brotli,deflate,json,multipart,stream"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart,stream"
- name: windows / stable-i686-msvc
os: windows-latest
target: i686-pc-windows-msvc
features: "--features blocking,gzip,brotli,deflate,json,multipart,stream"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart,stream"
- name: windows / stable-x86_64-gnu
os: windows-latest
rust: stable-x86_64-pc-windows-gnu
target: x86_64-pc-windows-gnu
features: "--features blocking,gzip,brotli,deflate,json,multipart,stream"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart,stream"
package_name: mingw-w64-x86_64-gcc
mingw64_path: "C:\\msys64\\mingw64\\bin"
- name: windows / stable-i686-gnu
os: windows-latest
rust: stable-i686-pc-windows-gnu
target: i686-pc-windows-gnu
features: "--features blocking,gzip,brotli,deflate,json,multipart,stream"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart,stream"
package_name: mingw-w64-i686-gcc
mingw64_path: "C:\\msys64\\mingw32\\bin"

Expand All @@ -145,6 +145,8 @@ jobs:
features: "--features gzip,stream"
- name: "feat.: brotli"
features: "--features brotli,stream"
- name: "feat.: zstd"
features: "--features zstd,stream"
- name: "feat.: deflate"
features: "--features deflate,stream"
- name: "feat.: json"
Expand Down
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ gzip = ["dep:async-compression", "async-compression?/gzip", "dep:tokio-util"]

brotli = ["dep:async-compression", "async-compression?/brotli", "dep:tokio-util"]

zstd = ["dep:async-compression", "async-compression?/zstd", "dep:tokio-util"]

deflate = ["dep:async-compression", "async-compression?/zlib", "dep:tokio-util"]

json = ["dep:serde_json"]
Expand Down Expand Up @@ -167,6 +169,7 @@ hyper-util = { version = "0.1", features = ["http1", "http2", "client", "client-
serde = { version = "1.0", features = ["derive"] }
libflate = "1.0"
brotli_crate = { package = "brotli", version = "3.3.0" }
zstd_crate = { package = "zstd", version = "0.13" }
doc-comment = "0.3"
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread"] }
futures-util = { version = "0.3.0", default-features = false, features = ["std", "alloc"] }
Expand Down Expand Up @@ -258,6 +261,11 @@ name = "brotli"
path = "tests/brotli.rs"
required-features = ["brotli", "stream"]

[[test]]
name = "zstd"
path = "tests/zstd.rs"
required-features = ["zstd", "stream"]

[[test]]
name = "deflate"
path = "tests/deflate.rs"
Expand Down
40 changes: 40 additions & 0 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,29 @@ impl ClientBuilder {
self
}

/// Enable auto zstd decompression by checking the `Content-Encoding` response header.
///
/// If auto zstd decompression is turned on:
///
/// - When sending a request and if the request's headers do not already contain
/// an `Accept-Encoding` **and** `Range` values, the `Accept-Encoding` header is set to `zstd`.
/// The request body is **not** automatically compressed.
/// - When receiving a response, if its headers contain a `Content-Encoding` value of
/// `zstd`, both `Content-Encoding` and `Content-Length` are removed from the
/// headers' set. The response body is automatically decompressed.
///
/// If the `zstd` feature is turned on, the default option is enabled.
///
/// # Optional
///
/// This requires the optional `zstd` feature to be enabled
#[cfg(feature = "zstd")]
#[cfg_attr(docsrs, doc(cfg(feature = "zstd")))]
pub fn zstd(mut self, enable: bool) -> ClientBuilder {
self.config.accepts.zstd = enable;
self
}

/// Enable auto deflate decompression by checking the `Content-Encoding` response header.
///
/// If auto deflate decompression is turned on:
Expand Down Expand Up @@ -972,6 +995,23 @@ impl ClientBuilder {
}
}

/// Disable auto response body zstd decompression.
///
/// This method exists even if the optional `zstd` feature is not enabled.
/// This can be used to ensure a `Client` doesn't use zstd decompression
/// even if another dependency were to enable the optional `zstd` feature.
pub fn no_zstd(self) -> ClientBuilder {
#[cfg(feature = "zstd")]
{
self.zstd(false)
}

#[cfg(not(feature = "zstd"))]
{
self
}
}

/// Disable auto response body deflate decompression.
///
/// This method exists even if the optional `deflate` feature is not enabled.
Expand Down
144 changes: 128 additions & 16 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use async_compression::tokio::bufread::GzipDecoder;
#[cfg(feature = "brotli")]
use async_compression::tokio::bufread::BrotliDecoder;

#[cfg(feature = "zstd")]
use async_compression::tokio::bufread::ZstdDecoder;

#[cfg(feature = "deflate")]
use async_compression::tokio::bufread::ZlibDecoder;

Expand All @@ -19,9 +22,19 @@ use http::HeaderMap;
use hyper::body::Body as HttpBody;
use hyper::body::Frame;

#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "brotli",
feature = "zstd",
feature = "deflate"
))]
use tokio_util::codec::{BytesCodec, FramedRead};
#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "brotli",
feature = "zstd",
feature = "deflate"
))]
use tokio_util::io::StreamReader;

use super::body::ResponseBody;
Expand All @@ -33,6 +46,8 @@ pub(super) struct Accepts {
pub(super) gzip: bool,
#[cfg(feature = "brotli")]
pub(super) brotli: bool,
#[cfg(feature = "zstd")]
pub(super) zstd: bool,
#[cfg(feature = "deflate")]
pub(super) deflate: bool,
}
Expand All @@ -44,6 +59,8 @@ impl Accepts {
gzip: false,
#[cfg(feature = "brotli")]
brotli: false,
#[cfg(feature = "zstd")]
zstd: false,
#[cfg(feature = "deflate")]
deflate: false,
}
Expand All @@ -59,7 +76,12 @@ pub(crate) struct Decoder {

type PeekableIoStream = Peekable<IoStream>;

#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "zstd",
feature = "brotli",
feature = "deflate"
))]
type PeekableIoStreamReader = StreamReader<PeekableIoStream, Bytes>;

enum Inner {
Expand All @@ -74,12 +96,21 @@ enum Inner {
#[cfg(feature = "brotli")]
Brotli(Pin<Box<FramedRead<BrotliDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A `Zstd` decoder will uncompress the zstd compressed response content before returning it.
#[cfg(feature = "zstd")]
Zstd(Pin<Box<FramedRead<ZstdDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A `Deflate` decoder will uncompress the deflated response content before returning it.
#[cfg(feature = "deflate")]
Deflate(Pin<Box<FramedRead<ZlibDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A decoder that doesn't have a value yet.
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
Pending(Pin<Box<Pending>>),
}

Expand All @@ -93,6 +124,8 @@ enum DecoderType {
Gzip,
#[cfg(feature = "brotli")]
Brotli,
#[cfg(feature = "zstd")]
Zstd,
#[cfg(feature = "deflate")]
Deflate,
}
Expand Down Expand Up @@ -155,6 +188,21 @@ impl Decoder {
}
}

/// A zstd decoder.
///
/// This decoder will buffer and decompress chunks that are zstd compressed.
#[cfg(feature = "zstd")]
fn zstd(body: ResponseBody) -> Decoder {
use futures_util::StreamExt;

Decoder {
inner: Inner::Pending(Box::pin(Pending(
IoStream(body).peekable(),
DecoderType::Zstd,
))),
}
}

/// A deflate decoder.
///
/// This decoder will buffer and decompress chunks that are deflated.
Expand All @@ -170,7 +218,12 @@ impl Decoder {
}
}

#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
fn detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool {
use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use log::warn;
Expand Down Expand Up @@ -225,6 +278,13 @@ impl Decoder {
}
}

#[cfg(feature = "zstd")]
{
if _accepts.zstd && Decoder::detect_encoding(_headers, "zstd") {
return Decoder::zstd(body);
}
}

#[cfg(feature = "deflate")]
{
if _accepts.deflate && Decoder::detect_encoding(_headers, "deflate") {
Expand All @@ -245,7 +305,12 @@ impl HttpBody for Decoder {
cx: &mut Context,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.inner {
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
Poll::Ready(Ok(inner)) => {
self.inner = inner;
Expand Down Expand Up @@ -277,6 +342,14 @@ impl HttpBody for Decoder {
None => Poll::Ready(None),
}
}
#[cfg(feature = "zstd")]
Inner::Zstd(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
}
}
#[cfg(feature = "deflate")]
Inner::Deflate(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Expand All @@ -292,7 +365,12 @@ impl HttpBody for Decoder {
match self.inner {
Inner::PlainText(ref body) => HttpBody::size_hint(body),
// the rest are "unknown", so default
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
_ => http_body::SizeHint::default(),
}
}
Expand Down Expand Up @@ -332,6 +410,11 @@ impl Future for Pending {
BrotliDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
#[cfg(feature = "zstd")]
DecoderType::Zstd => Poll::Ready(Ok(Inner::Zstd(Box::pin(FramedRead::new(
ZstdDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
#[cfg(feature = "gzip")]
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new(
GzipDecoder::new(StreamReader::new(_body)),
Expand Down Expand Up @@ -381,22 +464,37 @@ impl Accepts {
gzip: false,
#[cfg(feature = "brotli")]
brotli: false,
#[cfg(feature = "zstd")]
zstd: false,
#[cfg(feature = "deflate")]
deflate: false,
}
}
*/

pub(super) fn as_str(&self) -> Option<&'static str> {
match (self.is_gzip(), self.is_brotli(), self.is_deflate()) {
(true, true, true) => Some("gzip, br, deflate"),
(true, true, false) => Some("gzip, br"),
(true, false, true) => Some("gzip, deflate"),
(false, true, true) => Some("br, deflate"),
(true, false, false) => Some("gzip"),
(false, true, false) => Some("br"),
(false, false, true) => Some("deflate"),
(false, false, false) => None,
match (
self.is_gzip(),
self.is_brotli(),
self.is_zstd(),
self.is_deflate(),
) {
(true, true, true, true) => Some("gzip, br, zstd, deflate"),
(true, true, false, true) => Some("gzip, br, deflate"),
(true, true, true, false) => Some("gzip, br, zstd"),
(true, true, false, false) => Some("gzip, br"),
(true, false, true, true) => Some("gzip, zstd, deflate"),
(true, false, false, true) => Some("gzip, zstd, deflate"),
(false, true, true, true) => Some("br, zstd, deflate"),
(false, true, false, true) => Some("br, zstd, deflate"),
(true, false, true, false) => Some("gzip, zstd"),
(true, false, false, false) => Some("gzip"),
(false, true, true, false) => Some("br, zstd"),
(false, true, false, false) => Some("br"),
(false, false, true, true) => Some("zstd, deflate"),
(false, false, true, false) => Some("zstd"),
(false, false, false, true) => Some("deflate"),
(false, false, false, false) => None,
}
}

Expand Down Expand Up @@ -424,6 +522,18 @@ impl Accepts {
}
}

fn is_zstd(&self) -> bool {
#[cfg(feature = "zstd")]
{
self.zstd
}

#[cfg(not(feature = "zstd"))]
{
false
}
}

fn is_deflate(&self) -> bool {
#[cfg(feature = "deflate")]
{
Expand All @@ -444,6 +554,8 @@ impl Default for Accepts {
gzip: true,
#[cfg(feature = "brotli")]
brotli: true,
#[cfg(feature = "zstd")]
zstd: true,
#[cfg(feature = "deflate")]
deflate: true,
}
Expand Down
Loading