Skip to content

Commit

Permalink
Improve error handling; increase timeout in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pool2win committed Dec 2, 2024
1 parent 57ec7ca commit 6bb161b
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 31 deletions.
7 changes: 5 additions & 2 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,13 @@ impl Node {
EchoBroadcast::new(protocol_service, echo_broadcast_handle, state, node_id);
let timeout_layer =
tower::timeout::TimeoutLayer::new(tokio::time::Duration::from_millis(timeout));
let _ = timeout_layer
if let Err(e) = timeout_layer
.layer(echo_broadcast_service)
.oneshot(message)
.await;
.await
{
log::error!("Timeout error in broadcast message response: {}", e);

Check warning on line 409 in src/node.rs

View check run for this annotation

Codecov / codecov/patch

src/node.rs#L409

Added line #L409 was not covered by tests
}
});
}

Expand Down
9 changes: 7 additions & 2 deletions src/node/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ impl CommandExecutor {
}

pub async fn run_dkg(&self) -> Result<(), Box<dyn Error + Send>> {
log::debug!("Running DKG....");
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(Command::RunDKG { respond_to: tx }).await;
if let Err(e) = self.tx.send(Command::RunDKG { respond_to: tx }).await {
log::error!("Error sending RunDKG command: {}", e);
return Err(Box::new(e));

Check warning on line 77 in src/node/commands.rs

View check run for this annotation

Codecov / codecov/patch

src/node/commands.rs#L76-L77

Added lines #L76 - L77 were not covered by tests
}
rx.await.unwrap()
}
}
Expand All @@ -86,7 +90,6 @@ impl Commands for Node {
async fn start_command_loop(&self, mut command_rx: mpsc::Receiver<Command>) {
log::debug!("Starting command loop....");
while let Some(msg) = command_rx.recv().await {
log::debug!("Received command: {:?}", msg);
match msg {
Command::Shutdown => {
log::info!("Shutting down....");
Expand All @@ -100,8 +103,10 @@ impl Commands for Node {
}
Command::RunDKG { respond_to } => {
if let Err(e) = self.trigger_dkg_tx.send(()).await {
log::error!("Error sending RunDKG command: {}", e);
let _ = respond_to.send(Err(Box::new(e)));

Check warning on line 107 in src/node/commands.rs

View check run for this annotation

Codecov / codecov/patch

src/node/commands.rs#L106-L107

Added lines #L106 - L107 were not covered by tests
} else {
log::debug!("Sent RunDKG command");
let _ = respond_to.send(Ok(()));
}
}
Expand Down
12 changes: 4 additions & 8 deletions src/node/echo_broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ impl EchoBroadcastActor {
respond_to,
} => {
// send echo to all members
let _ = self
.send_echos_to_members(data.clone(), members, respond_to)
.await;
let _ = self.send_echos(data.clone(), members, respond_to).await;
}
EchoBroadcastMessage::EchoReceive { data } => {
// manage echo data structures and confirm delivered
Expand Down Expand Up @@ -161,7 +159,7 @@ impl EchoBroadcastActor {
/// Process:
/// 1. Wait for echos from all members for the message sent
/// 2. On receiving echos from all members, return Ok() to waiting receiver
pub async fn send_echos_to_members(
pub async fn send_echos(
&mut self,
data: Message,
members: ReliableSenderMap,
Expand Down Expand Up @@ -241,7 +239,6 @@ impl EchoBroadcastActor {

/// Add received echo from a sender to the list of echos received
pub fn add_echo(&mut self, message_id: &MessageId, sender_id: String) {
log::debug!("Adding echo for {:?}, {:?}", message_id, sender_id);
match self.message_echos.get_mut(message_id) {
Some(echos) => {
echos
Expand All @@ -266,13 +263,12 @@ impl EchoBroadcastActor {
let peer_id = data.get_sender_id();
let message_id = data.get_message_id().unwrap();

log::debug!("Adding echo {:?} {:?}", message_id, peer_id);
log::debug!("Handling received echo {:?} {:?}", message_id, peer_id);
self.add_echo(&message_id, peer_id);

if self.echo_received_for_all(&message_id) {
match self.message_client_txs.remove(&message_id) {
Some(respond_to) => {
if respond_to.send(Ok(())).is_err() {
if let Err(_) = respond_to.send(Ok(())) {
log::error!("Error responding on echo broadcast completion");
}
log::debug!("Broadcast message can be delivered now...");
Expand Down
4 changes: 2 additions & 2 deletions src/node/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ impl MembershipHandle {

pub async fn get_members(&self) -> Result<ReliableSenderMap, BoxError> {
let (respond_to, receiver) = oneshot::channel();
if self
if let Err(e) = self
.sender
.send(MembershipMessage::GetMembers(respond_to))
.await
.is_err()
{
log::error!("Error sending request to get members: {}", e);

Check warning on line 132 in src/node/membership.rs

View check run for this annotation

Codecov / codecov/patch

src/node/membership.rs#L132

Added line #L132 was not covered by tests
return Err("Error sending request to get members".into());
}
match receiver.await {
Expand Down
15 changes: 3 additions & 12 deletions src/node/protocol/dkg/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,6 @@ pub(crate) async fn run_dkg(
round_one_rx.recv().await.unwrap();
log::info!("Round 1 finished");

log::debug!(
"received round1 packages = {:?}",
state
.dkg_state
.get_received_round1_packages()
.await
.unwrap()
);

// start round2
if let Err(e) = round2_future.await {
log::error!("Error running round 2: {:?}", e);
Expand Down Expand Up @@ -240,9 +231,9 @@ mod dkg_trigger_tests {
mock
});

let (round_one_tx, mut round_one_rx) = mpsc::channel::<()>(1);
let (round_two_tx, mut round_two_rx) = mpsc::channel::<()>(1);
let (trigger_dkg_tx, mut trigger_dkg_rx) = mpsc::channel::<()>(1);
let (_round_one_tx, mut round_one_rx) = mpsc::channel::<()>(1);
let (_round_two_tx, mut round_two_rx) = mpsc::channel::<()>(1);
let (_trigger_dkg_tx, mut trigger_dkg_rx) = mpsc::channel::<()>(1);
// Wait for just over one interval to ensure we get at least one trigger
let result: Result<(), time::error::Elapsed> = timeout(
Duration::from_millis(10),
Expand Down
10 changes: 5 additions & 5 deletions tests/run_nodes_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY=
let mut node = Node::new("localhost:6880".to_string(), vec![])
.await
.static_key_pem(KEY.into())
.delivery_timeout(100);
.delivery_timeout(500);

let (ready_tx, mut ready_rx) = oneshot::channel();
let (_executor, command_rx) = CommandExecutor::new();
Expand All @@ -54,7 +54,7 @@ gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY=
Node::new("localhost:6881".to_string(), vec!["localhost:6880".into()])
.await
.static_key_pem(KEY.into())
.delivery_timeout(100);
.delivery_timeout(500);

let (ready_tx_b, mut _ready_rx_b) = oneshot::channel();
let (executor_b, command_rx_b) = CommandExecutor::new();
Expand Down Expand Up @@ -92,7 +92,7 @@ gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY=
let mut node_a = Node::new("localhost:6890".to_string(), vec![])
.await
.static_key_pem(KEY.into())
.delivery_timeout(100);
.delivery_timeout(500);

let (ready_tx_a, ready_rx_a) = oneshot::channel();
let (executor_a, command_rx_a) = CommandExecutor::new();
Expand All @@ -105,7 +105,7 @@ gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY=
Node::new("localhost:6891".to_string(), vec!["localhost:6890".into()])
.await
.static_key_pem(KEY.into())
.delivery_timeout(100);
.delivery_timeout(500);

let (ready_tx_b, ready_rx_b) = oneshot::channel();
let (executor_b, command_rx_b) = CommandExecutor::new();
Expand All @@ -121,7 +121,7 @@ gSEA68zeZuy7PMMQC9ECPmWqDl5AOFj5bi243F823ZVWtXY=
)
.await
.static_key_pem(KEY.into())
.delivery_timeout(100);
.delivery_timeout(500);

let (ready_tx_c, _ready_rx_c) = oneshot::channel();
let (executor_c, command_rx_c) = CommandExecutor::new();
Expand Down

0 comments on commit 6bb161b

Please sign in to comment.