Skip to content

Commit

Permalink
[Epoch Sync] Initial implementation for Epoch Sync V4. (#11934)
Browse files Browse the repository at this point in the history
Remaining issues:

near/near-one-project-tracking#73

I still need to go through this one more time, but reviews can be done
in the meantime.
  • Loading branch information
robin-near authored Aug 22, 2024
1 parent 7d0e3d7 commit 3a9c72a
Show file tree
Hide file tree
Showing 31 changed files with 1,169 additions and 116 deletions.
5 changes: 5 additions & 0 deletions chain/chain/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ impl ChainStore {
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "reset_data_pre_state_sync").entered();
let head = self.head()?;
if head.prev_block_hash == CryptoHash::default() {
// This is genesis. It means we are state syncing right after epoch sync. Don't clear
// anything at genesis, or else the node will never boot up again.
return Ok(());
}
// Get header we were syncing into.
let header = self.get_block_header(&sync_hash)?;
let prev_hash = *header.prev_hash();
Expand Down
3 changes: 2 additions & 1 deletion chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,8 +829,9 @@ impl EpochManagerAdapter for MockEpochManager {
Ok(Default::default())
}

fn epoch_sync_init_epoch_manager(
fn init_after_epoch_sync(
&self,
_store_update: &mut StoreUpdate,
_prev_epoch_first_block_info: BlockInfo,
_prev_epoch_last_block_info: BlockInfo,
_prev_epoch_prev_last_block_info: BlockInfo,
Expand Down
28 changes: 20 additions & 8 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ impl std::fmt::Debug for StateSyncStatus {
}
}

#[derive(Clone, Debug)]
pub struct EpochSyncStatus {
pub source_peer_height: BlockHeight,
pub source_peer_id: PeerId,
pub attempt_time: near_time::Utc,
}

/// Various status sync can be in, whether it's fast sync or archival.
#[derive(Clone, Debug, strum::AsRefStr)]
pub enum SyncStatus {
Expand All @@ -279,9 +286,8 @@ pub enum SyncStatus {
/// Not syncing / Done syncing.
NoSync,
/// Syncing using light-client headers to a recent epoch
// TODO #3488
// Bowen: why do we use epoch ordinal instead of epoch id?
EpochSync { epoch_ord: u64 },
EpochSync(EpochSyncStatus),
EpochSyncDone,
/// Downloading block headers for fast sync.
HeaderSync {
/// Head height at the beginning. Not the header head height!
Expand Down Expand Up @@ -328,10 +334,11 @@ impl SyncStatus {
SyncStatus::NoSync => 0,
SyncStatus::AwaitingPeers => 1,
SyncStatus::EpochSync { .. } => 2,
SyncStatus::HeaderSync { .. } => 3,
SyncStatus::StateSync(_) => 4,
SyncStatus::StateSyncDone => 5,
SyncStatus::BlockSync { .. } => 6,
SyncStatus::EpochSyncDone { .. } => 3,
SyncStatus::HeaderSync { .. } => 4,
SyncStatus::StateSync(_) => 5,
SyncStatus::StateSyncDone => 6,
SyncStatus::BlockSync { .. } => 7,
}
}

Expand All @@ -356,7 +363,12 @@ impl From<SyncStatus> for SyncStatusView {
match status {
SyncStatus::AwaitingPeers => SyncStatusView::AwaitingPeers,
SyncStatus::NoSync => SyncStatusView::NoSync,
SyncStatus::EpochSync { epoch_ord } => SyncStatusView::EpochSync { epoch_ord },
SyncStatus::EpochSync(status) => SyncStatusView::EpochSync {
source_peer_height: status.source_peer_height,
source_peer_id: status.source_peer_id.to_string(),
attempt_time: status.attempt_time.to_string(),
},
SyncStatus::EpochSyncDone => SyncStatusView::EpochSyncDone,
SyncStatus::HeaderSync { start_height, current_height, highest_height } => {
SyncStatusView::HeaderSync { start_height, current_height, highest_height }
}
Expand Down
4 changes: 3 additions & 1 deletion chain/client/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub fn client_sender_for_network(
tx_status_request: view_client_addr.clone().into_sender(),
tx_status_response: view_client_addr.clone().into_sender(),
announce_account: view_client_addr.into_sender(),
chunk_endorsement: client_addr.into_sender(),
chunk_endorsement: client_addr.clone().into_sender(),
epoch_sync_request: client_addr.clone().into_sender(),
epoch_sync_response: client_addr.into_sender(),
}
}
11 changes: 11 additions & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ use tracing::{debug, debug_span, error, info, instrument, trace, warn};

#[cfg(feature = "test_features")]
use crate::client_actor::AdvProduceChunksMode;
use crate::sync::epoch::EpochSync;

const NUM_REBROADCAST_BLOCKS: usize = 30;

Expand Down Expand Up @@ -151,6 +152,8 @@ pub struct Client {
/// storing the current status of the state sync and blocks catch up
pub catchup_state_syncs:
HashMap<CryptoHash, (StateSync, HashMap<u64, ShardSyncDownload>, BlocksCatchUpState)>,
/// Keeps track of information needed to perform the initial Epoch Sync
pub epoch_sync: EpochSync,
/// Keeps track of syncing headers.
pub header_sync: HeaderSync,
/// Keeps track of syncing block.
Expand Down Expand Up @@ -277,6 +280,13 @@ impl Client {
let sharded_tx_pool =
ShardedTransactionPool::new(rng_seed, config.transaction_pool_size_limit);
let sync_status = SyncStatus::AwaitingPeers;
let epoch_sync = EpochSync::new(
clock.clone(),
network_adapter.clone(),
chain.genesis().clone(),
async_computation_spawner.clone(),
config.epoch_sync.clone(),
);
let header_sync = HeaderSync::new(
clock.clone(),
network_adapter.clone(),
Expand Down Expand Up @@ -372,6 +382,7 @@ impl Client {
NonZeroUsize::new(num_block_producer_seats).unwrap(),
),
catchup_state_syncs: HashMap::new(),
epoch_sync,
header_sync,
block_sync,
state_sync,
Expand Down
14 changes: 12 additions & 2 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1712,9 +1712,19 @@ impl ClientActorInner {

/// Handle the SyncRequirement::SyncNeeded.
///
/// This method runs the header sync, the block sync
/// This method performs whatever syncing technique is needed (epoch sync, header sync,
/// state sync, block sync) to make progress towards bring the node up to date.
fn handle_sync_needed(&mut self, highest_height: u64, signer: &Option<Arc<ValidatorSigner>>) {
// Run each step of syncing separately.
// Run epoch sync first; if this is applicable then nothing else is.
let epoch_sync_result = self.client.epoch_sync.run(
&mut self.client.sync_status,
&self.client.chain,
highest_height,
&self.network_info.highest_height_peers,
);
unwrap_and_report_state_sync_result!(epoch_sync_result);

// Run header sync as long as there are headers to catch up.
let header_sync_result = self.client.header_sync.run(
&mut self.client.sync_status,
&mut self.client.chain,
Expand Down
5 changes: 3 additions & 2 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,9 +709,10 @@ pub fn display_sync_status(
match sync_status {
SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height),
SyncStatus::NoSync => format!("#{:>8} {:>44}", head.height, head.last_block_hash),
SyncStatus::EpochSync { epoch_ord } => {
format!("[EPOCH: {:>5}] Getting to a recent epoch", epoch_ord)
SyncStatus::EpochSync(status) => {
format!("[EPOCH] {:?}", status)
}
SyncStatus::EpochSyncDone => "[EPOCH] Done".to_string(),
SyncStatus::HeaderSync { start_height, current_height, highest_height } => {
let percent = if highest_height <= start_height {
0.0
Expand Down
Loading

0 comments on commit 3a9c72a

Please sign in to comment.