Skip to content

Commit

Permalink
clearing write readiness if TFO connect returns EINPROGRESS
Browse files Browse the repository at this point in the history
- ref #555
- imperfect until tokio-rs/tokio#3888 was merged
  • Loading branch information
zonyitoo committed Jun 25, 2021
1 parent 3743ffe commit f7cc085
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 160 deletions.
14 changes: 6 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,6 @@ byteorder = "1.3"
env_logger = "0.8"
byte_string = "1.0"
tokio = { version = "1", features = ["net", "time", "macros", "io-util"]}

[patch.crates-io]
tokio = { git = "https://github.com/zonyitoo/tokio.git" }
84 changes: 49 additions & 35 deletions crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ enum TcpStreamState {
}

/// A `TcpStream` that supports TFO (TCP Fast Open)
#[pin_project]
#[pin_project(project = TcpStreamProj)]
pub struct TcpStream {
#[pin]
inner: TokioTcpStream,
Expand Down Expand Up @@ -107,59 +107,73 @@ impl AsyncRead for TcpStream {
impl AsyncWrite for TcpStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let this = self.as_mut().project();
let TcpStreamProj { inner, state } = self.project();

match *state {
TcpStreamState::Connected => return inner.poll_write(cx, buf),

match this.state {
TcpStreamState::FastOpenConnect(addr) => {
// TCP_FASTOPEN was supported since FreeBSD 12.0
//
// Example program:
// <https://people.freebsd.org/~pkelsey/tfo-tools/tfo-client.c>

// Wait until socket is writable
ready!(this.inner.poll_write_ready(cx))?;

unsafe {
let saddr = SockAddr::from(*addr);

let ret = libc::sendto(
this.inner.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
0, // Yes, BSD doesn't need MSG_FASTOPEN
saddr.as_ptr(),
saddr.len(),
);

if ret >= 0 {
// Connect successfully.
*(this.state) = TcpStreamState::Connected;
return Ok(ret as usize).into();
} else {
// Error occurs
let err = io::Error::last_os_error();

// EAGAIN, EWOULDBLOCK
if err.kind() != ErrorKind::WouldBlock {
let saddr = SockAddr::from(addr);

let stream = inner.get_mut();

// Ensure socket is writable
ready!(stream.poll_write_ready(cx))?;

let mut connecting = false;
let send_result = stream.try_write_io(|| {
unsafe {
let ret = libc::sendto(
stream.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
0, // Yes, BSD doesn't need MSG_FASTOPEN
saddr.as_ptr(),
saddr.len(),
);

if ret >= 0 {
Ok(ret as usize)
} else {
// Error occurs
let err = io::Error::last_os_error();

// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::Connected;
connecting = true;

// Let `poll_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Other errors
return Err(err).into();
// Other errors, including EAGAIN, EWOULDBLOCK
Err(err)
}
} else {
// Pending on poll_write_ready
}
}
});

match send_result {
Ok(n) => {
// Connected successfully with fast open
*state = TcpStreamState::Connected;
return Ok(n).into();
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock && connecting => {
// Connecting with normal TCP handshakes, write the first packet after connected
*state = TcpStreamState::Connected;
}
Err(err) => return Err(err).into(),
}
}

TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
}
}
}
Expand Down
66 changes: 45 additions & 21 deletions crates/shadowsocks/src/net/sys/unix/bsd/macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ enum TcpStreamState {
}

/// A `TcpStream` that supports TFO (TCP Fast Open)
#[pin_project]
#[pin_project(project = TcpStreamProj)]
pub struct TcpStream {
#[pin]
inner: TokioTcpStream,
Expand Down Expand Up @@ -119,40 +119,64 @@ impl AsyncRead for TcpStream {
impl AsyncWrite for TcpStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let this = self.as_mut().project();
let TcpStreamProj { inner, state } = self.as_mut().project();

match *state {
TcpStreamState::Connected => return inner.poll_write(cx, buf),

match this.state {
TcpStreamState::FastOpenWrite => {
// `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`,
// so the first call of `send` will perform the actual SYN with TFO cookie.
//
// (NOT SURE) If remote server doesn't support TFO or this is the first connection,
// it may return EINPROGRESS just like other platforms (Linux, FreeBSD).

match ready!(this.inner.poll_write(cx, buf)) {
let stream = inner.get_mut();

// Ensure socket is writable
ready!(stream.poll_write_ready(cx))?;

let mut connecting = false;
let send_result = stream.try_write_io(|| {
unsafe {
let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0);
if ret >= 0 {
Ok(ret as usize)
} else {
let err = io::Error::last_os_error();
// EAGAIN and EWOULDBLOCK should have been handled by tokio
//
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
connecting = true;

// Let `poll_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Other errors, including EAGAIN
Err(err)
}
}
}
});

match send_result {
Ok(n) => {
*(this.state) = TcpStreamState::Connected;
// Connected successfully with fast open
*state = TcpStreamState::Connected;
return Ok(n).into();
}
Err(err) => {
// EAGAIN and EWOULDBLOCK should have been handled by tokio
//
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::Connected;
} else {
// Other errors
return Err(err).into();
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock && connecting => {
// Connecting with normal TCP handshakes, write the first packet after connected
*state = TcpStreamState::Connected;
}
Err(err) => return Err(err).into(),
}
}

TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
}
}
}
Expand Down
Loading

0 comments on commit f7cc085

Please sign in to comment.