Abort checkpoint service tasks on epoch change (#19915)
## Description 

At epoch change, the checkpoint builder may never finish, because it can be left waiting for transactions that will no longer arrive once consensus on peers has shut down. Abort the checkpoint service tasks instead of waiting for them to exit gracefully.
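
For context, here is a minimal, self-contained sketch of the shutdown pattern this change moves to (illustrative only, not the actual `CheckpointService` code; `run_builder` and `run_aggregator` are stand-ins for the checkpoint builder and aggregator loops): instead of signalling an exit over a `watch` channel and waiting for the tasks to observe it, the tasks are kept in a `JoinSet` and aborted directly at epoch change.

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::{sync::Notify, task::JoinSet, time::timeout};

async fn run_builder(notify: Arc<Notify>) {
    loop {
        // Build checkpoints, then wait until notified of new transactions.
        // This await can block forever once peers have shut down.
        notify.notified().await;
    }
}

async fn run_aggregator(notify: Arc<Notify>) {
    loop {
        // Aggregate signatures, then poll again after at most one second.
        let _ = timeout(Duration::from_secs(1), notify.notified()).await;
    }
}

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let mut tasks = JoinSet::new();
    tasks.spawn(run_builder(notify.clone()));
    tasks.spawn(run_aggregator(notify.clone()));

    // ... epoch change happens here ...

    // Abort the tasks instead of asking them to exit: they may be blocked on
    // awaits that will never complete.
    tasks.abort_all();
    while let Some(result) = tasks.join_next().await {
        if let Err(err) = result {
            if err.is_panic() {
                std::panic::resume_unwind(err.into_panic());
            }
            // Otherwise this is the expected cancellation error.
        }
    }
    println!("checkpoint service tasks have shut down");
}
```

Aborted tasks surface as cancellation errors from `join_next`, which is why only panics are resumed and other join errors are merely logged.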

## Test plan 

CI
`test_simulated_load_reconfig_with_crashes_and_delays`

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
mwtian authored Oct 18, 2024
1 parent 1f30f8c commit 5102c1b
Showing 3 changed files with 30 additions and 94 deletions.
66 changes: 8 additions & 58 deletions crates/sui-core/src/checkpoints/mod.rs
@@ -18,8 +18,6 @@ use crate::execution_cache::TransactionCacheRead;
use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
use crate::state_accumulator::StateAccumulator;
use diffy::create_patch;
use futures::future::{select, Either};
use futures::FutureExt;
use itertools::Itertools;
use mysten_metrics::{monitored_future, monitored_scope, MonitoredFutureExt};
use parking_lot::Mutex;
@@ -63,11 +61,7 @@ use sui_types::messages_consensus::ConsensusTransactionKey;
use sui_types::signature::GenericSignature;
use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
use sui_types::transaction::{TransactionDataAPI, TransactionKey, TransactionKind};
use tokio::{
sync::{watch, Notify},
task::JoinSet,
time::timeout,
};
use tokio::{sync::Notify, task::JoinSet, time::timeout};
use tracing::{debug, error, info, instrument, warn};
use typed_store::traits::{TableSummary, TypedStoreDebug};
use typed_store::DBMapUtils;
@@ -863,7 +857,6 @@ pub struct CheckpointBuilder {
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
max_checkpoint_size_bytes: usize,
@@ -873,7 +866,6 @@ pub struct CheckpointAggregator {
tables: Arc<CheckpointStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
notify: Arc<Notify>,
exit: watch::Receiver<()>,
current: Option<CheckpointSignatureAggregator>,
output: Box<dyn CertifiedCheckpointOutput>,
state: Arc<AuthorityState>,
@@ -901,7 +893,6 @@ impl CheckpointBuilder {
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
notify_aggregator: Arc<Notify>,
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
@@ -915,7 +906,6 @@ impl CheckpointBuilder {
effects_store,
accumulator,
output,
exit,
notify_aggregator,
metrics,
max_transactions_per_checkpoint,
@@ -926,26 +916,10 @@ impl CheckpointBuilder {
async fn run(mut self) {
info!("Starting CheckpointBuilder");
loop {
// Check whether an exit signal has been received, if so we break the loop.
// This gives us a chance to exit, in case checkpoint making keeps failing.
match self.exit.has_changed() {
Ok(true) | Err(_) => {
break;
}
Ok(false) => (),
};

self.maybe_build_checkpoints().await;

match select(self.exit.changed().boxed(), self.notify.notified().boxed()).await {
Either::Left(_) => {
// break loop on exit signal
break;
}
Either::Right(_) => {}
}
self.notify.notified().await;
}
info!("Shutting down CheckpointBuilder");
}

async fn maybe_build_checkpoints(&mut self) {
@@ -1769,7 +1743,6 @@ impl CheckpointAggregator {
tables: Arc<CheckpointStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
notify: Arc<Notify>,
exit: watch::Receiver<()>,
output: Box<dyn CertifiedCheckpointOutput>,
state: Arc<AuthorityState>,
metrics: Arc<CheckpointMetrics>,
@@ -1779,7 +1752,6 @@ impl CheckpointAggregator {
tables,
epoch_store,
notify,
exit,
current,
output,
state,
@@ -1800,19 +1772,7 @@ impl CheckpointAggregator {
continue;
}

match select(
self.exit.changed().boxed(),
timeout(Duration::from_secs(1), self.notify.notified()).boxed(),
)
.await
{
Either::Left(_) => {
// return on exit signal
info!("Shutting down CheckpointAggregator");
return;
}
Either::Right(_) => {}
}
let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
}
}

@@ -2241,19 +2201,13 @@ impl CheckpointService {
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
max_checkpoint_size_bytes: usize,
) -> (
Arc<Self>,
watch::Sender<()>, /* The exit sender */
JoinSet<()>, /* Handle to tasks */
) {
) -> (Arc<Self>, JoinSet<()> /* Handle to tasks */) {
info!(
"Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
);
let notify_builder = Arc::new(Notify::new());
let notify_aggregator = Arc::new(Notify::new());

let (exit_snd, exit_rcv) = watch::channel(());

let mut tasks = JoinSet::new();

let builder = CheckpointBuilder::new(
@@ -2264,22 +2218,17 @@ impl CheckpointService {
effects_store,
accumulator,
checkpoint_output,
exit_rcv.clone(),
notify_aggregator.clone(),
metrics.clone(),
max_transactions_per_checkpoint,
max_checkpoint_size_bytes,
);
let epoch_store_clone = epoch_store.clone();
tasks.spawn(monitored_future!(async move {
let _ = epoch_store_clone.within_alive_epoch(builder.run()).await;
}));
tasks.spawn(monitored_future!(builder.run()));

let aggregator = CheckpointAggregator::new(
checkpoint_store.clone(),
epoch_store.clone(),
notify_aggregator.clone(),
exit_rcv,
certified_checkpoint_output,
state.clone(),
metrics.clone(),
@@ -2299,7 +2248,7 @@ impl CheckpointService {
metrics,
});

(service, exit_snd, tasks)
(service, tasks)
}

#[cfg(test)]
@@ -2412,6 +2361,7 @@ mod tests {
use super::*;
use crate::authority::test_authority_builder::TestAuthorityBuilder;
use futures::future::BoxFuture;
use futures::FutureExt as _;
use shared_crypto::intent::{Intent, IntentScope};
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
@@ -2543,7 +2493,7 @@ mod tests {
&epoch_store,
));

let (checkpoint_service, _exit_sender, _tasks) = CheckpointService::spawn(
let (checkpoint_service, _tasks) = CheckpointService::spawn(
state.clone(),
checkpoint_store,
epoch_store.clone(),
2 changes: 1 addition & 1 deletion crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
@@ -33,7 +33,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
));
let (certified_output, _certified_result) = mpsc::channel::<CertifiedCheckpointSummary>(10);

let (checkpoint_service, _, _) = CheckpointService::spawn(
let (checkpoint_service, _) = CheckpointService::spawn(
state.clone(),
state.get_checkpoint_store().clone(),
epoch_store.clone(),
56 changes: 21 additions & 35 deletions crates/sui-node/src/lib.rs
@@ -13,7 +13,6 @@ use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::TryFutureExt;
use mysten_common::debug_fatal;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
@@ -48,7 +47,6 @@ use tap::tap::TapFallible;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, mpsc, watch, Mutex};
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::timeout;
use tower::ServiceBuilder;
use tracing::{debug, error, warn};
use tracing::{error_span, info, Instrument};
@@ -150,10 +148,6 @@ pub struct ValidatorComponents {
consensus_manager: ConsensusManager,
consensus_store_pruner: ConsensusStorePruner,
consensus_adapter: Arc<ConsensusAdapter>,
// Sending to the channel or dropping this will eventually stop checkpoint tasks.
// The receiver side of this channel is copied into each checkpoint service task,
// and they are listening to any change to this channel.
checkpoint_service_exit: watch::Sender<()>,
// Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
checkpoint_service_tasks: JoinSet<()>,
checkpoint_metrics: Arc<CheckpointMetrics>,
@@ -1291,17 +1285,16 @@ impl SuiNode {
sui_node_metrics: Arc<SuiNodeMetrics>,
sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
) -> Result<ValidatorComponents> {
let (checkpoint_service, checkpoint_service_exit, checkpoint_service_tasks) =
Self::start_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
epoch_store.clone(),
state.clone(),
state_sync_handle,
accumulator,
checkpoint_metrics.clone(),
);
let (checkpoint_service, checkpoint_service_tasks) = Self::start_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
epoch_store.clone(),
state.clone(),
state_sync_handle,
accumulator,
checkpoint_metrics.clone(),
);

// create a new map that gets injected into both the consensus handler and the consensus adapter
// the consensus handler will write values forwarded from consensus, and the consensus adapter
@@ -1378,7 +1371,6 @@ impl SuiNode {
consensus_manager,
consensus_store_pruner,
consensus_adapter,
checkpoint_service_exit,
checkpoint_service_tasks,
checkpoint_metrics,
sui_tx_validator_metrics,
@@ -1394,7 +1386,7 @@ impl SuiNode {
state_sync_handle: state_sync::Handle,
accumulator: Weak<StateAccumulator>,
checkpoint_metrics: Arc<CheckpointMetrics>,
) -> (Arc<CheckpointService>, watch::Sender<()>, JoinSet<()>) {
) -> (Arc<CheckpointService>, JoinSet<()>) {
let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

@@ -1688,31 +1680,25 @@ impl SuiNode {
consensus_manager,
consensus_store_pruner,
consensus_adapter,
checkpoint_service_exit,
mut checkpoint_service_tasks,
checkpoint_metrics,
sui_tx_validator_metrics,
}) = self.validator_components.lock().await.take()
{
info!("Reconfiguring the validator.");
// Stop the old checkpoint service and wait for them to finish.
let _ = checkpoint_service_exit.send(());
let wait_result = timeout(Duration::from_secs(5), async move {
while let Some(result) = checkpoint_service_tasks.join_next().await {
if let Err(err) = result {
if err.is_panic() {
std::panic::resume_unwind(err.into_panic());
}
warn!("Error in checkpoint service task: {:?}", err);
// Cancel the old checkpoint service tasks.
// Waiting for checkpoint builder to finish gracefully is not possible, because it
// may wait on transactions while consensus on peers has already shut down.
checkpoint_service_tasks.abort_all();
while let Some(result) = checkpoint_service_tasks.join_next().await {
if let Err(err) = result {
if err.is_panic() {
std::panic::resume_unwind(err.into_panic());
}
warn!("Error in checkpoint service task: {:?}", err);
}
})
.await;
if wait_result.is_err() {
debug_fatal!("Timed out waiting for checkpoint service tasks to finish.");
} else {
info!("Checkpoint service has shut down.");
}
info!("Checkpoint service has shut down.");

consensus_manager.shutdown().await;
info!("Consensus has shut down.");
