Skip to content

Commit

Permalink
Simplify error and pending handling
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Nov 6, 2023
1 parent d627322 commit 2a92792
Showing 1 changed file with 46 additions and 31 deletions.
77 changes: 46 additions & 31 deletions zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,15 +431,15 @@ impl Client {
///
/// If the heartbeat task panicked.
#[allow(clippy::unwrap_in_result)]
fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> {
fn poll_heartbeat(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
let is_canceled = self
.shutdown_tx
.as_mut()
.expect("only taken on drop")
.poll_canceled(cx)
.is_ready();

match self.heartbeat_task.poll_unpin(cx) {
let result = match self.heartbeat_task.poll_unpin(cx) {
Poll::Pending => {
if is_canceled {
self.set_task_exited_error(
Expand All @@ -448,7 +448,7 @@ impl Client {
)
} else {
// Heartbeat task is still running.
Ok(())
return Poll::Pending;
}
}
Poll::Ready(Ok(Ok(_))) => {
Expand Down Expand Up @@ -486,7 +486,9 @@ impl Client {
)
}
}
}
};

Poll::Ready(result)
}

/// Check if the connection's request/response task has exited.
Expand All @@ -497,17 +499,14 @@ impl Client {
/// # Panics
///
/// If the connection task panicked.
fn check_connection(&mut self, context: &mut Context<'_>) -> Result<(), SharedPeerError> {
match self.connection_task.poll_unpin(context) {
Poll::Pending => {
// Connection task is still running.
Ok(())
}
Poll::Ready(Ok(())) => {
fn poll_connection(&mut self, context: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
// Return `Pending` if the connection task is still running.
let result = match ready!(self.connection_task.poll_unpin(context)) {
Ok(()) => {
// Connection task stopped unexpectedly, without panicking.
self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
}
Poll::Ready(Err(error)) => {
Err(error) => {
// Connection task panicked.
let error = error.panic_if_task_has_panicked();

Expand All @@ -526,7 +525,9 @@ impl Client {
)
}
}
}
};

Poll::Ready(result)
}

/// Properly update the error slot after a background task has unexpectedly stopped.
Expand Down Expand Up @@ -569,6 +570,33 @@ impl Client {
}
}

/// Poll for space in the shared request sender channel, and for errors in the connection tasks.
///
/// Returns an error if the sender channel is closed, or the heartbeat or connection tasks have
/// terminated. If there is no space in the channel, returns `Pending`, and schedules the task
/// for wakeup when there is space available, or one of the tasks terminates.
fn poll_client(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
// # Correctness
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// `poll_heartbeat()` and `poll_connection()` schedule the client task for wakeup
// if either task exits, or if the heartbeat task drops the cancel handle.
//
//`ready!` returns `Poll::Pending` when `server_tx` is unready, and
// schedules this task for wakeup.
//
// It's ok to exit early and skip wakeups when there is an error, because the connection
// and its tasks are shut down immediately on error.

let _heartbeat_pending: Poll<()> = self.poll_heartbeat(cx)?;
let _connection_pending: Poll<()> = self.poll_connection(cx)?;

// We're only pending if the sender channel is full.
self.poll_request(cx)
}

/// Shut down the resources held by the client half of this peer connection.
///
/// Stops further requests to the remote peer, and stops the heartbeat task.
Expand Down Expand Up @@ -599,33 +627,20 @@ impl Service<Request> for Client {
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// `check_heartbeat` and `check_connection` schedule the client task for wakeup
// if either task exits, or if the heartbeat task drops the cancel handle.
// `poll_client()` schedules the client task for wakeup if the sender channel has space,
// either connection task exits, or if the heartbeat task drops the cancel handle.

// Check all the tasks and channels.
//
//`ready!` returns `Poll::Pending` when `server_tx` is unready, and
// schedules this task for wakeup.

// Check all the tasks
let heartbeat_result = self.check_heartbeat(cx);
let connection_result = self.check_connection(cx);
let sender_result = self.poll_request(cx);

// Requests should only wait if there is no space in the channel.
let is_pending = sender_result.is_pending();

// Combine the results, include the result inside the poll.
let result = heartbeat_result.and(connection_result).and_then(|_| {
let _pending: Poll<()> = sender_result?;
Ok(())
});
let result = ready!(self.poll_client(cx));

// Shut down the client and connection if there is an error.
if let Err(error) = result {
self.shutdown();

Poll::Ready(Err(error))
} else if is_pending {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
Expand Down

0 comments on commit 2a92792

Please sign in to comment.