From 3a9c72a6c138cf291605f0abef195fca2ba3169e Mon Sep 17 00:00:00 2001 From: robin-near <111538878+robin-near@users.noreply.github.com> Date: Thu, 22 Aug 2024 12:59:21 -0700 Subject: [PATCH] [Epoch Sync] Initial implementation for Epoch Sync V4. (#11934) Remaining issues: https://github.com/near/near-one-project-tracking/issues/73 I still need to go through this one more time, but reviews can be done in the meantime. --- chain/chain/src/garbage_collection.rs | 5 + chain/chain/src/test_utils/kv_runtime.rs | 3 +- chain/client-primitives/src/types.rs | 28 +- chain/client/src/adapter.rs | 4 +- chain/client/src/client.rs | 11 + chain/client/src/client_actor.rs | 14 +- chain/client/src/info.rs | 5 +- chain/client/src/sync/epoch.rs | 511 ++++++++++++++++++ chain/client/src/sync/header.rs | 57 +- chain/client/src/sync/mod.rs | 1 + chain/client/src/test_utils/setup.rs | 4 +- chain/epoch-manager/src/adapter.rs | 32 +- chain/epoch-manager/src/lib.rs | 61 ++- chain/network/src/client.rs | 18 +- chain/network/src/network_protocol/mod.rs | 12 + chain/network/src/peer/peer_actor.rs | 13 +- .../src/peer_manager/peer_manager_actor.rs | 38 ++ .../src/rate_limits/messages_limits.rs | 2 + chain/network/src/test_loop.rs | 25 +- chain/network/src/types.rs | 5 + core/async/src/test_loop.rs | 12 + core/async/src/test_loop/sender.rs | 8 +- core/chain-configs/src/client_config.rs | 30 + core/chain-configs/src/lib.rs | 27 +- core/primitives/src/epoch_sync.rs | 82 +++ core/primitives/src/lib.rs | 1 + core/primitives/src/views.rs | 9 +- .../src/test_loop/tests/epoch_sync.rs | 224 ++++++++ integration-tests/src/test_loop/tests/mod.rs | 1 + nearcore/src/config.rs | 40 +- tools/chainsync-loadtest/src/network.rs | 2 + 31 files changed, 1169 insertions(+), 116 deletions(-) create mode 100644 chain/client/src/sync/epoch.rs create mode 100644 core/primitives/src/epoch_sync.rs create mode 100644 integration-tests/src/test_loop/tests/epoch_sync.rs diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index e7a6ac3d90f..2aad91bfe4f 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -315,6 +315,11 @@ impl ChainStore { ) -> Result<(), Error> { let _span = tracing::debug_span!(target: "sync", "reset_data_pre_state_sync").entered(); let head = self.head()?; + if head.prev_block_hash == CryptoHash::default() { + // This is genesis. It means we are state syncing right after epoch sync. Don't clear + // anything at genesis, or else the node will never boot up again. + return Ok(()); + } // Get header we were syncing into. 
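// (`sync_hash` is the first block of the epoch we are state-syncing to; by this point its header is expected to already be present.)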
let header = self.get_block_header(&sync_hash)?; let prev_hash = *header.prev_hash(); diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index d011bc0286c..f6e16a762a9 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -829,8 +829,9 @@ impl EpochManagerAdapter for MockEpochManager { Ok(Default::default()) } - fn epoch_sync_init_epoch_manager( + fn init_after_epoch_sync( &self, + _store_update: &mut StoreUpdate, _prev_epoch_first_block_info: BlockInfo, _prev_epoch_last_block_info: BlockInfo, _prev_epoch_prev_last_block_info: BlockInfo, diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index 45452341384..2baeb99161d 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -271,6 +271,13 @@ impl std::fmt::Debug for StateSyncStatus { } } +#[derive(Clone, Debug)] +pub struct EpochSyncStatus { + pub source_peer_height: BlockHeight, + pub source_peer_id: PeerId, + pub attempt_time: near_time::Utc, +} + /// Various status sync can be in, whether it's fast sync or archival. #[derive(Clone, Debug, strum::AsRefStr)] pub enum SyncStatus { @@ -279,9 +286,8 @@ pub enum SyncStatus { /// Not syncing / Done syncing. NoSync, /// Syncing using light-client headers to a recent epoch - // TODO #3488 - // Bowen: why do we use epoch ordinal instead of epoch id? - EpochSync { epoch_ord: u64 }, + EpochSync(EpochSyncStatus), + EpochSyncDone, /// Downloading block headers for fast sync. HeaderSync { /// Head height at the beginning. Not the header head height! @@ -328,10 +334,11 @@ impl SyncStatus { SyncStatus::NoSync => 0, SyncStatus::AwaitingPeers => 1, SyncStatus::EpochSync { .. } => 2, - SyncStatus::HeaderSync { .. } => 3, - SyncStatus::StateSync(_) => 4, - SyncStatus::StateSyncDone => 5, - SyncStatus::BlockSync { .. } => 6, + SyncStatus::EpochSyncDone { .. } => 3, + SyncStatus::HeaderSync { .. } => 4, + SyncStatus::StateSync(_) => 5, + SyncStatus::StateSyncDone => 6, + SyncStatus::BlockSync { .. 
} => 7, } } @@ -356,7 +363,12 @@ impl From<SyncStatus> for SyncStatusView { match status { SyncStatus::AwaitingPeers => SyncStatusView::AwaitingPeers, SyncStatus::NoSync => SyncStatusView::NoSync, - SyncStatus::EpochSync { epoch_ord } => SyncStatusView::EpochSync { epoch_ord }, + SyncStatus::EpochSync(status) => SyncStatusView::EpochSync { + source_peer_height: status.source_peer_height, + source_peer_id: status.source_peer_id.to_string(), + attempt_time: status.attempt_time.to_string(), + }, + SyncStatus::EpochSyncDone => SyncStatusView::EpochSyncDone, SyncStatus::HeaderSync { start_height, current_height, highest_height } => { SyncStatusView::HeaderSync { start_height, current_height, highest_height } } diff --git a/chain/client/src/adapter.rs b/chain/client/src/adapter.rs index 4ab35929a8e..c9591e325d1 100644 --- a/chain/client/src/adapter.rs +++ b/chain/client/src/adapter.rs @@ -25,6 +25,8 @@ pub fn client_sender_for_network( tx_status_request: view_client_addr.clone().into_sender(), tx_status_response: view_client_addr.clone().into_sender(), announce_account: view_client_addr.into_sender(), - chunk_endorsement: client_addr.into_sender(), + chunk_endorsement: client_addr.clone().into_sender(), + epoch_sync_request: client_addr.clone().into_sender(), + epoch_sync_response: client_addr.into_sender(), } } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index a388464f989..af9443f32e5 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -93,6 +93,7 @@ use tracing::{debug, debug_span, error, info, instrument, trace, warn}; #[cfg(feature = "test_features")] use crate::client_actor::AdvProduceChunksMode; +use crate::sync::epoch::EpochSync; const NUM_REBROADCAST_BLOCKS: usize = 30; @@ -151,6 +152,8 @@ pub struct Client { /// storing the current status of the state sync and blocks catch up pub catchup_state_syncs: HashMap<CryptoHash, (StateSync, BlocksCatchUpState)>, + /// Keeps track of information needed to perform the initial Epoch Sync. + pub epoch_sync: EpochSync, /// Keeps track of syncing headers. pub header_sync: HeaderSync, /// Keeps track of syncing block. @@ -277,6 +280,13 @@ impl Client { let sharded_tx_pool = ShardedTransactionPool::new(rng_seed, config.transaction_pool_size_limit); let sync_status = SyncStatus::AwaitingPeers; + let epoch_sync = EpochSync::new( + clock.clone(), + network_adapter.clone(), + chain.genesis().clone(), + async_computation_spawner.clone(), + config.epoch_sync.clone(), + ); let header_sync = HeaderSync::new( clock.clone(), network_adapter.clone(), @@ -372,6 +382,7 @@ impl Client { NonZeroUsize::new(num_block_producer_seats).unwrap(), ), catchup_state_syncs: HashMap::new(), + epoch_sync, header_sync, block_sync, state_sync, diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 89c6ca14c5e..3b5c504d5fe 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -1712,9 +1712,19 @@ impl ClientActorInner { /// Handle the SyncRequirement::SyncNeeded. /// - /// This method runs the header sync, the block sync + /// This method performs whatever syncing technique is needed (epoch sync, header sync, + /// state sync, block sync) to make progress towards bringing the node up to date. fn handle_sync_needed(&mut self, highest_height: u64, signer: &Option<Arc<ValidatorSigner>>) { - // Run each step of syncing separately. + // Run epoch sync first; if this is applicable then nothing else is.
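+ // The stages build on each other: epoch sync jumps the node to a recent epoch, header sync then fills in headers up to the highest known peer height, state sync downloads the state at a recent epoch boundary, and block sync applies the remaining blocks.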
+ let epoch_sync_result = self.client.epoch_sync.run( + &mut self.client.sync_status, + &self.client.chain, + highest_height, + &self.network_info.highest_height_peers, + ); + unwrap_and_report_state_sync_result!(epoch_sync_result); + + // Run header sync as long as there are headers to catch up. let header_sync_result = self.client.header_sync.run( &mut self.client.sync_status, &mut self.client.chain, diff --git a/chain/client/src/info.rs b/chain/client/src/info.rs index cfd7384efa4..3f3d3d8e10e 100644 --- a/chain/client/src/info.rs +++ b/chain/client/src/info.rs @@ -709,9 +709,10 @@ pub fn display_sync_status( match sync_status { SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height), SyncStatus::NoSync => format!("#{:>8} {:>44}", head.height, head.last_block_hash), - SyncStatus::EpochSync { epoch_ord } => { - format!("[EPOCH: {:>5}] Getting to a recent epoch", epoch_ord) + SyncStatus::EpochSync(status) => { + format!("[EPOCH] {:?}", status) } + SyncStatus::EpochSyncDone => "[EPOCH] Done".to_string(), SyncStatus::HeaderSync { start_height, current_height, highest_height } => { let percent = if highest_height <= start_height { 0.0 diff --git a/chain/client/src/sync/epoch.rs b/chain/client/src/sync/epoch.rs new file mode 100644 index 00000000000..04ac6fa6eaf --- /dev/null +++ b/chain/client/src/sync/epoch.rs @@ -0,0 +1,511 @@ +use crate::client_actor::ClientActorInner; +use borsh::BorshDeserialize; +use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt}; +use near_async::messaging::{CanSend, Handler}; +use near_async::time::Clock; +use near_chain::types::Tip; +use near_chain::{BlockHeader, Chain, ChainStoreAccess, Error}; +use near_chain_configs::EpochSyncConfig; +use near_client_primitives::types::{EpochSyncStatus, SyncStatus}; +use near_epoch_manager::EpochManagerAdapter; +use near_network::client::{EpochSyncRequestMessage, EpochSyncResponseMessage}; +use near_network::types::{ + HighestHeightPeerInfo, NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest, +}; +use near_performance_metrics_macros::perf; +use near_primitives::epoch_block_info::BlockInfo; +use near_primitives::epoch_info::EpochInfo; +use near_primitives::epoch_manager::AGGREGATOR_KEY; +use near_primitives::epoch_sync::{ + EpochSyncProof, EpochSyncProofCurrentEpochData, EpochSyncProofLastEpochData, + EpochSyncProofPastEpochData, +}; +use near_primitives::merkle::PartialMerkleTree; +use near_primitives::network::PeerId; +use near_primitives::types::{BlockHeight, EpochId}; +use near_store::{DBCol, Store, FINAL_HEAD_KEY}; +use rand::seq::SliceRandom; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::instrument; + +pub struct EpochSync { + clock: Clock, + network_adapter: PeerManagerAdapter, + genesis: BlockHeader, + async_computation_spawner: Arc, + config: EpochSyncConfig, +} + +impl EpochSync { + pub fn new( + clock: Clock, + network_adapter: PeerManagerAdapter, + genesis: BlockHeader, + async_computation_spawner: Arc, + config: EpochSyncConfig, + ) -> Self { + Self { clock, network_adapter, genesis, async_computation_spawner, config } + } + + /// Derives an epoch sync proof for a recent epoch, that can be directly used to bootstrap + /// a new node or bring a far-behind node to a recent epoch. + #[instrument(skip(store))] + fn derive_epoch_sync_proof(store: Store) -> Result { + // Epoch sync initializes a new node with the first block of some epoch; we call that + // epoch the "target epoch". 
In the context of talking about the proof or the newly + // bootstrapped node, it is also called the "current epoch". + // + // The basic requirement for picking the target epoch is that its first block must be + // final. That's just so that we don't have to deal with any forks. Therefore, it is + // sufficient to pick whatever epoch the current final block is in. However, because + // state sync also requires some previous headers to be available (depending on how + // many chunks were missing), it is more convenient to just pick the prior epoch, so + // that by the time the new node does state sync, it would have a whole epoch of headers + // available via header sync. + // + // In other words, we pick the target epoch to be the previous epoch of the final tip. + let tip = store + .get_ser::(DBCol::BlockMisc, FINAL_HEAD_KEY)? + .ok_or_else(|| Error::Other("Could not find tip".to_string()))?; + let next_next_epoch_id = tip.next_epoch_id; + // Last block hash of the target epoch is the same as the next next EpochId. + // That's a general property for Near's epochs. + let target_epoch_last_block_header = store + .get_ser::(DBCol::BlockHeader, next_next_epoch_id.0.as_bytes())? + .ok_or_else(|| Error::Other("Could not find last block of target epoch".to_string()))?; + let target_epoch_second_last_block_header = store + .get_ser::( + DBCol::BlockHeader, + target_epoch_last_block_header.prev_hash().as_bytes(), + )? + .ok_or_else(|| { + Error::Other("Could not find second last block of target epoch".to_string()) + })?; + + Self::derive_epoch_sync_proof_from_final_block(store, target_epoch_second_last_block_header) + } + + /// Derives an epoch sync proof using a target epoch which the given block header is in. + /// The given block header must be some block in the epoch that is right after a final + /// block in the same epoch. But it doesn't matter which final block it is. + fn derive_epoch_sync_proof_from_final_block( + store: Store, + next_block_header_after_final_block_in_current_epoch: BlockHeader, + ) -> Result { + let final_block_header_in_current_epoch = store + .get_ser::( + DBCol::BlockHeader, + next_block_header_after_final_block_in_current_epoch.prev_hash().as_bytes(), + )? + .ok_or_else(|| Error::Other("Could not find final block header".to_string()))?; + + let current_epoch = *final_block_header_in_current_epoch.epoch_id(); + let current_epoch_info = store + .get_ser::(DBCol::EpochInfo, current_epoch.0.as_bytes())? + .ok_or_else(|| Error::EpochOutOfBounds(current_epoch))?; + let next_epoch = *final_block_header_in_current_epoch.next_epoch_id(); + let next_epoch_info = store + .get_ser::(DBCol::EpochInfo, next_epoch.0.as_bytes())? + .ok_or_else(|| Error::EpochOutOfBounds(next_epoch))?; + + // TODO: don't always generate from genesis + let all_past_epochs = Self::get_all_past_epochs( + &store, + EpochId::default(), + next_epoch, + &final_block_header_in_current_epoch, + )?; + if all_past_epochs.is_empty() { + return Err(Error::Other("Need at least three epochs to epoch sync".to_string())); + } + let prev_epoch = *all_past_epochs.last().unwrap().last_final_block_header.epoch_id(); + let prev_epoch_info = store + .get_ser::(DBCol::EpochInfo, prev_epoch.0.as_bytes())? + .ok_or_else(|| Error::EpochOutOfBounds(prev_epoch))?; + + let last_block_of_prev_epoch = store + .get_ser::(DBCol::BlockHeader, next_epoch.0.as_bytes())? 
+ .ok_or_else(|| Error::Other("Could not find last block of prev epoch".to_string()))?; + + let last_block_info_of_prev_epoch = store + .get_ser::<BlockInfo>(DBCol::BlockInfo, last_block_of_prev_epoch.hash().as_bytes())? + .ok_or_else(|| Error::Other("Could not find last block info".to_string()))?; + + let second_last_block_of_prev_epoch = store + .get_ser::<BlockHeader>( + DBCol::BlockHeader, + last_block_of_prev_epoch.prev_hash().as_bytes(), + )? + .ok_or_else(|| { + Error::Other("Could not find second last block of prev epoch".to_string()) + })?; + + let second_last_block_info_of_prev_epoch = store + .get_ser::<BlockInfo>( + DBCol::BlockInfo, + last_block_of_prev_epoch.prev_hash().as_bytes(), + )? + .ok_or_else(|| Error::Other("Could not find second last block info".to_string()))?; + + let first_block_info_of_prev_epoch = store + .get_ser::<BlockInfo>( + DBCol::BlockInfo, + last_block_info_of_prev_epoch.epoch_first_block().as_bytes(), + )? + .ok_or_else(|| Error::Other("Could not find first block info".to_string()))?; + + let block_info_for_final_block_of_current_epoch = store + .get_ser::<BlockInfo>( + DBCol::BlockInfo, + final_block_header_in_current_epoch.hash().as_bytes(), + )? + .ok_or_else(|| { + Error::Other("Could not find block info for latest final block".to_string()) + })?; + + let first_block_of_current_epoch = store + .get_ser::<BlockHeader>( + DBCol::BlockHeader, + block_info_for_final_block_of_current_epoch.epoch_first_block().as_bytes(), + )? + .ok_or_else(|| Error::Other("Could not find first block of current epoch".to_string()))?; + + let first_block_info_of_current_epoch = store + .get_ser::<BlockInfo>( + DBCol::BlockInfo, + block_info_for_final_block_of_current_epoch.epoch_first_block().as_bytes(), + )? + .ok_or_else(|| { + Error::Other("Could not find first block info of current epoch".to_string()) + })?; + + let merkle_proof_for_first_block_of_current_epoch = store + .get_ser::<PartialMerkleTree>( + DBCol::BlockMerkleTree, + first_block_of_current_epoch.hash().as_bytes(), + )? + .ok_or_else(|| { + Error::Other("Could not find merkle proof for first block".to_string()) + })?; + + let proof = EpochSyncProof { + past_epochs: all_past_epochs, + last_epoch: EpochSyncProofLastEpochData { + epoch_info: prev_epoch_info, + next_epoch_info: current_epoch_info, + next_next_epoch_info: next_epoch_info, + first_block_in_epoch: first_block_info_of_prev_epoch, + last_block_in_epoch: last_block_info_of_prev_epoch, + second_last_block_in_epoch: second_last_block_info_of_prev_epoch, + final_block_header_in_next_epoch: final_block_header_in_current_epoch, + approvals_for_final_block_in_next_epoch: + next_block_header_after_final_block_in_current_epoch.approvals().to_vec(), + }, + current_epoch: EpochSyncProofCurrentEpochData { + first_block_header_in_epoch: first_block_of_current_epoch, + first_block_info_in_epoch: first_block_info_of_current_epoch, + last_block_header_in_prev_epoch: last_block_of_prev_epoch, + second_last_block_header_in_prev_epoch: second_last_block_of_prev_epoch, + merkle_proof_for_first_block: merkle_proof_for_first_block_of_current_epoch, + }, + }; + + Ok(proof) + } + + /// Get all the past epoch data needed for epoch sync, between `after_epoch` and `next_epoch` + /// (both exclusive). `current_epoch_any_header` is any block header in the current epoch, + /// which is the epoch before `next_epoch`.
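+ /// The last element of the returned list falls in the epoch just before the target epoch, which is how `derive_epoch_sync_proof_from_final_block` identifies `prev_epoch`.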
+ #[instrument(skip(store, current_epoch_any_header))] + fn get_all_past_epochs( + store: &Store, + after_epoch: EpochId, + next_epoch: EpochId, + current_epoch_any_header: &BlockHeader, + ) -> Result<Vec<EpochSyncProofPastEpochData>, Error> { + // We're going to get all the epochs and then figure out the correct chain of + // epochs. The reason is that (1) epochs may, in very rare cases, have forks, + // so we cannot just take all the epochs and assume their heights do not collide; + // and (2) it is not easy to walk backwards from the last epoch; there's no + // "give me the previous epoch" query. So instead, we use block header's + // `next_epoch_id` to establish an epoch chain. + let mut all_epoch_infos = HashMap::new(); + for item in store.iter(DBCol::EpochInfo) { + let (key, value) = item?; + if key.as_ref() == AGGREGATOR_KEY { + continue; + } + let epoch_id = EpochId::try_from_slice(key.as_ref())?; + let epoch_info = EpochInfo::try_from_slice(value.as_ref())?; + all_epoch_infos.insert(epoch_id, epoch_info); + } + + // Collect the previous-epoch relationship based on block headers. + // To get block headers for past epochs, we use the fact that the EpochId is the + // same as the block hash of the last block two epochs ago. That works except for + // the current epoch, whose last block doesn't exist yet, which is why we need + // any arbitrary block header in the current epoch as a special case. + let mut epoch_to_prev_epoch = HashMap::new(); + epoch_to_prev_epoch.insert( + *current_epoch_any_header.next_epoch_id(), + *current_epoch_any_header.epoch_id(), + ); + for (epoch_id, _) in &all_epoch_infos { + if let Ok(Some(block)) = + store.get_ser::<BlockHeader>(DBCol::BlockHeader, epoch_id.0.as_bytes()) + { + epoch_to_prev_epoch.insert(*block.next_epoch_id(), *block.epoch_id()); + } + } + + // Now that we have the chain of previous epochs, walk from the last epoch backwards + // towards the first epoch. + let mut epoch_ids = vec![]; + let mut current_epoch = next_epoch; + while current_epoch != after_epoch { + let prev_epoch = epoch_to_prev_epoch.get(&current_epoch).ok_or_else(|| { + Error::Other(format!("Could not find prev epoch for {:?}", current_epoch)) + })?; + epoch_ids.push(current_epoch); + current_epoch = *prev_epoch; + } + epoch_ids.reverse(); + + // Now that we have all epochs, we can fetch the data needed for each epoch. + let epochs = (0..epoch_ids.len() - 2) + .into_par_iter() + .map(|index| -> Result<EpochSyncProofPastEpochData, Error> { + let next_next_epoch_id = epoch_ids[index + 2]; + let epoch_id = epoch_ids[index]; + let last_block_header = store + .get_ser::<BlockHeader>(DBCol::BlockHeader, next_next_epoch_id.0.as_bytes())? + .ok_or_else(|| { + Error::Other(format!( + "Could not find last block header for epoch {:?}", + epoch_id + )) + })?; + let second_last_block_header = store + .get_ser::<BlockHeader>( + DBCol::BlockHeader, + last_block_header.prev_hash().as_bytes(), + )? + .ok_or_else(|| { + Error::Other(format!( + "Could not find second last block header for epoch {:?}", + epoch_id + )) + })?; + let third_last_block_header = store + .get_ser::<BlockHeader>( + DBCol::BlockHeader, + second_last_block_header.prev_hash().as_bytes(), + )?
+ .ok_or_else(|| { + Error::Other(format!( + "Could not find third last block header for epoch {:?}", + epoch_id + )) + })?; + let epoch_info = all_epoch_infos.get(&epoch_id).ok_or_else(|| { + Error::Other(format!("Could not find epoch info for epoch {:?}", epoch_id)) + })?; + Ok(EpochSyncProofPastEpochData { + block_producers: epoch_info + .block_producers_settlement() + .iter() + .map(|validator_id| epoch_info.get_validator(*validator_id)) + .collect(), + last_final_block_header: third_last_block_header, + approvals_for_last_final_block: second_last_block_header.approvals().to_vec(), + }) + }) + .collect::, _>>()?; + Ok(epochs) + } + + /// Performs the epoch sync logic if applicable in the current state of the blockchain. + /// This is periodically called by the client actor. + pub fn run( + &self, + status: &mut SyncStatus, + chain: &Chain, + highest_height: BlockHeight, + highest_height_peers: &[HighestHeightPeerInfo], + ) -> Result<(), Error> { + if !self.config.enabled { + return Ok(()); + } + let tip_height = chain.chain_store().header_head()?.height; + if tip_height + self.config.epoch_sync_horizon >= highest_height { + return Ok(()); + } + match status { + SyncStatus::AwaitingPeers | SyncStatus::StateSync(_) => { + return Ok(()); + } + SyncStatus::EpochSync(status) => { + if status.attempt_time + self.config.timeout_for_epoch_sync < self.clock.now_utc() { + tracing::warn!("Epoch sync from {} timed out; retrying", status.source_peer_id); + } else { + return Ok(()); + } + } + _ => {} + } + + // TODO(#11976): Implement a more robust logic for picking a peer to request epoch sync from. + let peer = highest_height_peers + .choose(&mut rand::thread_rng()) + .ok_or_else(|| Error::Other("No peers to request epoch sync from".to_string()))?; + + *status = SyncStatus::EpochSync(EpochSyncStatus { + source_peer_id: peer.peer_info.id.clone(), + source_peer_height: peer.highest_block_height, + attempt_time: self.clock.now_utc(), + }); + + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::EpochSyncRequest { peer_id: peer.peer_info.id.clone() }, + )); + + Ok(()) + } + + pub fn apply_proof( + &self, + status: &mut SyncStatus, + chain: &mut Chain, + proof: EpochSyncProof, + source_peer: PeerId, + epoch_manager: &dyn EpochManagerAdapter, + ) -> Result<(), Error> { + if let SyncStatus::EpochSync(status) = status { + if status.source_peer_id != source_peer { + tracing::warn!("Ignoring epoch sync proof from unexpected peer: {}", source_peer); + return Ok(()); + } + if proof.current_epoch.first_block_header_in_epoch.height() + + self.config.epoch_sync_accept_proof_max_horizon + < status.source_peer_height + { + tracing::error!( + "Ignoring epoch sync proof from peer {} with too high height", + source_peer + ); + return Ok(()); + } + } else { + tracing::warn!("Ignoring unexpected epoch sync proof from peer: {}", source_peer); + return Ok(()); + } + + // TODO(#11932): Verify the proof. 
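+ // A rough shape for that verification, based on the field docs in core/primitives/src/epoch_sync.rs (a sketch only; no verification helpers exist yet): for each consecutive pair (prev, epoch) in `proof.past_epochs`, check that `epoch.block_producers` hashes to the `next_bp_hash` committed in `prev.last_final_block_header`, and that `epoch.approvals_for_last_final_block` are endorsements of that final block signed by enough stake; then check the six `last_epoch` fields against the `epoch_sync_data_hash` in `final_block_header_in_next_epoch`, and check `current_epoch.first_block_header_in_epoch` against `merkle_proof_for_first_block`.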
+ + let last_header = proof.current_epoch.first_block_header_in_epoch; + let mut store_update = chain.chain_store.store().store_update(); + + let mut update = chain.mut_chain_store().store_update(); + update.save_block_header_no_update_tree(last_header.clone())?; + update.save_block_header_no_update_tree( + proof.current_epoch.last_block_header_in_prev_epoch, + )?; + update.save_block_header_no_update_tree( + proof.current_epoch.second_last_block_header_in_prev_epoch, + )?; + tracing::info!( + "last final block of last past epoch: {:?}", + proof.past_epochs.last().unwrap().last_final_block_header.hash() + ); + update.save_block_header_no_update_tree( + proof.past_epochs.last().unwrap().last_final_block_header.clone(), + )?; + update.force_save_header_head(&Tip::from_header(&last_header))?; + update.save_final_head(&Tip::from_header(&self.genesis))?; + + epoch_manager.init_after_epoch_sync( + &mut store_update, + proof.last_epoch.first_block_in_epoch, + proof.last_epoch.second_last_block_in_epoch, + proof.last_epoch.last_block_in_epoch.clone(), + proof.last_epoch.last_block_in_epoch.epoch_id(), + proof.last_epoch.epoch_info, + proof.last_epoch.final_block_header_in_next_epoch.epoch_id(), + proof.last_epoch.next_epoch_info, + proof.last_epoch.final_block_header_in_next_epoch.next_epoch_id(), + proof.last_epoch.next_next_epoch_info, + )?; + + store_update.insert_ser( + DBCol::BlockInfo, + &borsh::to_vec(proof.current_epoch.first_block_info_in_epoch.hash()).unwrap(), + &proof.current_epoch.first_block_info_in_epoch, + )?; + + store_update.set_ser( + DBCol::BlockOrdinal, + &borsh::to_vec(&proof.current_epoch.merkle_proof_for_first_block.size()).unwrap(), + last_header.hash(), + )?; + + store_update.set_ser( + DBCol::BlockHeight, + &borsh::to_vec(&last_header.height()).unwrap(), + last_header.hash(), + )?; + + store_update.set_ser( + DBCol::BlockMerkleTree, + last_header.hash().as_bytes(), + &proof.current_epoch.merkle_proof_for_first_block, + )?; + + update.merge(store_update); + update.commit()?; + + *status = SyncStatus::EpochSyncDone; + + Ok(()) + } +} + +impl Handler for ClientActorInner { + #[perf] + fn handle(&mut self, msg: EpochSyncRequestMessage) { + let store = self.client.chain.chain_store.store().clone(); + let network_adapter = self.client.network_adapter.clone(); + let route_back = msg.route_back; + self.client.epoch_sync.async_computation_spawner.spawn( + "respond to epoch sync request", + move || { + let proof = match EpochSync::derive_epoch_sync_proof(store) { + Ok(epoch_sync_proof) => epoch_sync_proof, + Err(e) => { + tracing::error!("Failed to derive epoch sync proof: {:?}", e); + return; + } + }; + network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::EpochSyncResponse { route_back, proof }, + )); + }, + ) + } +} + +impl Handler for ClientActorInner { + #[perf] + fn handle(&mut self, msg: EpochSyncResponseMessage) { + if let Err(e) = self.client.epoch_sync.apply_proof( + &mut self.client.sync_status, + &mut self.client.chain, + msg.proof, + msg.from_peer, + self.client.epoch_manager.as_ref(), + ) { + tracing::error!("Failed to apply epoch sync proof: {:?}", e); + } + } +} diff --git a/chain/client/src/sync/header.rs b/chain/client/src/sync/header.rs index 84ebf2deb74..3476eb92ed0 100644 --- a/chain/client/src/sync/header.rs +++ b/chain/client/src/sync/header.rs @@ -122,18 +122,18 @@ impl HeaderSync { let enable_header_sync = match sync_status { SyncStatus::HeaderSync { .. } | SyncStatus::BlockSync { .. 
} + | SyncStatus::EpochSyncDone | SyncStatus::StateSyncDone => { // TODO: Transitioning from BlockSync to HeaderSync is fine if the highest height of peers gets too far from our header_head_height. However it's currently unconditional. true } - SyncStatus::NoSync | SyncStatus::AwaitingPeers | SyncStatus::EpochSync { .. } => { - // TODO: How can it get to EpochSync if it's hardcoded to go from NoSync to HeaderSync? + SyncStatus::NoSync | SyncStatus::AwaitingPeers => { debug!(target: "sync", "Sync: initial transition to Header sync. Header head {} at {}", header_head.last_block_hash, header_head.height, ); true } - SyncStatus::StateSync { .. } => false, + SyncStatus::EpochSync { .. } | SyncStatus::StateSync { .. } => false, }; if !enable_header_sync { @@ -157,7 +157,8 @@ impl HeaderSync { let shutdown_height = self.shutdown_height.get().unwrap_or(u64::MAX); let highest_height = peer.highest_block_height.min(shutdown_height); if highest_height > header_head.height { - self.syncing_peer = self.request_headers(chain, peer); + self.request_headers(chain, &peer)?; + self.syncing_peer = Some(peer); } } Ok(()) @@ -203,13 +204,9 @@ impl HeaderSync { // This can be either the initial timeout, or any of the progress timeouts after the initial timeout. let stalling = header_head.height <= old_expected_height && now > timeout; - // Always enable header sync on initial state transition from - // * NoSync - // * AwaitingPeers. - // TODO: Will this remain correct with the introduction of EpochSync? - // TODO: Shouldn't a node transition to EpochSync from these states? + // Always enable header sync if we're able to do header sync but are not doing it already. let force_sync = match sync_status { - SyncStatus::NoSync | SyncStatus::AwaitingPeers => true, + SyncStatus::NoSync | SyncStatus::AwaitingPeers | SyncStatus::EpochSyncDone => true, _ => false, }; @@ -317,19 +314,17 @@ impl HeaderSync { fn request_headers( &mut self, chain: &Chain, - peer: HighestHeightPeerInfo, - ) -> Option { - if let Ok(locator) = self.get_locator(chain) { - debug!(target: "sync", "Sync: request headers: asking {} for headers, {:?}", peer.peer_info.id, locator); - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::BlockHeadersRequest { - hashes: locator, - peer_id: peer.peer_info.id.clone(), - }, - )); - return Some(peer); - } - None + peer: &HighestHeightPeerInfo, + ) -> Result<(), near_chain::Error> { + let locator = self.get_locator(chain)?; + debug!(target: "sync", "Sync: request headers: asking {} for headers, {:?}", peer.peer_info.id, locator); + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::BlockHeadersRequest { + hashes: locator, + peer_id: peer.peer_info.id.clone(), + }, + )); + Ok(()) } // The remote side will return MAX_BLOCK_HEADERS headers, starting from the first hash in @@ -352,8 +347,20 @@ impl HeaderSync { let ordinals = get_locator_ordinals(final_head_ordinal, tip_ordinal); let mut locator: Vec = vec![]; for ordinal in &ordinals { - let block_hash = store.get_block_hash_from_ordinal(*ordinal)?; - locator.push(block_hash); + match store.get_block_hash_from_ordinal(*ordinal) { + Ok(block_hash) => { + locator.push(block_hash); + } + Err(e) => { + // In the case of epoch sync, it is normal and expected that we will not have + // many headers before the tip, so that case is fine. 
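+ // The tip itself is the exception: its header must exist, so failing to resolve the tip ordinal is a real error rather than expected missing history.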
+ if *ordinal == tip_ordinal { + return Err(e); + } + debug!(target: "sync", "Sync: failed to get block hash from ordinal {}; \ + this is normal if we just finished epoch sync. Error: {:?}", ordinal, e); + } + } } debug!(target: "sync", "Sync: locator: {:?} ordinals: {:?}", locator, ordinals); Ok(locator) diff --git a/chain/client/src/sync/mod.rs b/chain/client/src/sync/mod.rs index a7933545d98..462344f1926 100644 --- a/chain/client/src/sync/mod.rs +++ b/chain/client/src/sync/mod.rs @@ -1,5 +1,6 @@ pub mod adapter; pub mod block; +pub mod epoch; pub mod external; pub mod header; pub mod state; diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index cde03a9ce30..b7f2ec26ddd 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -761,7 +761,9 @@ fn process_peer_manager_message_default( | NetworkRequests::TxStatus(_, _, _) | NetworkRequests::SnapshotHostInfo { .. } | NetworkRequests::Challenge(_) - | NetworkRequests::ChunkStateWitnessAck(_, _) => {} + | NetworkRequests::ChunkStateWitnessAck(_, _) + | NetworkRequests::EpochSyncRequest { .. } + | NetworkRequests::EpochSyncResponse { .. } => {} } } diff --git a/chain/epoch-manager/src/adapter.rs b/chain/epoch-manager/src/adapter.rs index b4ba37fe7c1..874a8328851 100644 --- a/chain/epoch-manager/src/adapter.rs +++ b/chain/epoch-manager/src/adapter.rs @@ -308,8 +308,9 @@ pub trait EpochManagerAdapter: Send + Sync { } /// Epoch Manager init procedure that is necessary after Epoch Sync. - fn epoch_sync_init_epoch_manager( + fn init_after_epoch_sync( &self, + store_update: &mut StoreUpdate, prev_epoch_first_block_info: BlockInfo, prev_epoch_prev_last_block_info: BlockInfo, prev_epoch_last_block_info: BlockInfo, @@ -820,8 +821,9 @@ impl EpochManagerAdapter for EpochManagerHandle { )) } - fn epoch_sync_init_epoch_manager( + fn init_after_epoch_sync( &self, + store_update: &mut StoreUpdate, prev_epoch_first_block_info: BlockInfo, prev_epoch_prev_last_block_info: BlockInfo, prev_epoch_last_block_info: BlockInfo, @@ -833,20 +835,18 @@ impl EpochManagerAdapter for EpochManagerHandle { next_epoch_info: EpochInfo, ) -> Result<(), EpochError> { let mut epoch_manager = self.write(); - epoch_manager - .init_after_epoch_sync( - prev_epoch_first_block_info, - prev_epoch_prev_last_block_info, - prev_epoch_last_block_info, - prev_epoch_id, - prev_epoch_info, - epoch_id, - epoch_info, - next_epoch_id, - next_epoch_info, - )? 
- .commit() - .map_err(|err| err.into()) + epoch_manager.init_after_epoch_sync( + store_update, + prev_epoch_first_block_info, + prev_epoch_prev_last_block_info, + prev_epoch_last_block_info, + prev_epoch_id, + prev_epoch_info, + epoch_id, + epoch_info, + next_epoch_id, + next_epoch_info, + ) } fn verify_block_vrf( diff --git a/chain/epoch-manager/src/lib.rs b/chain/epoch-manager/src/lib.rs index 892203ebde9..708dc394a78 100644 --- a/chain/epoch-manager/src/lib.rs +++ b/chain/epoch-manager/src/lib.rs @@ -2,7 +2,7 @@ use crate::metrics::{PROTOCOL_VERSION_NEXT, PROTOCOL_VERSION_VOTES}; use crate::types::EpochInfoAggregator; use near_cache::SyncLruCache; use near_chain_configs::GenesisConfig; -use near_primitives::block::Tip; +use near_primitives::block::{BlockHeader, Tip}; use near_primitives::checked_feature; use near_primitives::epoch_block_info::{BlockInfo, SlashState}; use near_primitives::epoch_info::EpochInfo; @@ -24,7 +24,7 @@ use near_primitives::version::{ProtocolVersion, UPGRADABILITY_FIX_PROTOCOL_VERSI use near_primitives::views::{ CurrentEpochValidatorInfo, EpochValidatorInfo, NextEpochValidatorInfo, ValidatorKickoutView, }; -use near_store::{DBCol, Store, StoreUpdate}; +use near_store::{DBCol, Store, StoreUpdate, HEADER_HEAD_KEY}; use num_rational::Rational64; use primitive_types::U256; use std::cmp::Ordering; @@ -359,6 +359,7 @@ impl EpochManager { pub fn init_after_epoch_sync( &mut self, + store_update: &mut StoreUpdate, prev_epoch_first_block_info: BlockInfo, prev_epoch_prev_last_block_info: BlockInfo, prev_epoch_last_block_info: BlockInfo, @@ -368,18 +369,21 @@ impl EpochManager { epoch_info: EpochInfo, next_epoch_id: &EpochId, next_epoch_info: EpochInfo, - ) -> Result<StoreUpdate, EpochError> { - let mut store_update = self.store.store_update(); - self.save_block_info(&mut store_update, Arc::new(prev_epoch_first_block_info))?; - self.save_block_info(&mut store_update, Arc::new(prev_epoch_prev_last_block_info))?; - self.save_block_info(&mut store_update, Arc::new(prev_epoch_last_block_info))?; - self.save_epoch_info(&mut store_update, prev_epoch_id, Arc::new(prev_epoch_info))?; - self.save_epoch_info(&mut store_update, epoch_id, Arc::new(epoch_info))?; - self.save_epoch_info(&mut store_update, next_epoch_id, Arc::new(next_epoch_info))?; - // TODO #3488 - // put unreachable! here to avoid warnings - unreachable!(); - // Ok(store_update) + ) -> Result<(), EpochError> { + // TODO(#11931): We need to initialize the aggregator to the previous epoch, because + // as we move the aggregator forward, we do not have the previous epoch's + // blocks to compute the aggregator data. See issue for details. Consider a cleaner way.
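+ // Concretely: seed the aggregator at the previous epoch, anchored just before its second-last block, and persist it immediately so a restarted node resumes from the same point.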
+ self.epoch_info_aggregator = + EpochInfoAggregator::new(*prev_epoch_id, *prev_epoch_prev_last_block_info.prev_hash()); + store_update.set_ser(DBCol::EpochInfo, AGGREGATOR_KEY, &self.epoch_info_aggregator)?; + + self.save_block_info(store_update, Arc::new(prev_epoch_first_block_info))?; + self.save_block_info(store_update, Arc::new(prev_epoch_prev_last_block_info))?; + self.save_block_info(store_update, Arc::new(prev_epoch_last_block_info))?; + self.save_epoch_info(store_update, prev_epoch_id, Arc::new(prev_epoch_info))?; + self.save_epoch_info(store_update, epoch_id, Arc::new(epoch_info))?; + self.save_epoch_info(store_update, next_epoch_id, Arc::new(next_epoch_info))?; + Ok(()) } /// When computing validators to kickout, we exempt some validators first so that @@ -1968,9 +1972,32 @@ impl EpochManager { } let prev_hash = *block_info.prev_hash(); - let prev_info = self.get_block_info(&prev_hash)?; - let prev_height = prev_info.height(); - let prev_epoch = *prev_info.epoch_id(); + let (prev_height, prev_epoch) = match self.get_block_info(&prev_hash) { + Ok(info) => (info.height(), *info.epoch_id()), + Err(EpochError::MissingBlock(_)) => { + // In the case of epoch sync, we may not have the BlockInfo for the last final block + // of the epoch. In this case, check for this special case. + // TODO(11931): think of a better way to do this. + let tip = self + .store + .get_ser::(DBCol::BlockMisc, HEADER_HEAD_KEY)? + .ok_or_else(|| EpochError::IOErr("Tip not found in store".to_string()))?; + let block_header = self + .store + .get_ser::(DBCol::BlockHeader, tip.prev_block_hash.as_bytes())? + .ok_or_else(|| { + EpochError::IOErr( + "BlockHeader for prev block of tip not found in store".to_string(), + ) + })?; + if block_header.prev_hash() == block_info.hash() { + (block_info.height() - 1, *block_info.epoch_id()) + } else { + return Err(EpochError::MissingBlock(prev_hash)); + } + } + Err(e) => return Err(e), + }; let block_info = self.get_block_info(&cur_hash)?; aggregator.update_tail(&block_info, &epoch_info, prev_height); diff --git a/chain/network/src/client.rs b/chain/network/src/client.rs index 8378e72be1a..ac201ab3230 100644 --- a/chain/network/src/client.rs +++ b/chain/network/src/client.rs @@ -1,9 +1,10 @@ use crate::network_protocol::StateResponseInfo; use crate::types::{NetworkInfo, ReasonForBan}; -use near_async::messaging::AsyncSender; +use near_async::messaging::{AsyncSender, Sender}; use near_async::{MultiSend, MultiSendMessage, MultiSenderFrom}; use near_primitives::block::{Approval, Block, BlockHeader}; use near_primitives::challenge::Challenge; +use near_primitives::epoch_sync::EpochSyncProof; use near_primitives::errors::InvalidTxError; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; @@ -117,6 +118,19 @@ pub struct AnnounceAccountRequest(pub Vec<(AnnounceAccount, Option)>); #[rtype(result = "()")] pub struct ChunkEndorsementMessage(pub ChunkEndorsement); +#[derive(actix::Message, Debug, Clone, PartialEq, Eq)] +#[rtype(result = "()")] +pub struct EpochSyncRequestMessage { + pub route_back: CryptoHash, +} + +#[derive(actix::Message, Debug, Clone, PartialEq, Eq)] +#[rtype(result = "()")] +pub struct EpochSyncResponseMessage { + pub from_peer: PeerId, + pub proof: EpochSyncProof, +} + #[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)] #[multi_send_message_derive(Debug)] #[multi_send_input_derive(Debug, Clone, PartialEq, Eq)] @@ -137,4 +151,6 @@ pub struct ClientSenderForNetwork { pub announce_account: AsyncSender, 
ReasonForBan>>, pub chunk_endorsement: AsyncSender, + pub epoch_sync_request: Sender, + pub epoch_sync_response: Sender, } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 86d59709bde..d29521daec4 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -35,6 +35,7 @@ use near_crypto::Signature; use near_o11y::OpenTelemetrySpanExt; use near_primitives::block::{Approval, Block, BlockHeader, GenesisId}; use near_primitives::challenge::Challenge; +use near_primitives::epoch_sync::EpochSyncProof; use near_primitives::hash::CryptoHash; use near_primitives::merkle::combine_hash; use near_primitives::network::{AnnounceAccount, PeerId}; @@ -538,6 +539,8 @@ pub enum RoutedMessageBody { PartialEncodedStateWitness(PartialEncodedStateWitness), PartialEncodedStateWitnessForward(PartialEncodedStateWitness), VersionedChunkEndorsement(ChunkEndorsement), + EpochSyncRequest, + EpochSyncResponse(EpochSyncProof), } impl RoutedMessageBody { @@ -613,6 +616,14 @@ impl fmt::Debug for RoutedMessageBody { RoutedMessageBody::VersionedChunkEndorsement(_) => { write!(f, "VersionedChunkEndorsement") } + RoutedMessageBody::EpochSyncRequest => write!(f, "EpochSyncRequest"), + RoutedMessageBody::EpochSyncResponse(proof) => { + write!( + f, + "EpochSyncResponse(epoch: {:?})", + proof.current_epoch.first_block_header_in_epoch.epoch_id(), + ) + } } } } @@ -696,6 +707,7 @@ impl RoutedMessage { RoutedMessageBody::Ping(_) | RoutedMessageBody::TxStatusRequest(_, _) | RoutedMessageBody::PartialEncodedChunkRequest(_) + | RoutedMessageBody::EpochSyncRequest ) } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index f31e58f1c4b..1529da14866 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -1,8 +1,9 @@ use crate::accounts_data::AccountDataError; use crate::client::{ AnnounceAccountRequest, BlockApproval, BlockHeadersRequest, BlockHeadersResponse, BlockRequest, - BlockResponse, ChunkEndorsementMessage, ProcessTxRequest, RecvChallenge, StateRequestHeader, - StateRequestPart, StateResponse, TxStatusRequest, TxStatusResponse, + BlockResponse, ChunkEndorsementMessage, EpochSyncRequestMessage, EpochSyncResponseMessage, + ProcessTxRequest, RecvChallenge, StateRequestHeader, StateRequestPart, StateResponse, + TxStatusRequest, TxStatusResponse, }; use crate::concurrency::atomic_cell::AtomicCell; use crate::concurrency::demux; @@ -1049,6 +1050,14 @@ impl PeerActor { network_state.client.send_async(ChunkEndorsementMessage(endorsement)).await.ok(); None } + RoutedMessageBody::EpochSyncRequest => { + network_state.client.send(EpochSyncRequestMessage { route_back: msg_hash }); + None + } + RoutedMessageBody::EpochSyncResponse(proof) => { + network_state.client.send(EpochSyncResponseMessage { from_peer: peer_id, proof }); + None + } body => { tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body); None diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 3ab5f2070bd..6e04e871203 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -1015,6 +1015,44 @@ impl PeerManagerActor { } NetworkResponses::NoResponse } + NetworkRequests::EpochSyncRequest { peer_id } => { + if self.state.send_message_to_peer( + &self.clock, + tcp::Tier::T2, + self.state.sign_message( + &self.clock, + 
RawRoutedMessage { + target: PeerIdOrHash::PeerId(peer_id), + body: RoutedMessageBody::EpochSyncRequest, + }, + ), + ) { + NetworkResponses::NoResponse + } else { + NetworkResponses::RouteNotFound + } + } + NetworkRequests::EpochSyncResponse { route_back, proof } => { + if self.state.send_message_to_peer( + &self.clock, + tcp::Tier::T2, + self.state.sign_message( + &self.clock, + RawRoutedMessage { + target: PeerIdOrHash::Hash(route_back), + body: RoutedMessageBody::EpochSyncResponse(proof), + }, + ), + ) { + NetworkResponses::NoResponse + } else { + tracing::info!( + "Failed to send EpochSyncResponse to {}, route not found", + route_back + ); + NetworkResponses::RouteNotFound + } + } } } diff --git a/chain/network/src/rate_limits/messages_limits.rs b/chain/network/src/rate_limits/messages_limits.rs index c02eb954b89..08d2d8ea40f 100644 --- a/chain/network/src/rate_limits/messages_limits.rs +++ b/chain/network/src/rate_limits/messages_limits.rs @@ -218,6 +218,8 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa Some((PartialEncodedStateWitnessForward, 1)) } RoutedMessageBody::VersionedChunkEndorsement(_) => Some((ChunkEndorsement, 1)), + RoutedMessageBody::EpochSyncRequest => None, + RoutedMessageBody::EpochSyncResponse(_) => None, RoutedMessageBody::Ping(_) | RoutedMessageBody::Pong(_) | RoutedMessageBody::_UnusedChunkStateWitness diff --git a/chain/network/src/test_loop.rs b/chain/network/src/test_loop.rs index 6fe32e92bd3..76e56a2dbb9 100644 --- a/chain/network/src/test_loop.rs +++ b/chain/network/src/test_loop.rs @@ -3,7 +3,8 @@ use std::sync::{Arc, Mutex}; use crate::client::{ BlockApproval, BlockHeadersRequest, BlockHeadersResponse, BlockRequest, BlockResponse, - ChunkEndorsementMessage, ProcessTxRequest, ProcessTxResponse, + ChunkEndorsementMessage, EpochSyncRequestMessage, EpochSyncResponseMessage, ProcessTxRequest, + ProcessTxResponse, }; use crate::shards_manager::ShardsManagerRequestFromNetwork; use crate::state_witness::{ @@ -32,6 +33,8 @@ pub struct ClientSenderForTestLoopNetwork { pub block_approval: AsyncSender, pub transaction: AsyncSender, pub chunk_endorsement: AsyncSender, + pub epoch_sync_request: Sender, + pub epoch_sync_response: Sender, } #[derive(Clone, MultiSend, MultiSenderFrom)] @@ -213,6 +216,7 @@ fn network_message_to_client_handler( let my_account_id = my_account_id.clone(); Box::new(move |request| match request { NetworkRequests::Block { block } => { + let my_peer_id = shared_state.account_to_peer_id.get(&my_account_id).unwrap(); for account_id in shared_state.accounts() { if account_id != &my_account_id { let future = shared_state @@ -220,7 +224,7 @@ fn network_message_to_client_handler( .client_sender .send_async(BlockResponse { block: block.clone(), - peer_id: PeerId::random(), + peer_id: my_peer_id.clone(), was_requested: false, }); drop(future); @@ -257,6 +261,23 @@ fn network_message_to_client_handler( drop(future); None } + NetworkRequests::EpochSyncRequest { peer_id } => { + let my_peer_id = shared_state.account_to_peer_id.get(&my_account_id).unwrap(); + assert_ne!(&peer_id, my_peer_id, "Sending message to self not supported."); + shared_state.senders_for_peer(&peer_id).client_sender.send(EpochSyncRequestMessage { + route_back: shared_state.generate_route_back(my_peer_id), + }); + None + } + NetworkRequests::EpochSyncResponse { route_back, proof } => { + let my_peer_id = shared_state.account_to_peer_id.get(&my_account_id).unwrap(); + shared_state + .senders_for_route_back(&route_back) + .client_sender + 
.send(EpochSyncResponseMessage { from_peer: my_peer_id.clone(), proof }); + None + } + _ => Some(request), }) } diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 5d6329e4ba5..1b42baa6d4b 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -16,6 +16,7 @@ use near_async::time; use near_crypto::PublicKey; use near_primitives::block::{ApprovalMessage, Block, GenesisId}; use near_primitives::challenge::Challenge; +use near_primitives::epoch_sync::EpochSyncProof; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::sharding::PartialEncodedChunkWithArcReceipts; @@ -271,6 +272,10 @@ pub enum NetworkRequests { PartialEncodedStateWitness(Vec<(AccountId, PartialEncodedStateWitness)>), /// Message from chunk validator to all other chunk validators to forward state witness part. PartialEncodedStateWitnessForward(Vec, PartialEncodedStateWitness), + /// Requests an epoch sync + EpochSyncRequest { peer_id: PeerId }, + /// Response to an epoch sync request + EpochSyncResponse { route_back: CryptoHash, proof: EpochSyncProof }, } /// Combines peer address info, chain. diff --git a/core/async/src/test_loop.rs b/core/async/src/test_loop.rs index bc1dd9e0947..624ccdf2130 100644 --- a/core/async/src/test_loop.rs +++ b/core/async/src/test_loop.rs @@ -104,6 +104,9 @@ pub struct TestLoopV2 { /// Shutdown flag. When this flag is true, delayed action runners will no /// longer post any new events to the event loop. shutting_down: Arc, + /// If present, a function to call to print something every time an event is + /// handled. Intended only for debugging. + every_event_callback: Option>, } /// An event waiting to be executed, ordered by the due time and then by ID. @@ -208,6 +211,7 @@ impl TestLoopV2 { current_time: Duration::ZERO, clock: FakeClock::default(), shutting_down, + every_event_callback: None, } } @@ -268,6 +272,10 @@ impl TestLoopV2 { self.data.register_actor_for_index(index, actor, adapter) } + pub fn set_every_event_callback(&mut self, callback: impl FnMut(&TestLoopData) + 'static) { + self.every_event_callback = Some(Box::new(callback)); + } + /// Helper to push events we have just received into the heap. fn queue_received_events(&mut self) { for event in self.pending_events.lock().unwrap().events.drain(..) 
{ @@ -351,6 +359,10 @@ impl TestLoopV2 { tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json); assert_eq!(self.current_time, event.due); + if let Some(callback) = &mut self.every_event_callback { + callback(&self.data); + } + let callback = event.event.callback; callback(&mut self.data); diff --git a/core/async/src/test_loop/sender.rs b/core/async/src/test_loop/sender.rs index e1df6a5ccc5..502a172e89a 100644 --- a/core/async/src/test_loop/sender.rs +++ b/core/async/src/test_loop/sender.rs @@ -83,13 +83,13 @@ where { fn send(&self, msg: M) { let mut this = self.clone(); + let description = format!("{}({:?})", pretty_type_name::(), &msg); let callback = move |data: &mut TestLoopData| { - tracing::debug!(target: "test_loop", "Handling message: {:?}", msg); let actor = data.get_mut(&this.actor_handle); actor.handle(msg, &mut this); }; self.pending_events_sender.send_with_delay( - format!("{}({})", pretty_type_name::(), pretty_type_name::()), + description, Box::new(callback), self.sender_delay, ); @@ -104,15 +104,15 @@ where { fn send(&self, msg: MessageWithCallback) { let mut this = self.clone(); + let description = format!("{}({:?})", pretty_type_name::(), &msg.message); let callback = move |data: &mut TestLoopData| { - tracing::debug!(target: "test_loop", "Handling message: {:?}", msg); let MessageWithCallback { message: msg, callback } = msg; let actor = data.get_mut(&this.actor_handle); let result = actor.handle(msg, &mut this); callback(Ok(result)); }; self.pending_events_sender.send_with_delay( - format!("{}({})", pretty_type_name::(), pretty_type_name::()), + description, Box::new(callback), self.sender_delay, ); diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 5d680cdb038..1fdc67b62f1 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -159,6 +159,29 @@ impl SyncConfig { } } +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct EpochSyncConfig { + pub enabled: bool, + pub epoch_sync_horizon: BlockHeightDelta, + pub epoch_sync_accept_proof_max_horizon: BlockHeightDelta, + #[serde(with = "near_time::serde_duration_as_std")] + pub timeout_for_epoch_sync: Duration, +} + +impl Default for EpochSyncConfig { + fn default() -> Self { + Self { + enabled: false, + // Mainnet is 43200 blocks per epoch, so let's default to epoch sync if + // we're more than 5 epochs behind, and we accept proofs up to 2 epochs old. + // (Epoch sync should not be picking a target epoch more than 2 epochs old.) + epoch_sync_horizon: 216000, + epoch_sync_accept_proof_max_horizon: 86400, + timeout_for_epoch_sync: Duration::seconds(60), + } + } +} + // A handle that allows the main process to interrupt resharding if needed. // This typically happens when the main process is interrupted. #[derive(Clone)] @@ -277,6 +300,10 @@ pub fn default_state_sync() -> Option { }) } +pub fn default_epoch_sync() -> Option { + Some(EpochSyncConfig::default()) +} + pub fn default_state_sync_enabled() -> bool { true } @@ -450,6 +477,8 @@ pub struct ClientConfig { pub state_sync_enabled: bool, /// Options for syncing state. pub state_sync: StateSyncConfig, + /// Options for epoch sync. + pub epoch_sync: EpochSyncConfig, /// Limit of the size of per-shard transaction pool measured in bytes. If not set, the size /// will be unbounded. 
pub transaction_pool_size_limit: Option, @@ -557,6 +586,7 @@ impl ClientConfig { flat_storage_creation_period: Duration::seconds(1), state_sync_enabled, state_sync: StateSyncConfig::default(), + epoch_sync: EpochSyncConfig::default(), transaction_pool_size_limit: None, enable_multiline_logging: false, resharding_config: MutableConfigValue::new( diff --git a/core/chain-configs/src/lib.rs b/core/chain-configs/src/lib.rs index 78baf9058d6..be2e1e45e03 100644 --- a/core/chain-configs/src/lib.rs +++ b/core/chain-configs/src/lib.rs @@ -10,19 +10,20 @@ pub mod test_utils; mod updateable_config; pub use client_config::{ - default_enable_multiline_logging, default_header_sync_expected_height_per_second, - default_header_sync_initial_timeout, default_header_sync_progress_timeout, - default_header_sync_stall_ban_timeout, default_log_summary_period, - default_orphan_state_witness_max_size, default_orphan_state_witness_pool_size, - default_produce_chunk_add_transactions_time_limit, default_state_sync, - default_state_sync_enabled, default_state_sync_timeout, default_sync_check_period, - default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period, - default_transaction_pool_size_limit, default_trie_viewer_state_size_limit, - default_tx_routing_height_horizon, default_view_client_threads, - default_view_client_throttle_period, ChunkDistributionNetworkConfig, ChunkDistributionUris, - ClientConfig, DumpConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig, - LogSummaryStyle, ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig, - DEFAULT_GC_NUM_EPOCHS_TO_KEEP, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL, + default_enable_multiline_logging, default_epoch_sync, + default_header_sync_expected_height_per_second, default_header_sync_initial_timeout, + default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout, + default_log_summary_period, default_orphan_state_witness_max_size, + default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit, + default_state_sync, default_state_sync_enabled, default_state_sync_timeout, + default_sync_check_period, default_sync_height_threshold, default_sync_max_block_requests, + default_sync_step_period, default_transaction_pool_size_limit, + default_trie_viewer_state_size_limit, default_tx_routing_height_horizon, + default_view_client_threads, default_view_client_throttle_period, + ChunkDistributionNetworkConfig, ChunkDistributionUris, ClientConfig, DumpConfig, + EpochSyncConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig, LogSummaryStyle, + ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP, + DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL, MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT, }; diff --git a/core/primitives/src/epoch_sync.rs b/core/primitives/src/epoch_sync.rs new file mode 100644 index 00000000000..6577c87cde6 --- /dev/null +++ b/core/primitives/src/epoch_sync.rs @@ -0,0 +1,82 @@ +use crate::block_header::BlockHeader; +use crate::epoch_block_info::BlockInfo; +use crate::epoch_info::EpochInfo; +use crate::merkle::PartialMerkleTree; +use crate::types::validator_stake::ValidatorStake; +use borsh::{BorshDeserialize, BorshSerialize}; +use near_crypto::Signature; + +/// Proof that the blockchain history had progressed from the genesis (not included here) to the +/// current epoch indicated in the proof. 
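+/// The intent is that a node holding nothing but the genesis can validate such a proof and then start participating from the proven epoch.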
+/// +/// A side note to better understand the fields in this proof: the last three blocks of any +/// epoch are guaranteed to have consecutive heights: +/// - H: The last final block of the epoch +/// - H + 1: The second last block of the epoch +/// - H + 2: The last block of the epoch +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct EpochSyncProof { + /// All the past epochs, starting from the first epoch after genesis, to + /// the last epoch before the current epoch. + pub past_epochs: Vec, + /// Some extra data for the last epoch before the current epoch. + pub last_epoch: EpochSyncProofLastEpochData, + /// Extra information to initialize the current epoch we're syncing to. + pub current_epoch: EpochSyncProofCurrentEpochData, +} + +/// Data needed for each epoch in the past. +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct EpochSyncProofPastEpochData { + /// The block producers and their stake, for this epoch. This is verified + /// against the `next_bp_hash` of the `last_final_block_header` of the epoch before this. + pub block_producers: Vec, + /// The last final block header of the epoch (i.e. third last block of the epoch). + /// This is verified against the `approvals_for_last_final_block`. + pub last_final_block_header: BlockHeader, + /// Approvals for the last final block, which comes from the second last block of the epoch. + /// Since it has a consecutive height from the final block, the approvals are guaranteed to + /// be endorsements which directly endorse the final block. + pub approvals_for_last_final_block: Vec>>, +} + +/// Data needed to initialize the epoch sync boundary. +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct EpochSyncProofLastEpochData { + /// The following six fields are used to derive the epoch_sync_data_hash included in any + /// BlockHeaderV3. This is used to verify all the data we need around the epoch sync + /// boundary. + pub epoch_info: EpochInfo, + pub next_epoch_info: EpochInfo, + pub next_next_epoch_info: EpochInfo, + pub first_block_in_epoch: BlockInfo, + pub last_block_in_epoch: BlockInfo, + pub second_last_block_in_epoch: BlockInfo, + + /// Any final block header in the next epoch (i.e. current epoch for the whole proof). + /// This is used to provide the `epoch_sync_data_hash` mentioned above. + pub final_block_header_in_next_epoch: BlockHeader, + /// Approvals for `final_block_header_in_next_epoch`, used to prove that that block header + /// is valid. + pub approvals_for_final_block_in_next_epoch: Vec>>, +} + +/// Data needed to initialize the current epoch we're syncing to. +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct EpochSyncProofCurrentEpochData { + /// The first block header that begins the epoch. It is proven using a merkle proof + /// against the final block provided in the LastEpochData. Note that we cannot use signatures + /// to prove this like the other cases, because the first block header may not have a + /// consecutive height afterwards. + pub first_block_header_in_epoch: BlockHeader, + // TODO(#11932): can this be proven or derived? + pub first_block_info_in_epoch: BlockInfo, + // The last two block headers are also needed for various purposes after epoch sync. + // TODO(#11931): do we really need these? 
+    pub last_block_header_in_prev_epoch: BlockHeader,
+    pub second_last_block_header_in_prev_epoch: BlockHeader,
+    // TODO(#11932): I'm not sure if this can be used to prove the block against the merkle root
+    // included in the final block in this next epoch (included in LastEpochData). We may need to
+    // include another merkle proof.
+    pub merkle_proof_for_first_block: PartialMerkleTree,
+}
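The verification flow for `past_epochs` follows directly from the comments above: each epoch's
producer set is checked against the `next_bp_hash` committed to by the previous epoch's last final
block header, and the approvals are checked as direct endorsements of that header. Below is a
minimal sketch of that loop, under stated assumptions: `compute_bp_hash` and `verify_endorsement`
are stand-in helpers (the real validation presumably lives in the new chain/client/src/sync/epoch.rs),
and the check that enough stake actually endorsed is elided.

    use near_crypto::Signature;
    use near_primitives::block_header::BlockHeader;
    use near_primitives::epoch_sync::EpochSyncProofPastEpochData;
    use near_primitives::hash::CryptoHash;
    use near_primitives::types::validator_stake::ValidatorStake;

    // Assumed helpers: hash a validator set the way `next_bp_hash` commits to it,
    // and verify one endorsement signature over a block header.
    fn compute_bp_hash(_producers: &[ValidatorStake]) -> CryptoHash {
        unimplemented!("stand-in for the real bp-hash computation")
    }
    fn verify_endorsement(_producer: &ValidatorStake, _header: &BlockHeader, _sig: &Signature) -> bool {
        unimplemented!("stand-in for endorsement signature verification")
    }

    /// Sketch: walk the past epochs, threading the `next_bp_hash` commitment from
    /// one epoch's last final block header to the next epoch's producer set.
    fn verify_past_epochs(
        genesis_next_bp_hash: CryptoHash, // taken from the genesis block header
        past_epochs: &[EpochSyncProofPastEpochData],
    ) -> bool {
        let mut expected_bp_hash = genesis_next_bp_hash;
        for epoch in past_epochs {
            // The producer set must match the commitment made one epoch earlier.
            if compute_bp_hash(&epoch.block_producers) != expected_bp_hash {
                return false;
            }
            // Approvals come from the second last block of the epoch; its height is
            // consecutive with the last final block's, so every signature present
            // must directly endorse `last_final_block_header`. (A real check would
            // also require enough stake behind the endorsements.)
            let header = &epoch.last_final_block_header;
            for (approval, producer) in
                epoch.approvals_for_last_final_block.iter().zip(&epoch.block_producers)
            {
                if let Some(signature) = approval {
                    if !verify_endorsement(producer, header, signature) {
                        return false;
                    }
                }
            }
            // This header commits to the next epoch's block producer set.
            expected_bp_hash = *header.next_bp_hash();
        }
        true
    }

The `last_epoch` data is then tied into this chain by recomputing the `epoch_sync_data_hash` from
its six fields and comparing it with the hash carried by `final_block_header_in_next_epoch`, whose
own validity is in turn established by its approvals.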
diff --git a/core/primitives/src/lib.rs b/core/primitives/src/lib.rs
index 235a695a1d0..04d695a4b2d 100644
--- a/core/primitives/src/lib.rs
+++ b/core/primitives/src/lib.rs
@@ -17,6 +17,7 @@ pub mod congestion_info;
 pub mod epoch_block_info;
 pub mod epoch_info;
 pub mod epoch_manager;
+pub mod epoch_sync;
 pub mod errors;
 pub mod merkle;
 pub mod network;
diff --git a/core/primitives/src/views.rs b/core/primitives/src/views.rs
index 6ad35e974a3..99216a8d694 100644
--- a/core/primitives/src/views.rs
+++ b/core/primitives/src/views.rs
@@ -414,9 +414,12 @@ pub enum SyncStatusView {
     /// Not syncing / Done syncing.
     NoSync,
     /// Syncing using light-client headers to a recent epoch
-    // TODO #3488
-    // Bowen: why do we use epoch ordinal instead of epoch id?
-    EpochSync { epoch_ord: u64 },
+    EpochSync {
+        source_peer_height: BlockHeight,
+        source_peer_id: String,
+        attempt_time: String,
+    },
+    EpochSyncDone,
     /// Downloading block headers for fast sync.
     HeaderSync {
         start_height: BlockHeight,
diff --git a/integration-tests/src/test_loop/tests/epoch_sync.rs b/integration-tests/src/test_loop/tests/epoch_sync.rs
new file mode 100644
index 00000000000..2df87f04c98
--- /dev/null
+++ b/integration-tests/src/test_loop/tests/epoch_sync.rs
@@ -0,0 +1,224 @@
+use itertools::Itertools;
+use near_async::time::Duration;
+use near_chain_configs::test_genesis::TestGenesisBuilder;
+use near_client::test_utils::test_loop::ClientQueries;
+use near_o11y::testonly::init_test_logger;
+use near_primitives::types::AccountId;
+
+use crate::test_loop::builder::TestLoopBuilder;
+use crate::test_loop::env::TestLoopEnv;
+use crate::test_loop::utils::transactions::execute_money_transfers;
+use crate::test_loop::utils::ONE_NEAR;
+use near_async::messaging::CanSend;
+use near_chain::ChainStoreAccess;
+use near_client::SetNetworkInfo;
+use near_network::types::{HighestHeightPeerInfo, NetworkInfo, PeerInfo};
+use near_primitives::block::GenesisId;
+use near_store::test_utils::create_test_store;
+use std::cell::RefCell;
+use std::rc::Rc;
+
+const NUM_CLIENTS: usize = 4;
+
+// Test that a new node that has only genesis can use whatever sync method is available
+// to catch up to the current state of the network.
+#[test]
+fn test_epoch_sync_from_genesis() {
+    init_test_logger();
+    let builder = TestLoopBuilder::new();
+
+    let initial_balance = 10000 * ONE_NEAR;
+    let accounts =
+        (0..100).map(|i| format!("account{}", i).parse().unwrap()).collect::<Vec<AccountId>>();
+    let clients = accounts.iter().take(NUM_CLIENTS).cloned().collect_vec();
+
+    let mut genesis_builder = TestGenesisBuilder::new();
+    genesis_builder
+        .genesis_time_from_clock(&builder.clock())
+        .protocol_version_latest()
+        .genesis_height(10000)
+        .gas_prices_free()
+        .gas_limit_one_petagas()
+        .shard_layout_simple_v1(&["account3", "account5", "account7"])
+        .transaction_validity_period(1000)
+        .epoch_length(10)
+        .validators_desired_roles(&clients.iter().map(|t| t.as_str()).collect_vec(), &[])
+        .shuffle_shard_assignment_for_chunk_producers(true);
+    for account in &accounts {
+        genesis_builder.add_user_account_simple(account.clone(), initial_balance);
+    }
+    let genesis = genesis_builder.build();
+
+    let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } =
+        builder.genesis(genesis.clone()).clients(clients).build();
+
+    let first_epoch_tracked_shards = {
+        let clients = node_datas
+            .iter()
+            .map(|data| &test_loop.data.get(&data.client_sender.actor_handle()).client)
+            .collect_vec();
+        clients.tracked_shards_for_each_client()
+    };
+    tracing::info!("First epoch tracked shards: {:?}", first_epoch_tracked_shards);
+
+    execute_money_transfers(&mut test_loop, &node_datas, &accounts);
+
+    // Make sure the chain progressed for several epochs.
+    assert!(
+        test_loop
+            .data
+            .get(&node_datas[0].client_sender.actor_handle())
+            .client
+            .chain
+            .head()
+            .unwrap()
+            .height
+            > 10050
+    );
+
+    // Make a new TestLoopEnv, adding a new node to the network, and check that it can
+    // properly sync.
+    let mut stores = Vec::new();
+    for data in &node_datas {
+        stores.push((
+            test_loop
+                .data
+                .get(&data.client_sender.actor_handle())
+                .client
+                .chain
+                .chain_store
+                .store()
+                .clone(),
+            None,
+        ));
+    }
+    stores.push((create_test_store(), None)); // new node starts empty.
+
+    // Properly shut down the previous TestLoopEnv.
+    // We must preserve the tempdir, since state dumps are stored there,
+    // and are necessary for state sync to work on the new node.
+    let tempdir = TestLoopEnv { test_loop, datas: node_datas, tempdir }
+        .shutdown_and_drain_remaining_events(Duration::seconds(5));
+
+    tracing::info!("Starting new TestLoopEnv with new node");
+
+    let clients = accounts.iter().take(NUM_CLIENTS + 1).cloned().collect_vec();
+
+    let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } = TestLoopBuilder::new()
+        .genesis(genesis.clone())
+        .clients(clients)
+        .stores_override(stores)
+        .test_loop_data_dir(tempdir)
+        .config_modifier(|config, _| {
+            // Enable epoch sync, and make the horizon small enough to trigger it.
+            config.epoch_sync.enabled = true;
+            config.epoch_sync.epoch_sync_horizon = 30;
+            config.epoch_sync.epoch_sync_accept_proof_max_horizon = 20;
+            // Make the header sync horizon small enough to trigger it.
+            config.block_header_fetch_horizon = 8;
+            // Make the block sync horizon small enough to trigger it.
+            config.block_fetch_horizon = 3;
+        })
+        .skip_warmup()
+        .build();
+
+    // Note: TestLoopEnv does not currently propagate the network info to other peers. This is
+    // because the networking layer is completely mocked out. So in order to allow the new node
+    // to sync, we need to manually propagate the network info to the new node. In this case
+    // we'll tell the new node that node 0 is available to sync from.
+    let chain0 = &test_loop.data.get(&node_datas[0].client_sender.actor_handle()).client.chain;
+    let peer_info = HighestHeightPeerInfo {
+        archival: false,
+        genesis_id: GenesisId {
+            chain_id: genesis.config.chain_id,
+            hash: *chain0.genesis().hash(),
+        },
+        highest_block_hash: chain0.head().unwrap().last_block_hash,
+        highest_block_height: chain0.head().unwrap().height,
+        tracked_shards: vec![],
+        peer_info: PeerInfo {
+            account_id: Some(accounts[0].clone()),
+            addr: None,
+            id: node_datas[0].peer_id.clone(),
+        },
+    };
+    node_datas[NUM_CLIENTS].client_sender.send(SetNetworkInfo(NetworkInfo {
+        connected_peers: Vec::new(),
+        highest_height_peers: vec![peer_info], // only this field matters.
+        known_producers: vec![],
+        num_connected_peers: 0,
+        peer_max_count: 0,
+        received_bytes_per_sec: 0,
+        sent_bytes_per_sec: 0,
+        tier1_accounts_data: Vec::new(),
+        tier1_accounts_keys: Vec::new(),
+        tier1_connections: Vec::new(),
+    }));
+
+    // Check that the new node will reach a high height as well.
+    let new_node = node_datas.last().unwrap().client_sender.actor_handle();
+    let sync_status_history = Rc::new(RefCell::new(Vec::new()));
+    {
+        let sync_status_history = sync_status_history.clone();
+        test_loop.set_every_event_callback(move |test_loop_data| {
+            let client = &test_loop_data.get(&new_node).client;
+            let header_head_height = client.chain.header_head().unwrap().height;
+            let head_height = client.chain.head().unwrap().height;
+            tracing::info!(
+                "New node sync status: {:?}, header head height: {:?}, head height: {:?}",
+                client.sync_status,
+                header_head_height,
+                head_height
+            );
+            let sync_status = client.sync_status.as_variant_name();
+            let mut history = sync_status_history.borrow_mut();
+            if history.last().map(|s| s as &str) != Some(sync_status) {
+                history.push(sync_status.to_string());
+            }
+        });
+    }
+    // The previous handle was moved into the callback above, so make a fresh one.
+    let new_node = node_datas.last().unwrap().client_sender.actor_handle();
+    let node0 = node_datas[0].client_sender.actor_handle();
+    test_loop.run_until(
+        |test_loop_data| {
+            let new_node_height = test_loop_data.get(&new_node).client.chain.head().unwrap().height;
+            let node0_height = test_loop_data.get(&node0).client.chain.head().unwrap().height;
+            new_node_height == node0_height
+        },
+        Duration::seconds(20),
+    );
+
+    let current_height = test_loop.data.get(&node0).client.chain.head().unwrap().height;
+    // Run for at least two more epochs to make sure everything continues to be fine.
+    test_loop.run_until(
+        |test_loop_data| {
+            let new_node_height = test_loop_data.get(&new_node).client.chain.head().unwrap().height;
+            new_node_height >= current_height + 30
+        },
+        Duration::seconds(30),
+    );
+
+    TestLoopEnv { test_loop, datas: node_datas, tempdir }
+        .shutdown_and_drain_remaining_events(Duration::seconds(5));
+
+    assert_eq!(
+        sync_status_history.borrow().as_slice(),
+        &[
+            // Initial state.
+            "AwaitingPeers",
+            // State after having enough peers.
+            "NoSync",
+            // EpochSync should be entered first.
+            "EpochSync",
+            // EpochSync should succeed.
+            "EpochSyncDone",
+            // Header sync happens next to bring forward HEADER_HEAD.
+            "HeaderSync",
+            // State sync downloads the state from state dumps.
+            "StateSync",
+            // Block sync picks up from where StateSync left off, and finishes the sync.
+            "BlockSync",
+            // NoSync means we're up to date.
+            "NoSync"
+        ]
+        .into_iter()
+        .map(|s| s.to_string())
+        .collect::<Vec<_>>()
+    );
+}
diff --git a/integration-tests/src/test_loop/tests/mod.rs b/integration-tests/src/test_loop/tests/mod.rs
index 43165c1bcde..482c113f58e 100644
--- a/integration-tests/src/test_loop/tests/mod.rs
+++ b/integration-tests/src/test_loop/tests/mod.rs
@@ -1,6 +1,7 @@
 mod chunk_validator_kickout;
 pub mod congestion_control;
 pub mod congestion_control_genesis_bootstrap;
+pub mod epoch_sync;
 pub mod in_memory_tries;
 pub mod max_receipt_size;
 pub mod multinode_stateless_validators;
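For reference, the three knobs the test flips map directly onto `EpochSyncConfig`, which the
config changes below wire into the node. A minimal sketch of building the same configuration
outside the test harness; the function name is hypothetical, the comments on the two horizon
fields are a reading of the test rather than documented semantics, and any fields beyond the
three shown are assumed to be covered by `Default`:

    use near_chain_configs::EpochSyncConfig;

    // Hypothetical helper: an epoch sync setup tuned to trigger aggressively.
    fn aggressive_epoch_sync_config() -> EpochSyncConfig {
        EpochSyncConfig {
            enabled: true,
            // Attempt epoch sync only when the node is at least this many blocks behind.
            epoch_sync_horizon: 30,
            // Bound on how far behind the peer's head a received proof may be.
            epoch_sync_accept_proof_max_horizon: 20,
            ..EpochSyncConfig::default()
        }
    }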
diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs
index 8a36ffff5d6..466ed046af8 100644
--- a/nearcore/src/config.rs
+++ b/nearcore/src/config.rs
@@ -9,24 +9,24 @@ use near_chain_configs::test_utils::{
     TESTING_INIT_BALANCE, TESTING_INIT_STAKE,
 };
 use near_chain_configs::{
-    default_enable_multiline_logging, default_header_sync_expected_height_per_second,
-    default_header_sync_initial_timeout, default_header_sync_progress_timeout,
-    default_header_sync_stall_ban_timeout, default_log_summary_period,
-    default_orphan_state_witness_max_size, default_orphan_state_witness_pool_size,
-    default_produce_chunk_add_transactions_time_limit, default_state_sync,
-    default_state_sync_enabled, default_state_sync_timeout, default_sync_check_period,
-    default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period,
-    default_transaction_pool_size_limit, default_trie_viewer_state_size_limit,
-    default_tx_routing_height_horizon, default_view_client_threads,
-    default_view_client_throttle_period, get_initial_supply, ChunkDistributionNetworkConfig,
-    ClientConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode, LogSummaryStyle,
-    MutableConfigValue, MutableValidatorSigner, ReshardingConfig, StateSyncConfig,
-    BLOCK_PRODUCER_KICKOUT_THRESHOLD, CHUNK_PRODUCER_KICKOUT_THRESHOLD,
-    CHUNK_VALIDATOR_ONLY_KICKOUT_THRESHOLD, EXPECTED_EPOCH_LENGTH, FISHERMEN_THRESHOLD,
-    GAS_PRICE_ADJUSTMENT_RATE, GENESIS_CONFIG_FILENAME, INITIAL_GAS_LIMIT, MAX_INFLATION_RATE,
-    MIN_BLOCK_PRODUCTION_DELAY, MIN_GAS_PRICE, NEAR_BASE, NUM_BLOCKS_PER_YEAR,
-    NUM_BLOCK_PRODUCER_SEATS, PROTOCOL_REWARD_RATE, PROTOCOL_UPGRADE_STAKE_THRESHOLD,
-    TRANSACTION_VALIDITY_PERIOD,
+    default_enable_multiline_logging, default_epoch_sync,
+    default_header_sync_expected_height_per_second, default_header_sync_initial_timeout,
+    default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout,
+    default_log_summary_period, default_orphan_state_witness_max_size,
+    default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit,
+    default_state_sync, default_state_sync_enabled, default_state_sync_timeout,
+    default_sync_check_period, default_sync_height_threshold, default_sync_max_block_requests,
+    default_sync_step_period, default_transaction_pool_size_limit,
+    default_trie_viewer_state_size_limit, default_tx_routing_height_horizon,
+    default_view_client_threads, default_view_client_throttle_period, get_initial_supply,
+    ChunkDistributionNetworkConfig, ClientConfig, EpochSyncConfig, GCConfig, Genesis,
+    GenesisConfig, GenesisValidationMode, LogSummaryStyle, MutableConfigValue,
+    MutableValidatorSigner, ReshardingConfig, StateSyncConfig, BLOCK_PRODUCER_KICKOUT_THRESHOLD,
+    CHUNK_PRODUCER_KICKOUT_THRESHOLD, CHUNK_VALIDATOR_ONLY_KICKOUT_THRESHOLD,
+    EXPECTED_EPOCH_LENGTH, FISHERMEN_THRESHOLD, GAS_PRICE_ADJUSTMENT_RATE, GENESIS_CONFIG_FILENAME,
+    INITIAL_GAS_LIMIT, MAX_INFLATION_RATE, MIN_BLOCK_PRODUCTION_DELAY, MIN_GAS_PRICE, NEAR_BASE,
+    NUM_BLOCKS_PER_YEAR, NUM_BLOCK_PRODUCER_SEATS, PROTOCOL_REWARD_RATE,
+    PROTOCOL_UPGRADE_STAKE_THRESHOLD, TRANSACTION_VALIDITY_PERIOD,
 };
 use near_config_utils::{ValidationError, ValidationErrors};
 use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey};
@@ -273,6 +273,8 @@ pub struct Config {
     /// Options for syncing state.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub state_sync: Option<StateSyncConfig>,
+    /// Options for epoch sync.
+    pub epoch_sync: Option<EpochSyncConfig>,
     /// Limit of the size of per-shard transaction pool measured in bytes. If not set, the size
     /// will be unbounded.
     ///
@@ -358,6 +360,7 @@ impl Default for Config {
             split_storage: None,
             expected_shutdown: None,
             state_sync: default_state_sync(),
+            epoch_sync: default_epoch_sync(),
             state_sync_enabled: default_state_sync_enabled(),
             transaction_pool_size_limit: default_transaction_pool_size_limit(),
             enable_multiline_logging: default_enable_multiline_logging(),
@@ -578,6 +581,7 @@ impl NearConfig {
             flat_storage_creation_period: Duration::seconds(1),
             state_sync_enabled: config.state_sync_enabled,
             state_sync: config.state_sync.unwrap_or_default(),
+            epoch_sync: config.epoch_sync.unwrap_or_default(),
             transaction_pool_size_limit: config.transaction_pool_size_limit,
             enable_multiline_logging: config.enable_multiline_logging.unwrap_or(true),
             resharding_config: MutableConfigValue::new(
diff --git a/tools/chainsync-loadtest/src/network.rs b/tools/chainsync-loadtest/src/network.rs
index de6aefaeda0..2d99b306789 100644
--- a/tools/chainsync-loadtest/src/network.rs
+++ b/tools/chainsync-loadtest/src/network.rs
@@ -264,6 +264,8 @@ impl Network {
                 Ok(accounts.0.into_iter().map(|a| a.0).collect::<Vec<_>>())
             }),
             chunk_endorsement: noop().into_sender(),
+            epoch_sync_request: noop().into_sender(),
+            epoch_sync_response: noop().into_sender(),
         }
     }
 }
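One consequence of modeling the new setting as `Option<EpochSyncConfig>` is backward
compatibility: serde fills a missing `Option` field with `None`, and `unwrap_or_default()` then
supplies `EpochSyncConfig::default()`, so existing config.json files load unchanged. A minimal
sketch of that behavior, where `MiniConfig` is a hypothetical stand-in for the full Config struct
and `EpochSyncConfig` is assumed to derive `Deserialize` and `Default`:

    use near_chain_configs::EpochSyncConfig;
    use serde::Deserialize;

    // Hypothetical stand-in for nearcore's Config, reduced to the new field.
    #[derive(Deserialize)]
    struct MiniConfig {
        epoch_sync: Option<EpochSyncConfig>,
    }

    fn main() {
        // An old config file with no `epoch_sync` section still parses...
        let config: MiniConfig = serde_json::from_str("{}").unwrap();
        // ...and the node falls back to defaults, mirroring NearConfig::new above.
        let effective = config.epoch_sync.unwrap_or_default();
        println!("epoch sync enabled by default? {}", effective.enabled);
    }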