Skip to content

Commit

Permalink
Merge pull request meqif#9 from vinipsmaker/crust-set_read_timeout
Browse files Browse the repository at this point in the history
Crust set read timeout
  • Loading branch information
Peter Jankuliak committed Dec 28, 2015
2 parents 2520c9c + ab5f012 commit c452483
Showing 1 changed file with 53 additions and 33 deletions.
86 changes: 53 additions & 33 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use util::{now_microseconds, ewma, abs_diff};
use packet::{Packet, PacketType, Encodable, Decodable, ExtensionType, HEADER_SIZE};
use rand::{self, Rng};
use time::SteadyTime;
use time;
use std::time::Duration;

// For simplicity's sake, let us assume no packet will ever exceed the
Expand Down Expand Up @@ -188,7 +189,12 @@ pub struct UtpSocket {
pub max_retransmission_retries: u32,

/// Used by `set_read_timeout`.
user_read_timeout: i64,
user_read_timeout: u64,

/// The last time congestion algorithm was updated/handled-a-timeout
last_congestion_update: SteadyTime,

retries: u32,
}

impl UtpSocket {
Expand Down Expand Up @@ -236,6 +242,8 @@ impl UtpSocket {
cwnd: INIT_CWND * MSS,
max_retransmission_retries: MAX_RETRANSMISSION_RETRIES,
user_read_timeout: 0,
last_congestion_update: SteadyTime::now(),
retries: 0,
}
}

Expand Down Expand Up @@ -460,8 +468,7 @@ impl UtpSocket {
// Receive JAKE
let mut buf = [0; BUF_SIZE];
while self.state != SocketState::Closed {
let user_read_timeout = self.user_read_timeout;
try!(self.recv(&mut buf, user_read_timeout != 0));
try!(self.recv(&mut buf, false));
}

Ok(())
Expand Down Expand Up @@ -490,8 +497,7 @@ impl UtpSocket {
return Ok((0, self.connected_to));
}

let user_read_timeout = self.user_read_timeout;
match self.recv(buf, user_read_timeout != 0) {
match self.recv(buf, true) {
Ok((0, _src)) => continue,
Ok(x) => return Ok(x),
Err(e) => return Err(e)
Expand All @@ -502,7 +508,7 @@ impl UtpSocket {

/// Changes read operations to block for at most the specified number of
/// milliseconds.
pub fn set_read_timeout(&mut self, user_timeout: Option<i64>) {
pub fn set_read_timeout(&mut self, user_timeout: Option<u64>) {
self.user_read_timeout = match user_timeout {
Some(t) => {
if t > 0 {
Expand All @@ -520,17 +526,17 @@ impl UtpSocket {
let mut b = [0; BUF_SIZE + HEADER_SIZE];
let now = SteadyTime::now();
let (read, src);
let mut retries = 0;
let user_timeout = if use_user_timeout {
self.user_read_timeout
} else {
0
};
let use_user_timeout = user_timeout != 0;

// Try to receive a packet and handle timeouts
loop {
// Abort loop if the current try exceeds the maximum number of retransmission retries.
if retries >= self.max_retransmission_retries {
if self.retries >= self.max_retransmission_retries {
debug!("exceeds max_retransmission_retries : {} ; current connect state is : {:?}",
self.max_retransmission_retries, self.state);
self.state = SocketState::Closed;
Expand All @@ -539,26 +545,32 @@ impl UtpSocket {
return Err(Error::from(SocketError::ConnectionTimedOut));
}

let timeout;
let congestion_timeout = if self.state != SocketState::New {
debug!("setting read timeout of {} ms", self.congestion_timeout);
Some(Duration::from_millis(self.congestion_timeout))
} else { None };
{
let user_timeout = Duration::from_millis(user_timeout);
timeout = if use_user_timeout {
match congestion_timeout {
Some(congestion_timeout) => {
use std::cmp::min;
Some(min(congestion_timeout, user_timeout))
},
None => Some(user_timeout),
}
} else {
congestion_timeout
};
}

let timeout = if user_timeout != 0 {
match congestion_timeout {
Some(congestion_timeout) => {
use std::cmp::min;
Some(min(congestion_timeout, Duration::from_millis(user_timeout as u64)))
},
None => Some(Duration::from_millis(user_timeout as u64))
if use_user_timeout {
let user_timeout
= time::Duration::milliseconds(user_timeout as i64);
if (SteadyTime::now() - now) >= user_timeout {
return Err(Error::from(SocketError::UserTimedOut));
}
} else {
congestion_timeout
};

if user_timeout != 0
&& (SteadyTime::now() - now).num_milliseconds() >= user_timeout {
return Err(Error::from(SocketError::UserTimedOut));
}

self.socket.set_read_timeout(timeout).expect("Error setting read timeout");
Expand All @@ -567,16 +579,29 @@ impl UtpSocket {
Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
e.kind() == ErrorKind::TimedOut) => {
debug!("recv_from timed out");
try!(self.handle_receive_timeout(user_timeout != 0));
let now = SteadyTime::now();
let congestion_timeout = {
time::Duration::milliseconds(self.congestion_timeout
as i64)
};
if !use_user_timeout
|| ((now - self.last_congestion_update)
>= congestion_timeout) {
self.last_congestion_update = now;
try!(self.handle_receive_timeout());
self.retries += 1;
}
},
Err(e) => return Err(e),
};

let elapsed = (SteadyTime::now() - now).num_milliseconds();
debug!("{} ms elapsed", elapsed);
retries += 1;
}

self.last_congestion_update = SteadyTime::now();
self.retries = 0;

// Decode received data into a packet
let packet = match Packet::from_bytes(&b[..read]) {
Ok(packet) => packet,
Expand Down Expand Up @@ -608,11 +633,8 @@ impl UtpSocket {
Ok((read, src))
}

fn handle_receive_timeout(&mut self, keep_current_timeout: bool)
-> Result<()> {
if !keep_current_timeout {
self.congestion_timeout *= 2
}
fn handle_receive_timeout(&mut self) -> Result<()> {
self.congestion_timeout *= 2;
self.cwnd = MSS;

// There are three possible cases here:
Expand Down Expand Up @@ -778,8 +800,7 @@ impl UtpSocket {
let mut buf = [0u8; BUF_SIZE];
while !self.send_window.is_empty() {
debug!("packets in send window: {}", self.send_window.len());
let user_read_timeout = self.user_read_timeout;
try!(self.recv(&mut buf, user_read_timeout != 0));
try!(self.recv(&mut buf, false));
}

Ok(())
Expand Down Expand Up @@ -811,8 +832,7 @@ impl UtpSocket {
debug!("self.duplicate_ack_count: {}", self.duplicate_ack_count);
debug!("now_microseconds() - now = {}", now_microseconds() - now);
let mut buf = [0; BUF_SIZE];
let user_read_timeout = self.user_read_timeout;
try!(self.recv(&mut buf, user_read_timeout != 0));
try!(self.recv(&mut buf, false));
}
debug!("out: now_microseconds() - now = {}", now_microseconds() - now);

Expand Down

0 comments on commit c452483

Please sign in to comment.