Fix gzip + chunk pool problem #550

Closed · wants to merge 6 commits
src/pool.rs: 44 additions & 1 deletion
@@ -302,7 +302,9 @@ impl<R: Read + Sized + Done + Into<Stream>> Read for PoolReturnRead<R> {

#[cfg(test)]
mod tests {
-use crate::stream::remote_addr_for_test;
+use std::io;
+
+use crate::stream::{remote_addr_for_test, Stream};
+use crate::ReadWrite;

use super::*;
@@ -437,4 +439,45 @@ mod tests {

assert_eq!(agent.state.pool.len(), 1);
}

// Test that a stream gets returned to the pool even when it is gzip encoded and the
// gzip decoder reads the exact amount from the chunked stream without consuming the
// final 0. This happens because gzip has built-in knowledge of the length to read.
#[test]
#[cfg(feature = "gzip")]
fn read_exact_chunked_gzip() {
use crate::response::Compression;
use chunked_transfer::Decoder as ChunkDecoder;

let gz_body = vec![
b'E', b'\r', b'\n', // 14 first chunk
0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x03, 0xCB, 0x48, 0xCD, 0xC9,
b'\r', b'\n', //
b'E', b'\r', b'\n', // 14 second chunk
0xC9, 0x57, 0x28, 0xCF, 0x2F, 0xCA, 0x49, 0x51, 0xC8, 0x18, 0xBC, 0x6C, 0x00, 0xA5,
b'\r', b'\n', //
b'7', b'\r', b'\n', // 7 third chunk
0x5C, 0x7C, 0xEF, 0xA7, 0x00, 0x00, 0x00, //
b'\r', b'\n', //
// end
b'0', b'\r', b'\n', //
b'\r', b'\n', //
];

let agent = Agent::new();
let url = Url::parse("https://example.com").unwrap();

assert_eq!(agent.state.pool.len(), 0);

let chunked = ChunkDecoder::new(io::Cursor::new(gz_body));
let pool_return_read: Box<(dyn Read + Send + Sync + 'static)> =
Box::new(PoolReturnRead::new(&agent, &url, chunked));

let compression = Compression::Gzip;
let mut stream = compression.wrap_reader(pool_return_read);

io::copy(&mut stream, &mut io::sink()).unwrap();

assert_eq!(agent.state.pool.len(), 1);
}
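
// For reference, a sketch of how a body like `gz_body` above could be generated
// instead of hand-writing the bytes. Illustrative only: it assumes `flate2` is
// available as a dev-dependency, which is not something this diff adds.
#[cfg(feature = "gzip")]
#[allow(dead_code)]
fn make_chunked_gzip_body(text: &[u8]) -> Vec<u8> {
    use std::io::Write;

    // Gzip-compress the plain text...
    let mut enc = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
    enc.write_all(text).unwrap();
    let gz = enc.finish().unwrap();

    // ...then chunk-encode the compressed bytes. Dropping the encoder
    // writes the terminating 0\r\n\r\n.
    let mut out = Vec::new();
    {
        let mut chunker = chunked_transfer::Encoder::with_chunks_size(&mut out, 14);
        chunker.write_all(&gz).unwrap();
    }
    out
}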
}
src/response.rs: 142 additions & 4 deletions
@@ -571,7 +571,7 @@ impl Response {
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-enum Compression {
+pub(crate) enum Compression {
#[cfg(feature = "brotli")]
Brotli,
#[cfg(feature = "gzip")]
@@ -592,15 +592,112 @@ impl Compression {

/// Wrap the raw reader with a decompressing reader
#[allow(unused_variables)] // when no features enabled, reader is unused (unreachable)
-fn wrap_reader(
+pub(crate) fn wrap_reader(
self,
reader: Box<dyn Read + Send + Sync + 'static>,
) -> Box<dyn Read + Send + Sync + 'static> {
#[cfg(any(feature = "brotli", feature = "gzip"))]
let zero = precise::DidReadZero::new(reader);
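// The readers end up nested as PreciseRead(Decoder(DidReadZero(raw))): DidReadZero
// records whether the raw (typically chunked) reader was read all the way to a
// 0-read, and PreciseRead uses that to drain a trailing 0\r\n\r\n the decoder may
// have left behind.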
match self {
#[cfg(feature = "brotli")]
-Compression::Brotli => Box::new(BrotliDecoder::new(reader, 4096)),
+Compression::Brotli => Box::new(precise::PreciseRead::new(BrotliDecoder::new(zero, 4096))),
#[cfg(feature = "gzip")]
-Compression::Gzip => Box::new(GzDecoder::new(reader)),
+Compression::Gzip => Box::new(precise::PreciseRead::new(GzDecoder::new(zero))),
}
}
}

#[cfg(any(feature = "brotli", feature = "gzip"))]
mod precise {
use std::io::{self, Read};

trait IntoInner<I> {
fn into_inner(self) -> I;
}

#[cfg(feature = "gzip")]
impl<I> IntoInner<I> for super::GzDecoder<I> {
fn into_inner(self) -> I {
Self::into_inner(self)
}
}

#[cfg(feature = "brotli")]
impl<I> IntoInner<I> for super::BrotliDecoder<I> {
fn into_inner(self) -> I {
Self::into_inner(self)
}
}

/// Inside decompression (gzip, brotli), we use PreciseRead to ensure the underlying
/// reader is read precisely to its end. This is needed for nestings like
/// GzDecoder(ChunkedDecoder), because gzip (and brotli) have a built-in idea
/// of how many bytes they are going to produce, which means they might omit reading
/// the 0\r\n\r\n after the last actual data in the chunked decoder.
pub(crate) struct PreciseRead<R>(Option<R>);

impl<R> PreciseRead<R> {
pub fn new(inner: R) -> Self {
PreciseRead(Some(inner))
}
}

impl<R: Read + IntoInner<DidReadZero>> Read for PreciseRead<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let reader = match &mut self.0 {
Some(v) => v,
None => return Ok(0),
};
let n = reader.read(buf)?;

if n == 0 {
let precise = self.0.take().expect("to only take PreciseRead once");
let mut inner = precise.into_inner();

// This is the whole point of PreciseRead. If the inner reader has not yet returned
// a 0-read of its own, we attempt one extra read here, expecting 0 bytes, so that
// a potential underlying chunked decoder consumes the final 0\r\n\r\n.
if !inner.did_read_zero() {
let mut dummy = [0_u8];
let n = inner.read(&mut dummy)?;
if n != 0 {
let error = "Unexpected data in response after compressed content end";
return Err(io::Error::new(io::ErrorKind::InvalidData, error));
}
}
}

Ok(n)
}
}

/// Helper type to know whether a PreciseRead has used its inner reader to read a 0.
/// This is required because gzip "knows" how many bytes to read to finish the
/// decompression, which means a combination of gzip + chunked encoding might never
/// read the ChunkedDecoder to a 0, leaving the final `0\r\n\r\n` unread. Unread bytes
/// mean the connection is not returned to the pool.
pub(crate) struct DidReadZero(Box<dyn Read + Send + Sync + 'static>, bool);

impl DidReadZero {
pub fn new(r: Box<dyn Read + Send + Sync + 'static>) -> Self {
DidReadZero(r, false)
}

#[allow(unused)]
fn did_read_zero(&self) -> bool {
self.1
}
}

impl Read for DidReadZero {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n = self.0.read(buf)?;

if n == 0 {
self.1 = true;
}

Ok(n)
}
}
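
// A minimal sanity-check sketch for DidReadZero in isolation (not part of the
// gzip/chunked fix itself): the flag should flip only once the inner reader
// actually returns a 0-read.
#[cfg(test)]
#[test]
fn did_read_zero_flips_on_eof() {
    let mut r = DidReadZero::new(Box::new(io::Cursor::new(vec![1_u8, 2, 3])));
    let mut buf = [0_u8; 8];
    assert_eq!(r.read(&mut buf).unwrap(), 3);
    assert!(!r.did_read_zero());
    assert_eq!(r.read(&mut buf).unwrap(), 0);
    assert!(r.did_read_zero());
}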
}
@@ -1093,4 +1190,45 @@ mod tests {
println!("Response size: {}", size);
assert!(size < 400); // 200 on Macbook M1
}

#[test]
#[cfg(feature = "gzip")]
fn ensure_no_content_after_compressed() {
use crate::response::Compression;
use chunked_transfer::Decoder as ChunkDecoder;

let gz_body = vec![
b'E', b'\r', b'\n', // 14 first chunk
0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x03, 0xCB, 0x48, 0xCD, 0xC9,
b'\r', b'\n', //
b'E', b'\r', b'\n', // 14 second chunk
0xC9, 0x57, 0x28, 0xCF, 0x2F, 0xCA, 0x49, 0x51, 0xC8, 0x18, 0xBC, 0x6C, 0x00, 0xA5,
b'\r', b'\n', //
b'7', b'\r', b'\n', // 7 third chunk
0x5C, 0x7C, 0xEF, 0xA7, 0x00, 0x00, 0x00, //
b'\r', b'\n', //
// THIS IS THE PROBLEM: We insert another chunk here, which is not expected by the gzip
// decoder (which finished after the previous chunk). This should cause an error, because
// we disallow garbage data after the compressed content ends.
b'7', b'\r', b'\n', // 7 fourth chunk
0x5C, 0x7C, 0xEF, 0xA7, 0x00, 0x00, 0x00, //
b'\r', b'\n', //
// end
b'0', b'\r', b'\n', //
b'\r', b'\n', //
];

let chunked = Box::new(ChunkDecoder::new(io::Cursor::new(gz_body)));
let compression = Compression::Gzip;
let mut stream = compression.wrap_reader(chunked);

let r = io::copy(&mut stream, &mut io::sink());

let err = r.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert_eq!(
err.to_string(),
"Unexpected data in response after compressed content end"
);
}
}
src/stream.rs: 14 additions & 0 deletions
@@ -619,6 +619,20 @@ pub(crate) fn remote_addr_for_test() -> SocketAddr {
SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0).into()
}

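// Test-only plumbing: these impls let an in-memory Cursor stand in for a
// socket-backed Stream, so pool tests can exercise PoolReturnRead without a
// real connection. The socket address in the From impl is a dummy value.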
#[cfg(test)]
impl ReadWrite for io::Cursor<Vec<u8>> {
fn socket(&self) -> Option<&std::net::TcpStream> {
None
}
}

#[cfg(test)]
impl From<io::Cursor<Vec<u8>>> for Stream {
fn from(c: io::Cursor<Vec<u8>>) -> Self {
Stream::new(c, "1.1.1.1:8080".parse().unwrap())
}
}

#[cfg(test)]
mod tests {
use super::*;