Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(net): Fix potential network hangs, and reduce code complexity #7859

Merged
merged 27 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a8b065b
Refactor return type of poll_discover()
teor2345 Oct 27, 2023
728efad
Simplify poll_ready() by removing preselected peers
teor2345 Oct 27, 2023
dbb4e1c
Fix peer set readiness check
teor2345 Oct 27, 2023
a65b831
Pass task context correctly to background tasks
teor2345 Oct 27, 2023
8d22afd
Make poll_discover() return Pending
teor2345 Oct 27, 2023
c96cd5b
Make poll_inventory() return Pending
teor2345 Oct 27, 2023
2eafe95
Make poll_unready() return Poll::Pending
teor2345 Oct 27, 2023
248dce0
Simplify with futures::ready!() and ?
teor2345 Oct 27, 2023
533044b
When there are no peers, wake on newly ready peers, or new peers, in …
teor2345 Oct 27, 2023
9c6c54c
Preserve the original waker when there are no unready peers
teor2345 Oct 27, 2023
2ca7842
Fix polling docs and remove unnecessary code
teor2345 Nov 5, 2023
8d5c845
Make sure we're ignoring Poll::Pending not Result
teor2345 Nov 5, 2023
67a2fc6
Make panic checking method names clearer
teor2345 Nov 6, 2023
8181cd3
Fix connection Client task wakeups and error handling
teor2345 Nov 6, 2023
5fb5faa
Cleanup connection panic handling and add wakeup docs
teor2345 Nov 6, 2023
51890e9
Fix connection client task wakeups to prevent hangs
teor2345 Nov 6, 2023
ffe0eb6
Simplify error and pending handling
teor2345 Nov 6, 2023
a14fa12
Clarify inventory set behaviour
teor2345 Nov 14, 2023
6bdf6c1
Define peer set poll_* methods so they return Ok if they do something
teor2345 Nov 14, 2023
21b5c39
Clarify documentation
teor2345 Nov 7, 2023
a7a4a37
Fix test that depended on preselected peers
teor2345 Nov 14, 2023
5db0635
Check ready peers for errors before sending requests to them
teor2345 Nov 14, 2023
8e20387
Fix a hanging test by not waiting for irrelevant actions
teor2345 Nov 14, 2023
598a9ef
Only remove cancel handles when they are required
teor2345 Nov 14, 2023
378ba7c
fix incorrect panic on termination setting
teor2345 Nov 14, 2023
798b1ca
Clarify method comments
teor2345 Nov 16, 2023
7b63bef
Merge branch 'main' into peerset-poll
teor2345 Nov 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 66 additions & 4 deletions zebra-chain/src/diagnostic/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod future;
pub mod thread;

/// A trait that checks a task's return value for panics.
pub trait CheckForPanics {
pub trait CheckForPanics: Sized {
/// The output type, after removing panics from `Self`.
type Output;

Expand All @@ -20,11 +20,34 @@ pub trait CheckForPanics {
///
/// If `self` contains a panic payload or an unexpected termination.
#[track_caller]
fn check_for_panics(self) -> Self::Output;
fn panic_if_task_has_finished(self) -> Self::Output {
self.check_for_panics_with(true)
}

/// Check if `self` contains a panic payload, then panic.
/// Otherwise, return the non-panic part of `self`.
///
/// # Panics
///
/// If `self` contains a panic payload.
#[track_caller]
fn panic_if_task_has_panicked(self) -> Self::Output {
self.check_for_panics_with(false)
}

/// Check if `self` contains a panic payload, then panic. Also panics if
/// `panic_on_unexpected_termination` is true, and `self` is an unexpected termination.
/// Otherwise, return the non-panic part of `self`.
///
/// # Panics
///
/// If `self` contains a panic payload, or if we're panicking on unexpected terminations.
#[track_caller]
fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output;
}

/// A trait that waits for a task to finish, then handles panics and cancellations.
pub trait WaitForPanics {
pub trait WaitForPanics: Sized {
/// The underlying task output, after removing panics and unwrapping termination results.
type Output;

Expand All @@ -43,5 +66,44 @@ pub trait WaitForPanics {
///
/// If `self` contains an expected termination, and we're shutting down anyway.
#[track_caller]
fn wait_for_panics(self) -> Self::Output;
fn wait_and_panic_on_unexpected_termination(self) -> Self::Output {
self.wait_for_panics_with(true)
}

/// Waits for `self` to finish, then check if its output is:
/// - a panic payload: resume that panic,
/// - a task termination: hang waiting for shutdown.
///
/// Otherwise, returns the task return value of `self`.
///
/// # Panics
///
/// If `self` contains a panic payload.
///
/// # Hangs
///
/// If `self` contains a task termination.
#[track_caller]
fn wait_for_panics(self) -> Self::Output {
self.wait_for_panics_with(false)
}

/// Waits for `self` to finish, then check if its output is:
/// - a panic payload: resume that panic,
/// - an unexpected termination:
/// - if `panic_on_unexpected_termination` is true, panic with that error,
/// - otherwise, hang waiting for shutdown,
/// - an expected termination: hang waiting for shutdown.
///
/// Otherwise, returns the task return value of `self`.
///
/// # Panics
///
/// If `self` contains a panic payload, or if we're panicking on unexpected terminations.
///
/// # Hangs
///
/// If `self` contains an expected or ignored termination, and we're shutting down anyway.
#[track_caller]
fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output;
}
45 changes: 29 additions & 16 deletions zebra-chain/src/diagnostic/task/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ impl<T> CheckForPanics for Result<T, JoinError> {
type Output = Result<T, JoinError>;

/// Returns the task result if the task finished normally.
/// Otherwise, resumes any panics, logs unexpected errors, and ignores any expected errors.
/// Otherwise, resumes any panics, and ignores any expected errors.
/// Handles unexpected errors based on `panic_on_unexpected_termination`.
///
/// If the task finished normally, returns `Some(T)`.
/// If the task was cancelled, returns `None`.
#[track_caller]
fn check_for_panics(self) -> Self::Output {
fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
match self {
Ok(task_output) => Ok(task_output),
Err(join_error) => Err(join_error.check_for_panics()),
Err(join_error) => {
Err(join_error.check_for_panics_with(panic_on_unexpected_termination))
}
}
}
}
Expand All @@ -38,22 +41,26 @@ impl CheckForPanics for JoinError {
/// Resume any panics and panic on unexpected task cancellations.
/// Always returns [`JoinError::Cancelled`](JoinError::is_cancelled).
#[track_caller]
fn check_for_panics(self) -> Self::Output {
fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
match self.try_into_panic() {
Ok(panic_payload) => panic::resume_unwind(panic_payload),

// We could ignore this error, but then we'd have to change the return type.
Err(task_cancelled) if is_shutting_down() => {
debug!(
?task_cancelled,
"ignoring cancelled task because Zebra is shutting down"
);
Err(task_cancelled) => {
if !panic_on_unexpected_termination {
debug!(?task_cancelled, "ignoring expected task termination");

task_cancelled
}
task_cancelled
} else if is_shutting_down() {
debug!(
?task_cancelled,
"ignoring task termination because Zebra is shutting down"
);

Err(task_cancelled) => {
panic!("task cancelled during normal Zebra operation: {task_cancelled:?}");
task_cancelled
} else {
panic!("task unexpectedly exited with: {:?}", task_cancelled)
}
}
}
}
Expand All @@ -67,7 +74,9 @@ where

/// Returns a future which waits for `self` to finish, then checks if its output is:
/// - a panic payload: resume that panic,
/// - an unexpected termination: panic with that error,
/// - an unexpected termination:
/// - if `panic_on_unexpected_termination` is true, panic with that error,
/// - otherwise, hang waiting for shutdown,
/// - an expected termination: hang waiting for shutdown.
///
/// Otherwise, returns the task return value of `self`.
Expand All @@ -79,11 +88,15 @@ where
/// # Hangs
///
/// If `self` contains an expected termination, and we're shutting down anyway.
/// If we're ignoring terminations because `panic_on_unexpected_termination` is `false`.
/// Futures hang by returning `Pending` and not setting a waker, so this uses minimal resources.
#[track_caller]
fn wait_for_panics(self) -> Self::Output {
fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
async move {
match self.await.check_for_panics() {
match self
.await
.check_for_panics_with(panic_on_unexpected_termination)
{
Ok(task_output) => task_output,
Err(_expected_cancel_error) => future::pending().await,
}
Expand Down
73 changes: 57 additions & 16 deletions zebra-chain/src/diagnostic/task/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,77 @@ use std::{
thread::{self, JoinHandle},
};

use crate::shutdown::is_shutting_down;

use super::{CheckForPanics, WaitForPanics};

impl<T> CheckForPanics for thread::Result<T> {
impl<T> CheckForPanics for thread::Result<T>
where
T: std::fmt::Debug,
{
type Output = T;

/// Panics if the thread panicked.
/// # Panics
///
/// - if the thread panicked.
/// - if the thread is cancelled, `panic_on_unexpected_termination` is true, and
/// Zebra is not shutting down.
///
/// Threads can't be cancelled except by using a panic, so there are no thread errors here.
/// `panic_on_unexpected_termination` is
#[track_caller]
fn check_for_panics(self) -> Self::Output {
fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
match self {
// The value returned by the thread when it finished.
Ok(thread_output) => thread_output,
Ok(thread_output) => {
if !panic_on_unexpected_termination {
debug!(?thread_output, "ignoring expected thread exit");

thread_output
} else if is_shutting_down() {
debug!(
?thread_output,
"ignoring thread exit because Zebra is shutting down"
);

thread_output
} else {
panic!("thread unexpectedly exited with: {:?}", thread_output)
}
}

// A thread error is always a panic.
Err(panic_payload) => panic::resume_unwind(panic_payload),
}
}
}

impl<T> WaitForPanics for JoinHandle<T> {
impl<T> WaitForPanics for JoinHandle<T>
where
T: std::fmt::Debug,
{
type Output = T;

/// Waits for the thread to finish, then panics if the thread panicked.
#[track_caller]
fn wait_for_panics(self) -> Self::Output {
self.join().check_for_panics()
fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
self.join()
.check_for_panics_with(panic_on_unexpected_termination)
}
}

impl<T> WaitForPanics for Arc<JoinHandle<T>> {
impl<T> WaitForPanics for Arc<JoinHandle<T>>
where
T: std::fmt::Debug,
{
type Output = Option<T>;

/// If this is the final `Arc`, waits for the thread to finish, then panics if the thread
/// panicked. Otherwise, returns the thread's return value.
///
/// If this is not the final `Arc`, drops the handle and immediately returns `None`.
#[track_caller]
fn wait_for_panics(self) -> Self::Output {
fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
// If we are the last Arc with a reference to this handle,
// we can wait for it and propagate any panics.
//
Expand All @@ -56,29 +88,32 @@ impl<T> WaitForPanics for Arc<JoinHandle<T>> {
// This is more readable as an expanded statement.
#[allow(clippy::manual_map)]
if let Some(handle) = Arc::into_inner(self) {
Some(handle.wait_for_panics())
Some(handle.wait_for_panics_with(panic_on_unexpected_termination))
} else {
None
}
}
}

impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>> {
impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>>
where
T: std::fmt::Debug,
{
type Output = Option<T>;

/// If this is the final `Arc`, checks if the thread has finished, then panics if the thread
/// panicked. Otherwise, returns the thread's return value.
///
/// If the thread has not finished, or this is not the final `Arc`, returns `None`.
#[track_caller]
fn check_for_panics(self) -> Self::Output {
fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
let handle = self.take()?;

if handle.is_finished() {
// This is the same as calling `self.wait_for_panics()`, but we can't do that,
// because we've taken `self`.
#[allow(clippy::manual_map)]
return handle.wait_for_panics();
return handle.wait_for_panics_with(panic_on_unexpected_termination);
}

*self = Some(handle);
Expand All @@ -87,18 +122,24 @@ impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>> {
}
}

impl<T> WaitForPanics for &mut Option<Arc<JoinHandle<T>>> {
impl<T> WaitForPanics for &mut Option<Arc<JoinHandle<T>>>
where
T: std::fmt::Debug,
{
type Output = Option<T>;

/// If this is the final `Arc`, waits for the thread to finish, then panics if the thread
/// panicked. Otherwise, returns the thread's return value.
///
/// If this is not the final `Arc`, drops the handle and returns `None`.
#[track_caller]
fn wait_for_panics(self) -> Self::Output {
fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
// This is more readable as an expanded statement.
#[allow(clippy::manual_map)]
if let Some(output) = self.take()?.wait_for_panics() {
if let Some(output) = self
.take()?
.wait_for_panics_with(panic_on_unexpected_termination)
{
Some(output)
} else {
// Some other task has a reference, so we should give up ours to let them use it.
Expand Down
Loading
Loading