From aea390addaa66cc910415a175c4013608b34d2ad Mon Sep 17 00:00:00 2001 From: Marcelo Diop-Gonzalez Date: Wed, 4 Sep 2024 23:45:03 -0400 Subject: [PATCH] fix(mirror): fix issues with limits on transaction speed (#11916) There has been an issue where the rate of transactions sent can only go so high: when we try to raise it with the "tx_batch_interval" option, we hit a bottleneck and fail to send transactions fast enough. What is happening is that the `main_loop()` function handles many things other than sending transactions, and shares a thread with the indexer for the target chain. The biggest bottleneck is that building the next sets of transactions to send in `queue_txs()` after each batch we send can take a long time, so the early break in that function when we're 20 milliseconds away from needing to send more transactions is not enough. Sometimes we are also held up by the indexer delivering a new block that we then have to process. To fix this, quite a lot needs to be refactored so that these components can run in different threads. We pass around a `Mutex<TxTracker>`, and modify `on_target_block()` so that it's not async, because otherwise we wouldn't be able to lock the mutex and call that function in a new thread, since the resulting future would not be `Send`. The main thread handles queueing up new blocks' worth of transactions and calling all the async `TxTracker` functions; we start a new thread to run the loop that sends transactions, and another to run the indexer and call `TxTracker::on_target_block()`. --- Cargo.lock | 1 + pytest/tests/mocknet/helpers/neard_runner.py | 2 +- tools/mirror/Cargo.toml | 1 + tools/mirror/src/chain_tracker.rs | 591 ++++++++-------- tools/mirror/src/lib.rs | 668 ++++++++++++++----- 5 files changed, 768 insertions(+), 495 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b769222f10..c6dcce2249b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4509,6 +4509,7 @@ name = "near-mirror" version = "0.0.0" dependencies = [ "actix", + "actix-rt", "anyhow", "async-trait", "borsh 1.2.0", diff --git a/pytest/tests/mocknet/helpers/neard_runner.py b/pytest/tests/mocknet/helpers/neard_runner.py index cced33fbbdb..a3003b8ecdf 100644 --- a/pytest/tests/mocknet/helpers/neard_runner.py +++ b/pytest/tests/mocknet/helpers/neard_runner.py @@ -911,7 +911,7 @@ def start_neard(self, batch_interval_millis=None): # Configure the logs config file to control the level of rust and opentelemetry logs. # Default config sets level to DEBUG for "client" and "chain" logs, WARN for tokio+actix, and INFO for everything else.
def configure_log_config(self): - default_log_filter = 'client=debug,chain=debug,actix_web=warn,mio=warn,tokio_util=warn,actix_server=warn,actix_http=warn,info' + default_log_filter = 'client=debug,chain=debug,mirror=debug,actix_web=warn,mio=warn,tokio_util=warn,actix_server=warn,actix_http=warn,info' log_config_path = self.target_near_home_path('log_config.json') logging.info("Creating log_config.json with default log filter.") diff --git a/tools/mirror/Cargo.toml b/tools/mirror/Cargo.toml index 77813903458..9aea9e8ef2a 100644 --- a/tools/mirror/Cargo.toml +++ b/tools/mirror/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] actix.workspace = true +actix-rt.workspace = true anyhow.workspace = true async-trait.workspace = true borsh.workspace = true diff --git a/tools/mirror/src/chain_tracker.rs b/tools/mirror/src/chain_tracker.rs index 084cd1c07fe..81dca68a34b 100644 --- a/tools/mirror/src/chain_tracker.rs +++ b/tools/mirror/src/chain_tracker.rs @@ -19,7 +19,7 @@ use std::collections::hash_map; use std::collections::HashMap; use std::collections::{BTreeSet, HashSet, VecDeque}; use std::fmt::Write; -use std::pin::Pin; +use std::sync::Mutex; use std::time::{Duration, Instant}; // Information related to a single transaction that we sent in the past. @@ -124,6 +124,21 @@ pub(crate) enum SentBatch { ExtraTxs(Vec<TargetChainTx>), } +// an access key's account ID and public key, along with the id of the tx or receipt that might +// have updated it +pub(crate) struct UpdatedKey { + pub(crate) account_id: AccountId, + pub(crate) public_key: PublicKey, + pub(crate) id: CryptoHash, +} + +// return value of on_target_block() +pub(crate) struct TargetBlockInfo { + // these accounts need to be unstaked + pub(crate) staked_accounts: HashMap<(AccountId, PublicKey), AccountId>, + // these access keys that were previously unavailable may now be available + pub(crate) access_key_updates: Vec<UpdatedKey>, +} // Keeps the queue of upcoming transactions and provides them in regular intervals via next_batch() // Also keeps track of txs we've sent so far and looks for them on chain, for metrics/logging purposes. @@ -132,7 +147,6 @@ pub(crate) enum SentBatch { pub(crate) struct TxTracker { sent_txs: HashMap<CryptoHash, TxSendInfo>, txs_by_signer: HashMap<(AccountId, PublicKey), BTreeSet<TxId>>, - queued_blocks: VecDeque<MappedBlock>, // for each updater (a tx or receipt hash, or a queued transaction we haven't sent yet), keeps // a set of access keys who might be updated by it updater_to_keys: HashMap<NonceUpdater, HashSet<(AccountId, PublicKey)>>, @@ -146,7 +160,6 @@ pub(crate) struct TxTracker { nonempty_height_queued: Option<BlockHeight>, height_popped: Option<BlockHeight>, height_seen: Option<BlockHeight>, - send_time: Pin<Box<tokio::time::Sleep>>, // Config value in the target chain, used to judge how long to wait before sending a new batch of txs min_block_production_delay: Duration, // optional specific tx send delay @@ -176,13 +189,8 @@ impl TxTracker { next_heights, stop_height, tx_batch_interval, - // Wait at least 15 seconds before sending any transactions because for - // a few seconds after the node starts, transaction routing requests - // will be silently dropped by the peer manager.
- send_time: Box::pin(tokio::time::sleep(std::time::Duration::from_secs(15))), sent_txs: HashMap::new(), txs_by_signer: HashMap::new(), - queued_blocks: VecDeque::new(), updater_to_keys: HashMap::new(), nonces: HashMap::new(), height_queued: None, @@ -194,20 +202,20 @@ impl TxTracker { } pub(crate) async fn next_heights( - &mut self, + me: &Mutex, source_chain: &T, ) -> anyhow::Result<(Option, Option)> { - while self.next_heights.len() <= crate::CREATE_ACCOUNT_DELTA { + let (mut next_heights, height_queued) = { + let t = me.lock().unwrap(); + (t.next_heights.clone(), t.height_queued) + }; + while next_heights.len() <= crate::CREATE_ACCOUNT_DELTA { // we unwrap() the height_queued because Self::new() should have been called with // nonempty next_heights. - let h = self - .next_heights - .iter() - .next_back() - .cloned() - .unwrap_or_else(|| self.height_queued.unwrap()); + let h = + next_heights.iter().next_back().cloned().unwrap_or_else(|| height_queued.unwrap()); match source_chain.get_next_block_height(h).await { - Ok(h) => self.next_heights.push_back(h), + Ok(h) => next_heights.push_back(h), Err(ChainError::Unknown) => break, Err(ChainError::Other(e)) => { return Err(e) @@ -215,15 +223,13 @@ impl TxTracker { } }; } - let next_height = self.next_heights.get(0).cloned(); - let create_account_height = self.next_heights.get(crate::CREATE_ACCOUNT_DELTA).cloned(); + let mut t = me.lock().unwrap(); + t.next_heights = next_heights; + let next_height = t.next_heights.get(0).cloned(); + let create_account_height = t.next_heights.get(crate::CREATE_ACCOUNT_DELTA).cloned(); Ok((next_height, create_account_height)) } - pub(crate) fn has_stop_height(&self) -> bool { - self.stop_height.is_some() - } - pub(crate) fn finished(&self) -> bool { match self.stop_height { Some(_) => { @@ -234,12 +240,8 @@ impl TxTracker { } } - pub(crate) fn num_blocks_queued(&self) -> usize { - self.queued_blocks.len() - } - - async fn initialize_target_nonce<'a>( - &'a mut self, + async fn initialize_target_nonce( + lock: &Mutex, target_view_client: &Addr, db: &DB, access_key: &(AccountId, PublicKey), @@ -273,47 +275,34 @@ impl TxTracker { } } }; - self.nonces.insert(access_key.clone(), info); + let mut me = lock.lock().unwrap(); + me.nonces.insert(access_key.clone(), info); Ok(()) } - async fn read_target_nonce<'a>( - &'a mut self, - target_view_client: &Addr, - db: &DB, - access_key: &(AccountId, PublicKey), - source_height: Option, - ) -> anyhow::Result<&'a mut NonceInfo> { - if !self.nonces.contains_key(access_key) { - self.initialize_target_nonce(target_view_client, db, access_key, source_height).await?; - } - Ok(self.nonces.get_mut(access_key).unwrap()) - } - - pub(crate) async fn next_nonce<'a>( - &'a mut self, + pub(crate) async fn next_nonce( + lock: &Mutex, target_view_client: &Addr, db: &DB, signer_id: &AccountId, public_key: &PublicKey, source_height: BlockHeight, - ) -> anyhow::Result<&'a TargetNonce> { + ) -> anyhow::Result { let source_height = Some(source_height); - let info = self - .read_target_nonce( - target_view_client, - db, - &(signer_id.clone(), public_key.clone()), - source_height, - ) - .await?; + let access_key = (signer_id.clone(), public_key.clone()); + if !lock.lock().unwrap().nonces.contains_key(&access_key) { + Self::initialize_target_nonce(lock, target_view_client, db, &access_key, source_height) + .await?; + } + let mut me = lock.lock().unwrap(); + let info = me.nonces.get_mut(&access_key).unwrap(); if source_height > info.last_height { info.last_height = source_height; } if let Some(nonce) 
= &mut info.target_nonce.nonce { *nonce += 1; } - Ok(&info.target_nonce) + Ok(info.target_nonce.clone()) } // normally when we're adding txs, we're adding a tx that @@ -322,7 +311,8 @@ impl TxTracker { // we've used so far + 1. But if we want to add a tx at the beginning, // we need to shift all the bigger nonces by one. pub(crate) async fn insert_nonce( - &mut self, + lock: &Mutex, + tx_block_queue: &Mutex>, target_view_client: &Addr, db: &DB, signer_id: &AccountId, @@ -330,27 +320,31 @@ impl TxTracker { secret_key: &SecretKey, ) -> anyhow::Result { let access_key = (signer_id.clone(), public_key.clone()); - if !self.nonces.contains_key(&access_key) { - self.initialize_target_nonce(target_view_client, db, &access_key, None).await?; - let info = self.nonces.get_mut(&access_key).unwrap(); + if !lock.lock().unwrap().nonces.contains_key(&access_key) { + Self::initialize_target_nonce(lock, target_view_client, db, &access_key, None).await?; + let mut me = lock.lock().unwrap(); + let info = me.nonces.get_mut(&access_key).unwrap(); if let Some(nonce) = &mut info.target_nonce.nonce { *nonce += 1; } return Ok(info.target_nonce.clone()); } + let mut me = lock.lock().unwrap(); let mut first_nonce = None; - let txs = self.nonces.get(&access_key).unwrap().queued_txs.clone(); - for tx_ref in txs { - let tx = self.get_tx(&tx_ref); - if first_nonce.is_none() { - first_nonce = Some(tx.target_nonce()); + let txs = me.nonces.get(&access_key).unwrap().queued_txs.clone(); + if !txs.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for tx_ref in txs { + let tx = Self::get_tx(&mut tx_block_queue, &tx_ref); + if first_nonce.is_none() { + first_nonce = Some(tx.target_nonce()); + } + tx.inc_target_nonce(secret_key) } - tx.inc_target_nonce(secret_key) } match first_nonce { Some(n) => { - if let Some(nonce) = - &mut self.nonces.get_mut(&access_key).unwrap().target_nonce.nonce + if let Some(nonce) = &mut me.nonces.get_mut(&access_key).unwrap().target_nonce.nonce { *nonce += 1; } @@ -363,18 +357,20 @@ impl TxTracker { } } - fn get_tx(&mut self, tx_ref: &TxRef) -> &mut TargetChainTx { - let block_idx = self - .queued_blocks + fn get_tx<'a>( + tx_block_queue: &'a mut VecDeque, + tx_ref: &TxRef, + ) -> &'a mut TargetChainTx { + let block_idx = tx_block_queue .binary_search_by(|b| b.source_height.cmp(&tx_ref.source_height)) .unwrap(); - let block = &mut self.queued_blocks[block_idx]; + let block = &mut tx_block_queue[block_idx]; let chunk = block.chunks.iter_mut().find(|c| c.shard_id == tx_ref.shard_id).unwrap(); &mut chunk.txs[tx_ref.tx_idx] } async fn insert_access_key_updates( - &mut self, + lock: &Mutex, target_view_client: &Addr, db: &DB, tx_ref: &TxRef, @@ -383,8 +379,18 @@ impl TxTracker { ) -> anyhow::Result<()> { let source_height = Some(source_height); for access_key in nonce_updates.iter() { - let info = - self.read_target_nonce(target_view_client, db, access_key, source_height).await?; + if !lock.lock().unwrap().nonces.contains_key(access_key) { + Self::initialize_target_nonce( + lock, + target_view_client, + db, + &access_key, + source_height, + ) + .await?; + } + let mut me = lock.lock().unwrap(); + let info = me.nonces.get_mut(&access_key).unwrap(); if info.last_height < source_height { info.last_height = source_height; @@ -392,7 +398,8 @@ impl TxTracker { info.target_nonce.pending_outcomes.insert(NonceUpdater::TxRef(tx_ref.clone())); } if !nonce_updates.is_empty() { - assert!(self + let mut me = lock.lock().unwrap(); + assert!(me .updater_to_keys 
.insert(NonceUpdater::TxRef(tx_ref.clone()), nonce_updates.clone()) .is_none()); @@ -400,25 +407,28 @@ impl TxTracker { Ok(()) } - pub(crate) async fn queue_block( - &mut self, - block: MappedBlock, - target_view_client: &Addr, - db: &DB, - ) -> anyhow::Result<()> { - self.height_queued = Some(block.source_height); - self.next_heights.pop_front().unwrap(); + // This is the non-async portion of queue_block() that returns a list of access key updates we need + // to call insert_access_key_updates() for, which we'll do after calling this function. Otherwise + // we would have to lock and unlock the mutex on every transaction to avoid holding it across await points + fn queue_txs<'a>( + lock: &Mutex, + block: &'a MappedBlock, + ) -> anyhow::Result)>> { + let mut nonce_updates = Vec::new(); + let mut me = lock.lock().unwrap(); + me.height_queued = Some(block.source_height); + me.next_heights.pop_front().unwrap(); for c in block.chunks.iter() { if !c.txs.is_empty() { - self.nonempty_height_queued = Some(block.source_height); + me.nonempty_height_queued = Some(block.source_height); } for (tx_idx, tx) in c.txs.iter().enumerate() { let tx_ref = TxRef { source_height: block.source_height, shard_id: c.shard_id, tx_idx }; match tx { crate::TargetChainTx::Ready(tx) => { - let info = self + let info = me .nonces .get_mut(&( tx.target_tx.transaction.signer_id().clone(), @@ -426,17 +436,12 @@ impl TxTracker { )) .unwrap(); info.queued_txs.insert(tx_ref.clone()); - self.insert_access_key_updates( - target_view_client, - db, - &tx_ref, - &tx.nonce_updates, - block.source_height, - ) - .await?; + if !tx.nonce_updates.is_empty() { + nonce_updates.push((tx_ref, &tx.nonce_updates)); + } } crate::TargetChainTx::AwaitingNonce(tx) => { - let info = self + let info = me .nonces .get_mut(&( tx.target_tx.signer_id().clone(), @@ -445,133 +450,39 @@ impl TxTracker { .unwrap(); info.txs_awaiting_nonce.insert(tx_ref.clone()); info.queued_txs.insert(tx_ref.clone()); - self.insert_access_key_updates( - target_view_client, - db, - &tx_ref, - &tx.nonce_updates, - block.source_height, - ) - .await?; + if !tx.nonce_updates.is_empty() { + nonce_updates.push((tx_ref, &tx.nonce_updates)); + } } }; } } - self.queued_blocks.push_back(block); - Ok(()) - } - - pub(crate) fn next_batch_time(&self) -> Instant { - self.send_time.as_ref().deadline().into_std() + Ok(nonce_updates) } - async fn try_set_batch_nonces( - &mut self, + pub(crate) async fn queue_block( + lock: &Mutex, + tx_block_queue: &Mutex>, + block: MappedBlock, target_view_client: &Addr, db: &DB, ) -> anyhow::Result<()> { - let mut needed_access_keys = HashSet::new(); - for c in self.queued_blocks[0].chunks.iter_mut() { - for tx in c.txs.iter_mut() { - if let TargetChainTx::AwaitingNonce(t) = tx { - needed_access_keys.insert(( - t.target_tx.signer_id().clone(), - t.target_tx.public_key().clone(), - )); - } - } - } - for access_key in needed_access_keys.iter() { - self.try_set_nonces(target_view_client, db, access_key, None).await?; - } - let block = &mut self.queued_blocks[0]; - self.height_popped = Some(block.source_height); - for c in block.chunks.iter_mut() { - for (tx_idx, tx) in c.txs.iter_mut().enumerate() { - match tx { - TargetChainTx::AwaitingNonce(_) => { - let tx_ref = TxRef { - source_height: block.source_height, - shard_id: c.shard_id, - tx_idx, - }; - tx.try_set_nonce(None); - match tx { - TargetChainTx::Ready(t) => { - tracing::debug!( - target: "mirror", "Prepared {} for ({}, {:?}) with nonce {} even though there are still pending outcomes that may affect 
the access key", - &t.provenance, t.target_tx.transaction.signer_id(), t.target_tx.transaction.public_key(), t.target_tx.transaction.nonce() - ); - self.nonces - .get_mut(&( - t.target_tx.transaction.signer_id().clone(), - t.target_tx.transaction.public_key().clone(), - )) - .unwrap() - .txs_awaiting_nonce - .remove(&tx_ref); - } - TargetChainTx::AwaitingNonce(t) => { - tracing::warn!( - target: "mirror", "Could not prepare {} for ({}, {:?}). Nonce unknown", - &t.provenance, t.target_tx.signer_id(), t.target_tx.public_key(), - ); - self.nonces - .get_mut(&( - t.target_tx.signer_id().clone(), - t.target_tx.public_key().clone(), - )) - .unwrap() - .txs_awaiting_nonce - .remove(&tx_ref); - } - }; - } - TargetChainTx::Ready(_) => {} - }; - } + let key_updates = Self::queue_txs(lock, &block)?; + for (tx_ref, nonce_updates) in key_updates { + Self::insert_access_key_updates( + lock, + target_view_client, + db, + &tx_ref, + nonce_updates, + block.source_height, + ) + .await?; } + tx_block_queue.lock().unwrap().push_back(block); Ok(()) } - pub(crate) async fn next_batch( - &mut self, - target_view_client: &Addr, - db: &DB, - ) -> anyhow::Result { - // sleep until 20 milliseconds before we want to send transactions before we check for nonces - // in the target chain. In the second or so between now and then, we might process another block - // that will set the nonces. - tokio::time::sleep_until( - self.send_time.as_ref().deadline() - std::time::Duration::from_millis(20), - ) - .await; - self.try_set_batch_nonces(target_view_client, db).await?; - (&mut self.send_time).await; - let block = self.queued_blocks.pop_front().unwrap(); - let b = TxBatch { - source_height: block.source_height, - source_hash: block.source_hash, - txs: block - .chunks - .into_iter() - .flat_map(|c| { - c.txs.into_iter().enumerate().map(move |(tx_idx, tx)| { - ( - TxRef { - source_height: block.source_height, - shard_id: c.shard_id, - tx_idx, - }, - tx, - ) - }) - }) - .collect(), - }; - Ok(b) - } - fn remove_tx(&mut self, tx: &IndexerTransactionWithOutcome) { let k = (tx.transaction.signer_id.clone(), tx.transaction.public_key.clone()); match self.txs_by_signer.entry(k.clone()) { @@ -693,6 +604,7 @@ impl TxTracker { fn tx_to_receipt( &mut self, + tx_block_queue: &Mutex>, db: &DB, tx_hash: &CryptoHash, receipt_id: &CryptoHash, @@ -716,16 +628,19 @@ impl TxTracker { if let Some(info) = self.nonces.get(access_key) { let txs_awaiting_nonce = info.txs_awaiting_nonce.clone(); - for r in txs_awaiting_nonce.iter() { - let tx = self.get_tx(r); + if !txs_awaiting_nonce.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for r in txs_awaiting_nonce.iter() { + let tx = Self::get_tx(&mut tx_block_queue, r); - match tx { - TargetChainTx::AwaitingNonce(t) => { - assert!(t.target_nonce.pending_outcomes.remove(&updater)); - t.target_nonce.pending_outcomes.insert(new_updater.clone()); - } - TargetChainTx::Ready(_) => unreachable!(), - }; + match tx { + TargetChainTx::AwaitingNonce(t) => { + assert!(t.target_nonce.pending_outcomes.remove(&updater)); + t.target_nonce.pending_outcomes.insert(new_updater.clone()); + } + TargetChainTx::Ready(_) => unreachable!(), + }; + } } let info = self.nonces.get_mut(access_key).unwrap(); @@ -740,63 +655,61 @@ impl TxTracker { Ok(()) } - async fn try_set_nonces( + pub(crate) fn try_set_nonces( &mut self, - target_view_client: &Addr, + tx_block_queue: &Mutex>, db: &DB, - access_key: &(AccountId, PublicKey), - id: Option<&CryptoHash>, + updated_key: UpdatedKey, + mut nonce: Option, ) -> 
anyhow::Result<()> { - let mut n = crate::read_target_nonce(db, &access_key.0, &access_key.1)?.unwrap(); - if let Some(id) = id { - n.pending_outcomes.remove(id); - } - let mut nonce = - crate::fetch_access_key_nonce(target_view_client, &access_key.0, &access_key.1).await?; + let mut n = crate::read_target_nonce(db, &updated_key.account_id, &updated_key.public_key)? + .unwrap(); + n.pending_outcomes.remove(&updated_key.id); n.nonce = std::cmp::max(n.nonce, nonce); - crate::put_target_nonce(db, &access_key.0, &access_key.1, &n)?; + crate::put_target_nonce(db, &updated_key.account_id, &updated_key.public_key, &n)?; - let updater = id.map(|id| NonceUpdater::ChainObjectId(*id)); - if let Some(info) = self.nonces.get_mut(access_key) { - if let Some(updater) = &updater { - info.target_nonce.pending_outcomes.remove(updater); - } + let updater = NonceUpdater::ChainObjectId(updated_key.id); + let access_key = (updated_key.account_id.clone(), updated_key.public_key.clone()); + + if let Some(info) = self.nonces.get_mut(&access_key) { + info.target_nonce.pending_outcomes.remove(&updater); let txs_awaiting_nonce = info.txs_awaiting_nonce.clone(); let mut to_remove = Vec::new(); - for r in txs_awaiting_nonce.iter() { - let tx = self.get_tx(r); + if !txs_awaiting_nonce.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for r in txs_awaiting_nonce.iter() { + let tx = Self::get_tx(&mut tx_block_queue, r); - match tx { - TargetChainTx::AwaitingNonce(t) => { - if let Some(updater) = &updater { - t.target_nonce.pending_outcomes.remove(updater); - } - if let Some(nonce) = &mut nonce { - *nonce += 1; - } + match tx { + TargetChainTx::AwaitingNonce(t) => { + t.target_nonce.pending_outcomes.remove(&updater); + if let Some(nonce) = &mut nonce { + *nonce += 1; + } - if t.target_nonce.pending_outcomes.is_empty() { - to_remove.push(r.clone()); - tx.try_set_nonce(nonce); - match tx { - TargetChainTx::Ready(t) => { - tracing::debug!(target: "mirror", "set nonce for {:?}'s {} to {}", access_key, r, t.target_tx.transaction.nonce()); - } - _ => { - tracing::warn!(target: "mirror", "Couldn't set nonce for {:?}'s {}", access_key, r); + if t.target_nonce.pending_outcomes.is_empty() { + to_remove.push(r.clone()); + tx.try_set_nonce(nonce); + match tx { + TargetChainTx::Ready(t) => { + tracing::debug!(target: "mirror", "set nonce for {:?}'s {} to {}", &access_key, r, t.target_tx.transaction.nonce()); + } + _ => { + tracing::warn!(target: "mirror", "Couldn't set nonce for {:?}'s {}", &access_key, r); + } } + } else { + t.target_nonce.nonce = std::cmp::max(t.target_nonce.nonce, nonce); } - } else { - t.target_nonce.nonce = std::cmp::max(t.target_nonce.nonce, nonce); } - } - TargetChainTx::Ready(_) => unreachable!(), - }; + TargetChainTx::Ready(_) => unreachable!(), + }; + } } - let info = self.nonces.get_mut(access_key).unwrap(); + let info = self.nonces.get_mut(&access_key).unwrap(); for r in to_remove.iter() { info.txs_awaiting_nonce.remove(r); } @@ -805,27 +718,23 @@ impl TxTracker { Ok(()) } - async fn on_outcome_finished( + fn on_outcome_finished( &mut self, - target_view_client: &Addr, db: &DB, id: &CryptoHash, - access_keys: HashSet<(AccountId, PublicKey)>, + access_keys: &HashSet<(AccountId, PublicKey)>, ) -> anyhow::Result<()> { let updater = NonceUpdater::ChainObjectId(*id); if let Some(keys) = self.updater_to_keys.remove(&updater) { - assert!(access_keys == keys); + assert!(access_keys == &keys); } - for access_key in access_keys.iter() { - self.try_set_nonces(target_view_client, db, 
&access_key, Some(id)).await?; - } crate::delete_pending_outcome(db, id) } - async fn on_target_block_tx( + fn on_target_block_tx( &mut self, - target_view_client: &Addr, + tx_block_queue: &Mutex>, db: &DB, tx: IndexerTransactionWithOutcome, ) -> anyhow::Result<()> { @@ -839,44 +748,42 @@ impl TxTracker { if let Some(access_keys) = crate::read_pending_outcome(db, &tx.transaction.hash)? { match tx.outcome.execution_outcome.outcome.status { ExecutionStatusView::SuccessReceiptId(receipt_id) => { - self.tx_to_receipt(db, &tx.transaction.hash, &receipt_id, access_keys)? + self.tx_to_receipt( + tx_block_queue, + db, + &tx.transaction.hash, + &receipt_id, + access_keys, + )?; } ExecutionStatusView::SuccessValue(_) | ExecutionStatusView::Unknown => { unreachable!() } ExecutionStatusView::Failure(_) => { - self.on_outcome_finished( - target_view_client, - db, - &tx.transaction.hash, - access_keys, - ) - .await? + self.on_outcome_finished(db, &tx.transaction.hash, &access_keys)?; } - } + }; } Ok(()) } - async fn on_target_block_applied_receipt( + fn on_target_block_applied_receipt( &mut self, - target_view_client: &Addr, db: &DB, outcome: IndexerExecutionOutcomeWithReceipt, staked_accounts: &mut HashMap<(AccountId, PublicKey), AccountId>, + access_key_updates: &mut Vec, ) -> anyhow::Result<()> { let access_keys = match crate::read_pending_outcome(db, &outcome.execution_outcome.id)? { Some(a) => a, None => return Ok(()), }; - self.on_outcome_finished( - target_view_client, - db, - &outcome.execution_outcome.id, - access_keys, - ) - .await?; + self.on_outcome_finished(db, &outcome.execution_outcome.id, &access_keys)?; + access_key_updates.extend(access_keys.into_iter().map(|(account_id, public_key)| { + UpdatedKey { account_id, public_key, id: outcome.execution_outcome.id } + })); + for receipt_id in outcome.execution_outcome.outcome.receipt_ids { // we don't carry over the access keys here, because we set pending access keys when we send a tx with // an add key action, which should be applied after one receipt. Setting empty access keys here allows us @@ -911,37 +818,38 @@ impl TxTracker { // receipt for any receipts that contain stake actions (w/ nonzero stake) that were // generated by our transactions. Then the caller will send extra stake transactions // to reverse those. 
- pub(crate) async fn on_target_block( + pub(crate) fn on_target_block( &mut self, - target_view_client: &Addr, + tx_block_queue: &Mutex>, db: &DB, msg: StreamerMessage, - ) -> anyhow::Result> { + ) -> anyhow::Result { self.record_block_timestamp(&msg); self.log_target_block(&msg); + let mut access_key_updates = Vec::new(); let mut staked_accounts = HashMap::new(); for s in msg.shards { if let Some(c) = s.chunk { for tx in c.transactions { - self.on_target_block_tx(target_view_client, db, tx).await?; + self.on_target_block_tx(tx_block_queue, db, tx)?; } for outcome in s.receipt_execution_outcomes { self.on_target_block_applied_receipt( - target_view_client, db, outcome, &mut staked_accounts, - ) - .await?; + &mut access_key_updates, + )?; } } } - Ok(staked_accounts) + Ok(TargetBlockInfo { staked_accounts, access_key_updates }) } - async fn on_tx_sent( + fn on_tx_sent( &mut self, + tx_block_queue: &Mutex>, db: &DB, tx_ref: Option, tx: MappedTx, @@ -1007,16 +915,19 @@ impl TxTracker { access_keys_to_remove.insert(access_key.clone()); } - for r in txs_awaiting_nonce.iter() { - let t = self.get_tx(r); + if !txs_awaiting_nonce.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for r in txs_awaiting_nonce.iter() { + let t = Self::get_tx(&mut tx_block_queue, r); - match t { - TargetChainTx::AwaitingNonce(t) => { - assert!(t.target_nonce.pending_outcomes.remove(&updater)); - t.target_nonce.pending_outcomes.insert(new_updater.clone()); - } - TargetChainTx::Ready(_) => unreachable!(), - }; + match t { + TargetChainTx::AwaitingNonce(t) => { + assert!(t.target_nonce.pending_outcomes.remove(&updater)); + t.target_nonce.pending_outcomes.insert(new_updater.clone()); + } + TargetChainTx::Ready(_) => unreachable!(), + }; + } } } } @@ -1107,6 +1018,7 @@ impl TxTracker { fn on_tx_skipped( &mut self, + tx_block_queue: &Mutex>, tx_ref: &Option, tx: &Transaction, nonce_updates: &HashSet<(AccountId, PublicKey)>, @@ -1124,25 +1036,28 @@ impl TxTracker { if let Some(info) = self.nonces.get(&access_key) { let txs_awaiting_nonce = info.txs_awaiting_nonce.clone(); let mut to_remove = Vec::new(); - for r in txs_awaiting_nonce.iter() { - let target_tx = self.get_tx(r); - match target_tx { - TargetChainTx::AwaitingNonce(tx) => { - assert!(tx.target_nonce.pending_outcomes.remove(&updater)); - if tx.target_nonce.pending_outcomes.is_empty() { - target_tx.try_set_nonce(None); - match target_tx { - TargetChainTx::Ready(t) => { - tracing::debug!(target: "mirror", "After skipping {} setting nonce for {:?}'s {} to {}", tx_ref, &access_key, r, t.target_tx.transaction.nonce()); - } - _ => { - tracing::warn!(target: "mirror", "After skipping {} could not set nonce for {:?}'s {}", tx_ref, &access_key, r); + if !txs_awaiting_nonce.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for r in txs_awaiting_nonce.iter() { + let target_tx = Self::get_tx(&mut tx_block_queue, r); + match target_tx { + TargetChainTx::AwaitingNonce(tx) => { + assert!(tx.target_nonce.pending_outcomes.remove(&updater)); + if tx.target_nonce.pending_outcomes.is_empty() { + target_tx.try_set_nonce(None); + match target_tx { + TargetChainTx::Ready(t) => { + tracing::debug!(target: "mirror", "After skipping {} setting nonce for {:?}'s {} to {}", tx_ref, &access_key, r, t.target_tx.transaction.nonce()); + } + _ => { + tracing::warn!(target: "mirror", "After skipping {} could not set nonce for {:?}'s {}", tx_ref, &access_key, r); + } } + to_remove.push(r.clone()); } - to_remove.push(r.clone()); } + TargetChainTx::Ready(_) 
=> unreachable!(), } - TargetChainTx::Ready(_) => unreachable!(), } } @@ -1167,24 +1082,36 @@ impl TxTracker { } // We just successfully sent some transactions. Remember them so we can see if they really show up on chain. - pub(crate) async fn on_txs_sent( + // Returns the new amount that we should wait before sending transactions + pub(crate) fn on_txs_sent( &mut self, + tx_block_queue: &Mutex>, db: &DB, sent_batch: SentBatch, target_height: BlockHeight, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let mut total_sent = 0; let now = Instant::now(); let mut access_keys_to_remove = HashSet::new(); let (txs_sent, provenance) = match sent_batch { SentBatch::MappedBlock(b) => { - let block_delay = self.tx_batch_interval.unwrap_or_else(|| { - self.second_longest_recent_block_delay() - .unwrap_or(self.min_block_production_delay + Duration::from_millis(100)) - }); - self.send_time.as_mut().reset(tokio::time::Instant::now() + block_delay); - crate::set_last_source_height(db, b.source_height)?; + self.height_popped = Some(b.source_height); + for (tx_ref, tx) in b.txs.iter() { + match tx { + TargetChainTx::AwaitingNonce(t) => { + self.nonces + .get_mut(&( + t.target_tx.signer_id().clone(), + t.target_tx.public_key().clone(), + )) + .unwrap() + .txs_awaiting_nonce + .remove(&tx_ref); + } + TargetChainTx::Ready(_) => {} + }; + } let txs = b.txs.into_iter().map(|(tx_ref, tx)| (Some(tx_ref), tx)).collect::>(); (txs, format!("source #{}", b.source_height)) @@ -1199,17 +1126,18 @@ impl TxTracker { crate::TargetChainTx::Ready(t) => { if t.sent_successfully { self.on_tx_sent( + tx_block_queue, db, tx_ref, t, target_height, now, &mut access_keys_to_remove, - ) - .await?; + )?; total_sent += 1; } else { self.on_tx_skipped( + tx_block_queue, &tx_ref, &t.target_tx.transaction, &t.nonce_updates, @@ -1219,6 +1147,7 @@ impl TxTracker { } crate::TargetChainTx::AwaitingNonce(t) => { self.on_tx_skipped( + tx_block_queue, &tx_ref, &t.target_tx, &t.nonce_updates, @@ -1236,6 +1165,10 @@ impl TxTracker { total_sent, provenance, target_height ); - Ok(()) + let next_delay = self.tx_batch_interval.unwrap_or_else(|| { + self.second_longest_recent_block_delay() + .unwrap_or(self.min_block_production_delay + Duration::from_millis(100)) + }); + Ok(next_delay) } } diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs index 2632fe455ab..a131c66f5bb 100644 --- a/tools/mirror/src/lib.rs +++ b/tools/mirror/src/lib.rs @@ -29,10 +29,11 @@ use near_primitives_core::account::id::AccountType; use near_primitives_core::account::{AccessKey, AccessKeyPermission}; use near_primitives_core::types::{Nonce, ShardId}; use rocksdb::DB; -use std::collections::{HashMap, HashSet}; -use std::path::Path; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; use strum::IntoEnumIterator; use tokio::sync::mpsc; @@ -408,17 +409,14 @@ struct MirrorConfig { const CREATE_ACCOUNT_DELTA: usize = 5; +// TODO: separate out the code that uses the target chain clients, and +// make it an option to send the transactions to some RPC node. +// that way it would be possible to run this code and send transactions with an +// old binary not caught up to the current protocol version, since the +// transactions we're constructing should stay valid. 
struct TxMirror { - target_stream: mpsc::Receiver, source_chain_access: T, - // TODO: separate out the code that uses the target chain clients, and - // make it an option to send the transactions to some RPC node. - // that way it would be possible to run this code and send transactions with an - // old binary not caught up to the current protocol version, since the - // transactions we're constructing should stay valid. - target_view_client: Addr, - target_client: Addr, - db: DB, + db: Arc, target_genesis_height: BlockHeight, target_min_block_production_delay: Duration, secret: Option<[u8; crate::secret::SECRET_LEN]>, @@ -502,7 +500,7 @@ impl std::fmt::Display for MappedTxProvenance { // what nonce to use because the public key was added in an AddKey // action that we haven't seen on chain yet. The target_tx field is complete // except for the nonce field. -#[derive(Debug)] +#[derive(Clone, Debug)] struct TxAwaitingNonce { source_signer_id: AccountId, source_receiver_id: AccountId, @@ -522,7 +520,7 @@ impl TxAwaitingNonce { target_secret_key: SecretKey, target_public_key: PublicKey, actions: Vec, - target_nonce: &TargetNonce, + target_nonce: TargetNonce, ref_hash: &CryptoHash, provenance: MappedTxProvenance, nonce_updates: HashSet<(AccountId, PublicKey)>, @@ -542,7 +540,7 @@ impl TxAwaitingNonce { target_secret_key, target_tx, nonce_updates, - target_nonce: target_nonce.clone(), + target_nonce, } } } @@ -550,7 +548,7 @@ impl TxAwaitingNonce { // A transaction meant for the target chain that is complete/ready to send. // We keep some extra info about the transaction for the purposes of logging // later on when we find it on chain. -#[derive(Debug)] +#[derive(Clone, Debug)] struct MappedTx { source_signer_id: AccountId, source_receiver_id: AccountId, @@ -604,7 +602,7 @@ impl MappedTx { } } -#[derive(Debug)] +#[derive(Clone, Debug)] enum TargetChainTx { Ready(MappedTx), AwaitingNonce(TxAwaitingNonce), @@ -681,7 +679,7 @@ impl TargetChainTx { target_secret_key: &SecretKey, target_public_key: PublicKey, actions: Vec, - target_nonce: &TargetNonce, + target_nonce: TargetNonce, ref_hash: &CryptoHash, provenance: MappedTxProvenance, nonce_updates: HashSet<(AccountId, PublicKey)>, @@ -744,6 +742,31 @@ struct TxBatch { txs: Vec<(TxRef, TargetChainTx)>, } +impl From<&MappedBlock> for TxBatch { + fn from(block: &MappedBlock) -> Self { + Self { + source_height: block.source_height, + source_hash: block.source_hash, + txs: block + .chunks + .iter() + .flat_map(|c| { + c.txs.iter().enumerate().map(move |(tx_idx, tx)| { + ( + TxRef { + source_height: block.source_height, + shard_id: c.shard_id, + tx_idx, + }, + tx.clone(), + ) + }) + }) + .collect(), + } + } +} + async fn account_exists( view_client: &Addr, account_id: &AccountId, @@ -804,31 +827,29 @@ async fn fetch_access_key_nonce( } impl TxMirror { - fn new>( + fn new( source_chain_access: T, - target_home: P, - mirror_db_path: Option
, + target_home: &Path, + mirror_db_path: Option<&Path>, secret: Option<[u8; crate::secret::SECRET_LEN]>, config: MirrorConfig, ) -> anyhow::Result { let target_config = - nearcore::config::load_config(target_home.as_ref(), GenesisValidationMode::UnsafeFast) - .with_context(|| { - format!("Error loading target config from {:?}", target_home.as_ref()) - })?; + nearcore::config::load_config(target_home, GenesisValidationMode::UnsafeFast) + .with_context(|| format!("Error loading target config from {:?}", target_home))?; if !target_config.client_config.archive { // this is probably not going to come up, but we want to avoid a situation where // we go offline for a long time and then come back online, and we state sync to // the head of the target chain without looking for our outcomes that made it on // chain right before we went offline - anyhow::bail!("config file in {} has archive: false, but archive must be set to true for the target chain", target_home.as_ref().display()); + anyhow::bail!("config file in {} has archive: false, but archive must be set to true for the target chain", target_home.display()); } let db = match mirror_db_path { Some(mirror_db_path) => open_db(mirror_db_path), None => { // keep backward compatibility let mirror_db_path = near_store::NodeStorage::opener( - target_home.as_ref(), + target_home, target_config.config.archive, &target_config.config.store, None, @@ -839,22 +860,11 @@ impl TxMirror { } }; let db = db.context("failed to open mirror DB")?; - let target_indexer = Indexer::new(near_indexer::IndexerConfig { - home_dir: target_home.as_ref().to_path_buf(), - sync_mode: near_indexer::SyncModeEnum::FromInterruption, - await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::StreamWhileSyncing, - validate_genesis: false, - }) - .context("failed to start target chain indexer")?; - let (target_view_client, target_client) = target_indexer.client_actors(); - let target_stream = target_indexer.streamer(); + let db = Arc::new(db); let default_extra_key = crate::key_mapping::default_extra_key(secret.as_ref()); Ok(Self { source_chain_access, - target_client, - target_view_client, - target_stream, db, target_genesis_height: target_config.genesis.config.genesis_height, target_min_block_production_delay: target_config @@ -868,14 +878,13 @@ impl TxMirror { } async fn send_transactions<'a, I: Iterator>( - &mut self, + target_client: &Addr, txs: I, ) -> anyhow::Result<()> { for tx in txs { match tx { TargetChainTx::Ready(tx) => { - match self - .target_client + match target_client .send( ProcessTxRequest { transaction: tx.target_tx.clone(), @@ -925,6 +934,7 @@ impl TxMirror { async fn map_actions( &self, + target_view_client: &Addr, tx: &SignedTransaction, ) -> anyhow::Result<(Vec, HashSet<(AccountId, PublicKey)>)> { let mut actions = Vec::new(); @@ -969,12 +979,9 @@ impl TxMirror { &tx.transaction.receiver_id(), self.secret.as_ref(), ); - if !account_exists(&self.target_view_client, &target_account) - .await - .with_context(|| { - format!("failed checking existence for account {}", &target_account) - })? - { + if !account_exists(target_view_client, &target_account).await.with_context( + || format!("failed checking existence for account {}", &target_account), + )? 
{ if target_account.get_account_type() == AccountType::NearImplicitAccount { let public_key = @@ -1015,7 +1022,9 @@ impl TxMirror { async fn prepare_tx( &self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, source_signer_id: AccountId, source_receiver_id: AccountId, target_signer_id: AccountId, @@ -1028,33 +1037,35 @@ impl TxMirror { nonce_updates: HashSet<(AccountId, PublicKey)>, ) -> anyhow::Result { let target_public_key = target_secret_key.public_key(); + // TODO: clean up this function. The logic is hard to follow let target_nonce = match source_height.as_ref() { Some(_) => None, None => Some( - tracker - .insert_nonce( - &self.target_view_client, - &self.db, - &target_signer_id, - &target_public_key, - target_secret_key, - ) - .await?, + crate::chain_tracker::TxTracker::insert_nonce( + tracker, + tx_block_queue, + target_view_client, + &self.db, + &target_signer_id, + &target_public_key, + target_secret_key, + ) + .await?, ), }; let target_nonce = match source_height { Some(source_height) => { - tracker - .next_nonce( - &self.target_view_client, - &self.db, - &target_signer_id, - &target_public_key, - source_height, - ) - .await? + crate::chain_tracker::TxTracker::next_nonce( + tracker, + target_view_client, + &self.db, + &target_signer_id, + &target_public_key, + source_height, + ) + .await? } - None => target_nonce.as_ref().unwrap(), + None => target_nonce.unwrap(), }; if target_nonce.pending_outcomes.is_empty() && target_nonce.nonce.is_some() { @@ -1092,7 +1103,9 @@ impl TxMirror { // then the only keys we will have mapped are the ones added by regular AddKey transactions. async fn push_extra_tx( &self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, block_hash: CryptoHash, txs: &mut Vec, predecessor_id: AccountId, @@ -1116,7 +1129,7 @@ impl TxMirror { for k in keys.iter() { let target_secret_key = crate::key_mapping::map_key(k, self.secret.as_ref()); if fetch_access_key_nonce( - &self.target_view_client, + target_view_client, &target_signer_id, &target_secret_key.public_key(), ) @@ -1213,6 +1226,8 @@ impl TxMirror { let target_tx = self .prepare_tx( tracker, + tx_block_queue, + target_view_client, predecessor_id, receiver_id, target_signer_id, @@ -1231,7 +1246,9 @@ impl TxMirror { async fn add_function_call_keys( &self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, txs: &mut Vec, receipt_id: &CryptoHash, receiver_id: &AccountId, @@ -1314,6 +1331,8 @@ impl TxMirror { self.push_extra_tx( tracker, + tx_block_queue, + target_view_client, outcome.block_hash, txs, receipt.predecessor_id().clone(), @@ -1336,7 +1355,9 @@ impl TxMirror { provenance: MappedTxProvenance, source_height: BlockHeight, ref_hash: &CryptoHash, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, txs: &mut Vec, ) -> anyhow::Result<()> { // if signer and receiver are the same then the resulting local receipt @@ -1354,6 +1375,8 @@ impl TxMirror { { self.add_function_call_keys( tracker, + tx_block_queue, + target_view_client, txs, &receipt_id, &tx.transaction.receiver_id(), @@ -1376,13 +1399,17 @@ impl TxMirror { provenance: MappedTxProvenance, source_height: BlockHeight, ref_hash: &CryptoHash, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, txs: 
&mut Vec, ) -> anyhow::Result<()> { if let ReceiptEnum::Action(r) | ReceiptEnum::PromiseYield(r) = receipt.receipt() { if r.actions.iter().any(|a| matches!(a, Action::FunctionCall(_))) { self.add_function_call_keys( tracker, + tx_block_queue, + target_view_client, txs, receipt.receipt_id(), receipt.receiver_id(), @@ -1403,7 +1430,9 @@ impl TxMirror { &self, create_account_height: BlockHeight, ref_hash: CryptoHash, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, txs: &mut Vec, ) -> anyhow::Result<()> { let source_block = @@ -1418,6 +1447,8 @@ impl TxMirror { create_account_height, &ref_hash, tracker, + tx_block_queue, + target_view_client, txs, ) .await?; @@ -1435,6 +1466,8 @@ impl TxMirror { create_account_height, &ref_hash, tracker, + tx_block_queue, + target_view_client, txs, ) .await?; @@ -1451,7 +1484,9 @@ impl TxMirror { source_height: BlockHeight, create_account_height: Option, ref_hash: CryptoHash, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, ) -> anyhow::Result { let source_block = self.source_chain_access.get_txs(source_height).await.with_context(|| { @@ -1463,7 +1498,8 @@ impl TxMirror { let mut txs = Vec::new(); for (idx, source_tx) in ch.transactions.into_iter().enumerate() { - let (actions, nonce_updates) = self.map_actions(&source_tx).await?; + let (actions, nonce_updates) = + self.map_actions(target_view_client, &source_tx).await?; if actions.is_empty() { // If this is a tx containing only stake actions, skip it. continue; @@ -1485,6 +1521,8 @@ impl TxMirror { let target_tx = self .prepare_tx( tracker, + tx_block_queue, + target_view_client, source_tx.transaction.signer_id().clone(), source_tx.transaction.receiver_id().clone(), target_signer_id, @@ -1504,6 +1542,8 @@ impl TxMirror { source_height, &ref_hash, tracker, + tx_block_queue, + target_view_client, &mut txs, ) .await?; @@ -1515,6 +1555,8 @@ impl TxMirror { source_height, &ref_hash, tracker, + tx_block_queue, + target_view_client, &mut txs, ) .await?; @@ -1533,6 +1575,8 @@ impl TxMirror { create_account_height, ref_hash, tracker, + tx_block_queue, + target_view_client, &mut chunks[0].txs, ) .await?; @@ -1546,21 +1590,28 @@ impl TxMirror { // Up to a certain capacity, prepare and queue up batches of // transactions that we want to send to the target chain. + // Returns the number of blocks worth of txs queued at the end. 
+ // `have_stop_height` refers to whether we're going to stop sending transactions and exit after a particular height async fn queue_txs( &mut self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, ref_hash: CryptoHash, - check_send_time: bool, + have_stop_height: bool, ) -> anyhow::Result<()> { - if tracker.num_blocks_queued() > 100 { + let mut num_blocks_queued = { + let tx_block_queue = tx_block_queue.lock().unwrap(); + tx_block_queue.len() + }; + if num_blocks_queued > 100 { return Ok(()); } - let next_batch_time = tracker.next_batch_time(); - loop { let (next_height, create_account_height) = - tracker.next_heights(&self.source_chain_access).await?; + crate::chain_tracker::TxTracker::next_heights(&tracker, &self.source_chain_access) + .await?; let next_height = match next_height { Some(h) => h, @@ -1568,26 +1619,34 @@ impl TxMirror { }; // if we have a stop height, just send the last few blocks without worrying about // extra create account txs, otherwise wait until we get more blocks - if !tracker.has_stop_height() && create_account_height.is_none() { + if !have_stop_height && create_account_height.is_none() { return Ok(()); } let b = self - .fetch_txs(next_height, create_account_height, ref_hash, tracker) + .fetch_txs( + next_height, + create_account_height, + ref_hash, + tracker, + tx_block_queue, + target_view_client, + ) .await .with_context(|| format!("Can't fetch source #{} transactions", next_height))?; - tracker.queue_block(b, &self.target_view_client, &self.db).await?; - if tracker.num_blocks_queued() > 100 { - break; - } + crate::chain_tracker::TxTracker::queue_block( + tracker, + &tx_block_queue, + b, + target_view_client, + &self.db, + ) + .await?; - if check_send_time - && tracker.num_blocks_queued() > 0 - && Instant::now() + Duration::from_millis(20) > next_batch_time - { - break; + num_blocks_queued += 1; + if num_blocks_queued > 100 { + return Ok(()); } } - Ok(()) } // send stake txs for zero stake for each of the stake actions we just saw in @@ -1597,7 +1656,10 @@ impl TxMirror { // retry later if the tx got lost for some reason async fn unstake( &mut self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_client: &Addr, + target_view_client: &Addr, stakes: HashMap<(AccountId, PublicKey), AccountId>, source_hash: &CryptoHash, target_hash: &CryptoHash, @@ -1607,6 +1669,8 @@ impl TxMirror { for ((receiver_id, public_key), predecessor_id) in stakes { self.push_extra_tx( tracker, + tx_block_queue, + target_view_client, *source_hash, &mut txs, predecessor_id, @@ -1619,61 +1683,206 @@ impl TxMirror { .await?; } if !txs.is_empty() { - self.send_transactions(txs.iter_mut()).await?; - tracker - .on_txs_sent( - &self.db, - crate::chain_tracker::SentBatch::ExtraTxs(txs), - target_height, + Self::send_transactions(target_client, txs.iter_mut()).await?; + let mut tracker = tracker.lock().unwrap(); + tracker.on_txs_sent( + tx_block_queue, + &self.db, + crate::chain_tracker::SentBatch::ExtraTxs(txs), + target_height, + )?; + } + Ok(()) + } + + async fn send_txs_loop( + db: Arc, + blocks_sent: mpsc::Sender, + tx_block_queue: Arc>>, + mut send_time: Pin>, + send_delay: Arc>, + target_client: Addr, + ) -> anyhow::Result<()> { + let mut sent_source_height = None; + + loop { + (&mut send_time).await; + + let tx_batch = { + let tx_block_queue = tx_block_queue.lock().unwrap(); + let b = match sent_source_height { + Some(sent_source_height) => { + let mut 
block_idx = None; + for (idx, b) in tx_block_queue.iter().enumerate() { + if b.source_height > sent_source_height { + block_idx = Some(idx); + break; + } + } + match block_idx { + Some(idx) => tx_block_queue.get(idx), + None => None, + } + } + None => tx_block_queue.get(0), + }; + b.map(|b| TxBatch::from(b)) + }; + + let mut tx_batch = match tx_batch { + Some(b) => b, + None => { + tokio::time::sleep(Duration::from_millis(200)).await; + continue; + } + }; + + let start_time = tokio::time::Instant::now(); + + tracing::debug!(target: "mirror", "Sending transactions for source block #{}", tx_batch.source_height); + Self::send_transactions( + &target_client, + tx_batch.txs.iter_mut().map(|(_tx_ref, tx)| tx), + ) + .await?; + set_last_source_height(&db, tx_batch.source_height)?; + sent_source_height = Some(tx_batch.source_height); + + blocks_sent.send(tx_batch).await.unwrap(); + + let send_delay = *send_delay.lock().unwrap(); + tracing::debug!(target: "mirror", "Sleeping for {:?} until sending more transactions", &send_delay); + let next_send_time = start_time + send_delay; + send_time.as_mut().reset(next_send_time); + } + } + + async fn index_target_loop( + tracker: Arc>, + tx_block_queue: Arc>>, + home_dir: PathBuf, + db: Arc, + clients_tx: tokio::sync::oneshot::Sender<(Addr, Addr)>, + accounts_to_unstake: mpsc::Sender>, + target_height: Arc>, + target_head: Arc>, + ) -> anyhow::Result<()> { + let target_indexer = Indexer::new(near_indexer::IndexerConfig { + home_dir, + sync_mode: near_indexer::SyncModeEnum::FromInterruption, + await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::StreamWhileSyncing, + validate_genesis: false, + }) + .context("failed to start target chain indexer")?; + let (target_view_client, target_client) = target_indexer.client_actors(); + let mut target_stream = target_indexer.streamer(); + let (first_target_height, first_target_head) = Self::index_target_chain( + &tracker, + &tx_block_queue, + &mut target_stream, + db.as_ref(), + &target_view_client, + &target_client, + ) + .await?; + *target_height.write().unwrap() = first_target_height; + *target_head.write().unwrap() = first_target_head; + clients_tx.send((target_client.clone(), target_view_client.clone())).unwrap(); + + loop { + let msg = target_stream.recv().await.unwrap(); + *target_head.write().unwrap() = msg.block.header.hash; + *target_height.write().unwrap() = msg.block.header.height; + let target_block_info = { + let mut tracker = tracker.lock().unwrap(); + tracker.on_target_block(&tx_block_queue, db.as_ref(), msg)? 
+ }; + if !target_block_info.staked_accounts.is_empty() { + accounts_to_unstake.send(target_block_info.staked_accounts).await.unwrap(); + } + for access_key_update in target_block_info.access_key_updates { + let nonce = crate::fetch_access_key_nonce( + &target_view_client, + &access_key_update.account_id, + &access_key_update.public_key, ) .await?; + let mut tracker = tracker.lock().unwrap(); + tracker.try_set_nonces(&tx_block_queue, db.as_ref(), access_key_update, nonce)?; + } } - Ok(()) } - async fn main_loop( + async fn queue_txs_loop( &mut self, - mut tracker: crate::chain_tracker::TxTracker, - mut target_height: BlockHeight, - mut target_head: CryptoHash, + tracker: Arc>, + tx_block_queue: Arc>>, + target_client: Addr, + target_view_client: Addr, + mut blocks_sent: mpsc::Receiver, + mut accounts_to_unstake: mpsc::Receiver>, + send_delay: Arc>, + target_height: Arc>, + target_head: Arc>, mut source_hash: CryptoHash, + have_stop_height: bool, ) -> anyhow::Result<()> { + let mut queue_txs_time = tokio::time::interval(Duration::from_millis(100)); + loop { tokio::select! { // time to send a batch of transactions - tx_batch = tracker.next_batch(&self.target_view_client, &self.db), if tracker.num_blocks_queued() > 0 => { - let mut tx_batch = tx_batch?; - source_hash = tx_batch.source_hash; - self.send_transactions(tx_batch.txs.iter_mut().map(|(_tx_ref, tx)| tx)).await?; - tracker.on_txs_sent(&self.db, crate::chain_tracker::SentBatch::MappedBlock(tx_batch), target_height).await?; - - // now we have one second left until we need to send more transactions. In the - // meantime, we might as well prepare some more batches of transactions. - // TODO: continue in best effort fashion on error - self.queue_txs(&mut tracker, target_head, true).await?; + _ = queue_txs_time.tick() => { + let target_head = *target_head.read().unwrap(); + self.queue_txs(&tracker, &tx_block_queue, &target_view_client, target_head, have_stop_height).await?; } - msg = self.target_stream.recv() => { - let msg = msg.unwrap(); - target_head = msg.block.header.hash; - target_height = msg.block.header.height; - let staked_accounts = tracker.on_target_block(&self.target_view_client, &self.db, msg).await?; - self.unstake(&mut tracker, staked_accounts, &source_hash, &target_head, target_height).await?; + tx_batch = blocks_sent.recv() => { + let tx_batch = tx_batch.unwrap(); + source_hash = tx_batch.source_hash; + // lock the tracker before removing the block from the queue so that + // we don't call on_target_block() in the other thread between removing the block + // and calling on_txs_sent(), because that could lead to a bug looking up transactions + // in TxTracker::get_tx() + let mut tracker = tracker.lock().unwrap(); + { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + let b = tx_block_queue.pop_front().unwrap(); + assert!(b.source_height == tx_batch.source_height); + }; + let target_height = *target_height.read().unwrap(); + let new_delay = tracker.on_txs_sent( + &tx_block_queue, + &self.db, + crate::chain_tracker::SentBatch::MappedBlock(tx_batch), + target_height, + )?; + *send_delay.lock().unwrap() = new_delay; } - // If we don't have any upcoming sets of transactions to send already built, we probably fell behind in the source - // chain and can't fetch the transactions. Check if we have them now here. 
- _ = tokio::time::sleep(std::time::Duration::from_millis(200)), if tracker.num_blocks_queued() == 0 => { - self.queue_txs(&mut tracker, target_head, true).await?; + msg = accounts_to_unstake.recv() => { + let staked_accounts = msg.unwrap(); + let target_head = *target_head.read().unwrap(); + let target_height = *target_height.read().unwrap(); + self.unstake( + &tracker, &tx_block_queue, &target_client, + &target_view_client, staked_accounts, &source_hash, + &target_head, target_height + ).await?; } }; - if tracker.finished() { - tracing::info!(target: "mirror", "finished sending all transactions"); - return Ok(()); + // TODO: this locking of the mutex before continuing the loop is kind of unnecessary since we should be able to tell + // exactly when we've done the thing that makes finished() return true, usually after a call to on_target_block() + { + let tracker = tracker.lock().unwrap(); + if tracker.finished() { + tracing::info!(target: "mirror", "finished sending all transactions"); + return Ok(()); + } } } } - async fn target_chain_syncing(&self) -> bool { - self.target_client + async fn target_chain_syncing(target_client: &Addr) -> bool { + target_client .send(Status { is_health_check: false, detailed: false }.with_span_context()) .await .unwrap() @@ -1681,9 +1890,10 @@ impl TxMirror { .unwrap_or(true) } - async fn target_chain_head(&self) -> anyhow::Result<(BlockHeight, CryptoHash)> { - let header = self - .target_view_client + async fn target_chain_head( + target_view_client: &Addr, + ) -> anyhow::Result<(BlockHeight, CryptoHash)> { + let header = target_view_client .send(GetBlock(BlockReference::Finality(Finality::Final)).with_span_context()) .await .unwrap() @@ -1694,16 +1904,25 @@ impl TxMirror { // call tracker.on_target_block() on each target chain block until that client is synced async fn index_target_chain( - &mut self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_stream: &mut mpsc::Receiver, + db: &DB, + target_view_client: &Addr, + target_client: &Addr, ) -> anyhow::Result<(BlockHeight, CryptoHash)> { let mut head = None; loop { - let msg = self.target_stream.recv().await.unwrap(); + let msg = target_stream.recv().await.unwrap(); let height = msg.block.header.height; - tracker.on_target_block(&self.target_view_client, &self.db, msg).await?; + { + let mut tracker = tracker.lock().unwrap(); + // TODO: handle the return value. it is possible we want to unstake or update nonces + // after a restart. 
+ tracker.on_target_block(&tx_block_queue, db, msg)?; + } match head { Some((head_height, head_hash)) => { @@ -1712,15 +1931,19 @@ impl TxMirror { } } None => { - if !self.target_chain_syncing().await { - head = Some(self.target_chain_head().await?); + if !Self::target_chain_syncing(target_client).await { + head = Some(Self::target_chain_head(target_view_client).await?); } } } } } - async fn run(mut self, stop_height: Option) -> anyhow::Result<()> { + async fn run( + mut self, + stop_height: Option, + target_home: PathBuf, + ) -> anyhow::Result<()> { let last_stored_height = get_last_source_height(&self.db)?; let last_height = last_stored_height.unwrap_or(self.target_genesis_height - 1); @@ -1747,13 +1970,56 @@ impl TxMirror { tracing::debug!(target: "mirror", "source chain initialized with first heights: {:?}", &next_heights); - let mut tracker = crate::chain_tracker::TxTracker::new( + let tracker = Arc::new(Mutex::new(crate::chain_tracker::TxTracker::new( self.target_min_block_production_delay, self.config.tx_batch_interval, next_heights.iter(), stop_height, - ); - let (target_height, target_head) = self.index_target_chain(&mut tracker).await?; + ))); + let target_height = Arc::new(RwLock::new(0)); + let target_head = Arc::new(RwLock::new(CryptoHash::default())); + let (clients_tx, clients_rx) = tokio::sync::oneshot::channel(); + let (target_indexer_done_tx, target_indexer_done_rx) = + tokio::sync::oneshot::channel::>(); + let (unstake_tx, unstake_rx) = mpsc::channel(10); + + let db = self.db.clone(); + let target_height2 = target_height.clone(); + let target_head2 = target_head.clone(); + let tracker2 = tracker.clone(); + let index_target_thread = actix::Arbiter::new(); + + let tx_block_queue = Arc::new(Mutex::new(VecDeque::new())); + + let tx_block_queue2 = tx_block_queue.clone(); + index_target_thread.spawn(async move { + let res = Self::index_target_loop( + tracker2, + tx_block_queue2, + target_home, + db, + clients_tx, + unstake_tx, + target_height2, + target_head2, + ) + .await; + target_indexer_done_tx.send(res).unwrap(); + }); + + // wait til we set the values in target_height and target_head after receiving a message from the indexer + let (target_client, target_view_client) = clients_rx.await.unwrap(); + + // Wait at least 15 seconds before sending any transactions because for + // a few seconds after the node starts, transaction routing requests + // will be silently dropped by the peer manager. 
+ let mut send_time = Box::pin(tokio::time::sleep(std::time::Duration::from_secs(15))); + let mut send_delay = self + .config + .tx_batch_interval + .unwrap_or(self.target_min_block_production_delay + Duration::from_millis(100)); + + let initial_target_head = *target_head.read().unwrap(); if last_stored_height.is_none() { // send any extra function call-initiated create accounts for the first few blocks right now // we set source_hash to 0 because we don't actually care about it here, and it doesn't even exist since these are @@ -1763,35 +2029,101 @@ impl TxMirror { source_height: last_height, chunks: vec![MappedChunk { shard_id: 0, txs: Vec::new() }], }; + for h in next_heights { - self.add_create_account_txs(h, target_head, &mut tracker, &mut block.chunks[0].txs) - .await?; + self.add_create_account_txs( + h, + initial_target_head, + &tracker, + &tx_block_queue, + &target_view_client, + &mut block.chunks[0].txs, + ) + .await?; } if block.chunks.iter().any(|c| !c.txs.is_empty()) { tracing::debug!(target: "mirror", "sending extra create account transactions for the first {} blocks", CREATE_ACCOUNT_DELTA); - tracker.queue_block(block, &self.target_view_client, &self.db).await?; - let mut b = tracker.next_batch(&self.target_view_client, &self.db).await?; - self.send_transactions(b.txs.iter_mut().map(|(_tx_ref, tx)| tx)).await?; - tracker - .on_txs_sent( + let mut b = { + crate::chain_tracker::TxTracker::queue_block( + &tracker, + &tx_block_queue, + block, + &target_view_client, &self.db, - crate::chain_tracker::SentBatch::MappedBlock(b), - target_height, ) .await?; + (&mut send_time).await; + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + TxBatch::from(&tx_block_queue.pop_front().unwrap()) + }; + Self::send_transactions(&target_client, b.txs.iter_mut().map(|(_tx_ref, tx)| tx)) + .await?; + let mut tracker = tracker.lock().unwrap(); + send_delay = tracker.on_txs_sent( + &tx_block_queue, + &self.db, + crate::chain_tracker::SentBatch::MappedBlock(b), + *target_height.read().unwrap(), + )?; + } + } + self.queue_txs( + &tracker, + &tx_block_queue, + &target_view_client, + initial_target_head, + stop_height.is_some(), + ) + .await?; + + let send_delay = Arc::new(Mutex::new(send_delay)); + let send_delay2 = send_delay.clone(); + let (blocks_sent_tx, blocks_sent_rx) = mpsc::channel(10); + let tx_block_queue2 = tx_block_queue.clone(); + let target_client2 = target_client.clone(); + let db = self.db.clone(); + let send_txs_thread = actix::Arbiter::new(); + let (send_txs_done_tx, send_txs_done_rx) = + tokio::sync::oneshot::channel::>(); + send_txs_thread.spawn(async move { + let res = Self::send_txs_loop( + db, + blocks_sent_tx, + tx_block_queue2, + send_time, + send_delay2, + target_client2, + ) + .await; + send_txs_done_tx.send(res).unwrap(); + }); + tokio::select! 
{ + res = self.queue_txs_loop( + tracker, tx_block_queue, target_client, target_view_client, + blocks_sent_rx, unstake_rx, send_delay, target_height, target_head, + source_hash, stop_height.is_some(), + ) => { + // TODO: cancel other threads + res + } + res = target_indexer_done_rx => { + let res = res.unwrap(); + tracing::error!("target indexer thread exited"); + res.context("target indexer thread failure") + } + res = send_txs_done_rx => { + let res = res.unwrap(); + tracing::error!("transaction sending thread exited"); + res.context("target indexer thread failure") } } - - self.queue_txs(&mut tracker, target_head, false).await?; - - self.main_loop(tracker, target_height, target_head, source_hash).await } } async fn run>( source_home: P, target_home: P, - mirror_db_path: Option
, + mirror_db_path: Option, secret: Option<[u8; crate::secret::SECRET_LEN]>, stop_height: Option, online_source: bool, @@ -1811,18 +2143,24 @@ async fn run>( let stop_height = stop_height.unwrap_or( source_chain_access.head_height().await.context("could not fetch source chain head")?, ); - TxMirror::new(source_chain_access, target_home, mirror_db_path, secret, config)? - .run(Some(stop_height)) - .await + TxMirror::new( + source_chain_access, + target_home.as_ref(), + mirror_db_path.as_deref(), + secret, + config, + )? + .run(Some(stop_height), target_home.as_ref().to_path_buf()) + .await } else { TxMirror::new( crate::online::ChainAccess::new(source_home)?, - target_home, - mirror_db_path, + target_home.as_ref(), + mirror_db_path.as_deref(), secret, config, )? - .run(stop_height) + .run(stop_height, target_home.as_ref().to_path_buf()) .await } }