diff --git a/Cargo.lock b/Cargo.lock index 4b769222f10..c6dcce2249b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4509,6 +4509,7 @@ name = "near-mirror" version = "0.0.0" dependencies = [ "actix", + "actix-rt", "anyhow", "async-trait", "borsh 1.2.0", diff --git a/pytest/tests/mocknet/helpers/neard_runner.py b/pytest/tests/mocknet/helpers/neard_runner.py index cced33fbbdb..a3003b8ecdf 100644 --- a/pytest/tests/mocknet/helpers/neard_runner.py +++ b/pytest/tests/mocknet/helpers/neard_runner.py @@ -911,7 +911,7 @@ def start_neard(self, batch_interval_millis=None): # Configure the logs config file to control the level of rust and opentelemetry logs. # Default config sets level to DEBUG for "client" and "chain" logs, WARN for tokio+actix, and INFO for everything else. def configure_log_config(self): - default_log_filter = 'client=debug,chain=debug,actix_web=warn,mio=warn,tokio_util=warn,actix_server=warn,actix_http=warn,info' + default_log_filter = 'client=debug,chain=debug,mirror=debug,actix_web=warn,mio=warn,tokio_util=warn,actix_server=warn,actix_http=warn,info' log_config_path = self.target_near_home_path('log_config.json') logging.info("Creating log_config.json with default log filter.") diff --git a/tools/mirror/Cargo.toml b/tools/mirror/Cargo.toml index 77813903458..9aea9e8ef2a 100644 --- a/tools/mirror/Cargo.toml +++ b/tools/mirror/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] actix.workspace = true +actix-rt.workspace = true anyhow.workspace = true async-trait.workspace = true borsh.workspace = true diff --git a/tools/mirror/src/chain_tracker.rs b/tools/mirror/src/chain_tracker.rs index 084cd1c07fe..81dca68a34b 100644 --- a/tools/mirror/src/chain_tracker.rs +++ b/tools/mirror/src/chain_tracker.rs @@ -19,7 +19,7 @@ use std::collections::hash_map; use std::collections::HashMap; use std::collections::{BTreeSet, HashSet, VecDeque}; use std::fmt::Write; -use std::pin::Pin; +use std::sync::Mutex; use std::time::{Duration, Instant}; // Information related to a single transaction that we sent in the past. @@ -124,6 +124,21 @@ pub(crate) enum SentBatch { ExtraTxs(Vec), } +// an access key's account ID and public key, along with the id of the tx or receipt that might +// have updated it +pub(crate) struct UpdatedKey { + pub(crate) account_id: AccountId, + pub(crate) public_key: PublicKey, + pub(crate) id: CryptoHash, +} + +// return value of on_target_block() +pub(crate) struct TargetBlockInfo { + // these accounts need to be unstaked + pub(crate) staked_accounts: HashMap<(AccountId, PublicKey), AccountId>, + // these access keys that were previously unavailable may now be available + pub(crate) access_key_updates: Vec, +} // Keeps the queue of upcoming transactions and provides them in regular intervals via next_batch() // Also keeps track of txs we've sent so far and looks for them on chain, for metrics/logging purposes.
@@ -132,7 +147,6 @@ pub(crate) enum SentBatch { pub(crate) struct TxTracker { sent_txs: HashMap, txs_by_signer: HashMap<(AccountId, PublicKey), BTreeSet>, - queued_blocks: VecDeque, // for each updater (a tx or receipt hash, or a queued transaction we haven't sent yet), keeps // a set of access keys who might be updated by it updater_to_keys: HashMap>, @@ -146,7 +160,6 @@ pub(crate) struct TxTracker { nonempty_height_queued: Option, height_popped: Option, height_seen: Option, - send_time: Pin>, // Config value in the target chain, used to judge how long to wait before sending a new batch of txs min_block_production_delay: Duration, // optional specific tx send delay @@ -176,13 +189,8 @@ impl TxTracker { next_heights, stop_height, tx_batch_interval, - // Wait at least 15 seconds before sending any transactions because for - // a few seconds after the node starts, transaction routing requests - // will be silently dropped by the peer manager. - send_time: Box::pin(tokio::time::sleep(std::time::Duration::from_secs(15))), sent_txs: HashMap::new(), txs_by_signer: HashMap::new(), - queued_blocks: VecDeque::new(), updater_to_keys: HashMap::new(), nonces: HashMap::new(), height_queued: None, @@ -194,20 +202,20 @@ impl TxTracker { } pub(crate) async fn next_heights( - &mut self, + me: &Mutex, source_chain: &T, ) -> anyhow::Result<(Option, Option)> { - while self.next_heights.len() <= crate::CREATE_ACCOUNT_DELTA { + let (mut next_heights, height_queued) = { + let t = me.lock().unwrap(); + (t.next_heights.clone(), t.height_queued) + }; + while next_heights.len() <= crate::CREATE_ACCOUNT_DELTA { // we unwrap() the height_queued because Self::new() should have been called with // nonempty next_heights. - let h = self - .next_heights - .iter() - .next_back() - .cloned() - .unwrap_or_else(|| self.height_queued.unwrap()); + let h = + next_heights.iter().next_back().cloned().unwrap_or_else(|| height_queued.unwrap()); match source_chain.get_next_block_height(h).await { - Ok(h) => self.next_heights.push_back(h), + Ok(h) => next_heights.push_back(h), Err(ChainError::Unknown) => break, Err(ChainError::Other(e)) => { return Err(e) @@ -215,15 +223,13 @@ impl TxTracker { } }; } - let next_height = self.next_heights.get(0).cloned(); - let create_account_height = self.next_heights.get(crate::CREATE_ACCOUNT_DELTA).cloned(); + let mut t = me.lock().unwrap(); + t.next_heights = next_heights; + let next_height = t.next_heights.get(0).cloned(); + let create_account_height = t.next_heights.get(crate::CREATE_ACCOUNT_DELTA).cloned(); Ok((next_height, create_account_height)) } - pub(crate) fn has_stop_height(&self) -> bool { - self.stop_height.is_some() - } - pub(crate) fn finished(&self) -> bool { match self.stop_height { Some(_) => { @@ -234,12 +240,8 @@ impl TxTracker { } } - pub(crate) fn num_blocks_queued(&self) -> usize { - self.queued_blocks.len() - } - - async fn initialize_target_nonce<'a>( - &'a mut self, + async fn initialize_target_nonce( + lock: &Mutex, target_view_client: &Addr, db: &DB, access_key: &(AccountId, PublicKey), @@ -273,47 +275,34 @@ impl TxTracker { } } }; - self.nonces.insert(access_key.clone(), info); + let mut me = lock.lock().unwrap(); + me.nonces.insert(access_key.clone(), info); Ok(()) } - async fn read_target_nonce<'a>( - &'a mut self, - target_view_client: &Addr, - db: &DB, - access_key: &(AccountId, PublicKey), - source_height: Option, - ) -> anyhow::Result<&'a mut NonceInfo> { - if !self.nonces.contains_key(access_key) { - self.initialize_target_nonce(target_view_client, db, access_key, 
source_height).await?; - } - Ok(self.nonces.get_mut(access_key).unwrap()) - } - - pub(crate) async fn next_nonce<'a>( - &'a mut self, + pub(crate) async fn next_nonce( + lock: &Mutex, target_view_client: &Addr, db: &DB, signer_id: &AccountId, public_key: &PublicKey, source_height: BlockHeight, - ) -> anyhow::Result<&'a TargetNonce> { + ) -> anyhow::Result { let source_height = Some(source_height); - let info = self - .read_target_nonce( - target_view_client, - db, - &(signer_id.clone(), public_key.clone()), - source_height, - ) - .await?; + let access_key = (signer_id.clone(), public_key.clone()); + if !lock.lock().unwrap().nonces.contains_key(&access_key) { + Self::initialize_target_nonce(lock, target_view_client, db, &access_key, source_height) + .await?; + } + let mut me = lock.lock().unwrap(); + let info = me.nonces.get_mut(&access_key).unwrap(); if source_height > info.last_height { info.last_height = source_height; } if let Some(nonce) = &mut info.target_nonce.nonce { *nonce += 1; } - Ok(&info.target_nonce) + Ok(info.target_nonce.clone()) } // normally when we're adding txs, we're adding a tx that @@ -322,7 +311,8 @@ impl TxTracker { // we've used so far + 1. But if we want to add a tx at the beginning, // we need to shift all the bigger nonces by one. pub(crate) async fn insert_nonce( - &mut self, + lock: &Mutex, + tx_block_queue: &Mutex>, target_view_client: &Addr, db: &DB, signer_id: &AccountId, @@ -330,27 +320,31 @@ impl TxTracker { secret_key: &SecretKey, ) -> anyhow::Result { let access_key = (signer_id.clone(), public_key.clone()); - if !self.nonces.contains_key(&access_key) { - self.initialize_target_nonce(target_view_client, db, &access_key, None).await?; - let info = self.nonces.get_mut(&access_key).unwrap(); + if !lock.lock().unwrap().nonces.contains_key(&access_key) { + Self::initialize_target_nonce(lock, target_view_client, db, &access_key, None).await?; + let mut me = lock.lock().unwrap(); + let info = me.nonces.get_mut(&access_key).unwrap(); if let Some(nonce) = &mut info.target_nonce.nonce { *nonce += 1; } return Ok(info.target_nonce.clone()); } + let mut me = lock.lock().unwrap(); let mut first_nonce = None; - let txs = self.nonces.get(&access_key).unwrap().queued_txs.clone(); - for tx_ref in txs { - let tx = self.get_tx(&tx_ref); - if first_nonce.is_none() { - first_nonce = Some(tx.target_nonce()); + let txs = me.nonces.get(&access_key).unwrap().queued_txs.clone(); + if !txs.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for tx_ref in txs { + let tx = Self::get_tx(&mut tx_block_queue, &tx_ref); + if first_nonce.is_none() { + first_nonce = Some(tx.target_nonce()); + } + tx.inc_target_nonce(secret_key) } - tx.inc_target_nonce(secret_key) } match first_nonce { Some(n) => { - if let Some(nonce) = - &mut self.nonces.get_mut(&access_key).unwrap().target_nonce.nonce + if let Some(nonce) = &mut me.nonces.get_mut(&access_key).unwrap().target_nonce.nonce { *nonce += 1; } @@ -363,18 +357,20 @@ impl TxTracker { } } - fn get_tx(&mut self, tx_ref: &TxRef) -> &mut TargetChainTx { - let block_idx = self - .queued_blocks + fn get_tx<'a>( + tx_block_queue: &'a mut VecDeque, + tx_ref: &TxRef, + ) -> &'a mut TargetChainTx { + let block_idx = tx_block_queue .binary_search_by(|b| b.source_height.cmp(&tx_ref.source_height)) .unwrap(); - let block = &mut self.queued_blocks[block_idx]; + let block = &mut tx_block_queue[block_idx]; let chunk = block.chunks.iter_mut().find(|c| c.shard_id == tx_ref.shard_id).unwrap(); &mut chunk.txs[tx_ref.tx_idx] } async fn 
insert_access_key_updates( - &mut self, + lock: &Mutex, target_view_client: &Addr, db: &DB, tx_ref: &TxRef, @@ -383,8 +379,18 @@ impl TxTracker { ) -> anyhow::Result<()> { let source_height = Some(source_height); for access_key in nonce_updates.iter() { - let info = - self.read_target_nonce(target_view_client, db, access_key, source_height).await?; + if !lock.lock().unwrap().nonces.contains_key(access_key) { + Self::initialize_target_nonce( + lock, + target_view_client, + db, + &access_key, + source_height, + ) + .await?; + } + let mut me = lock.lock().unwrap(); + let info = me.nonces.get_mut(&access_key).unwrap(); if info.last_height < source_height { info.last_height = source_height; @@ -392,7 +398,8 @@ impl TxTracker { info.target_nonce.pending_outcomes.insert(NonceUpdater::TxRef(tx_ref.clone())); } if !nonce_updates.is_empty() { - assert!(self + let mut me = lock.lock().unwrap(); + assert!(me .updater_to_keys .insert(NonceUpdater::TxRef(tx_ref.clone()), nonce_updates.clone()) .is_none()); @@ -400,25 +407,28 @@ impl TxTracker { Ok(()) } - pub(crate) async fn queue_block( - &mut self, - block: MappedBlock, - target_view_client: &Addr, - db: &DB, - ) -> anyhow::Result<()> { - self.height_queued = Some(block.source_height); - self.next_heights.pop_front().unwrap(); + // This is the non-async portion of queue_block() that returns a list of access key updates we need + // to call insert_access_key_updates() for, which we'll do after calling this function. Otherwise + // we would have to lock and unlock the mutex on every transaction to avoid holding it across await points + fn queue_txs<'a>( + lock: &Mutex, + block: &'a MappedBlock, + ) -> anyhow::Result)>> { + let mut nonce_updates = Vec::new(); + let mut me = lock.lock().unwrap(); + me.height_queued = Some(block.source_height); + me.next_heights.pop_front().unwrap(); for c in block.chunks.iter() { if !c.txs.is_empty() { - self.nonempty_height_queued = Some(block.source_height); + me.nonempty_height_queued = Some(block.source_height); } for (tx_idx, tx) in c.txs.iter().enumerate() { let tx_ref = TxRef { source_height: block.source_height, shard_id: c.shard_id, tx_idx }; match tx { crate::TargetChainTx::Ready(tx) => { - let info = self + let info = me .nonces .get_mut(&( tx.target_tx.transaction.signer_id().clone(), @@ -426,17 +436,12 @@ impl TxTracker { )) .unwrap(); info.queued_txs.insert(tx_ref.clone()); - self.insert_access_key_updates( - target_view_client, - db, - &tx_ref, - &tx.nonce_updates, - block.source_height, - ) - .await?; + if !tx.nonce_updates.is_empty() { + nonce_updates.push((tx_ref, &tx.nonce_updates)); + } } crate::TargetChainTx::AwaitingNonce(tx) => { - let info = self + let info = me .nonces .get_mut(&( tx.target_tx.signer_id().clone(), @@ -445,133 +450,39 @@ impl TxTracker { .unwrap(); info.txs_awaiting_nonce.insert(tx_ref.clone()); info.queued_txs.insert(tx_ref.clone()); - self.insert_access_key_updates( - target_view_client, - db, - &tx_ref, - &tx.nonce_updates, - block.source_height, - ) - .await?; + if !tx.nonce_updates.is_empty() { + nonce_updates.push((tx_ref, &tx.nonce_updates)); + } } }; } } - self.queued_blocks.push_back(block); - Ok(()) - } - - pub(crate) fn next_batch_time(&self) -> Instant { - self.send_time.as_ref().deadline().into_std() + Ok(nonce_updates) } - async fn try_set_batch_nonces( - &mut self, + pub(crate) async fn queue_block( + lock: &Mutex, + tx_block_queue: &Mutex>, + block: MappedBlock, target_view_client: &Addr, db: &DB, ) -> anyhow::Result<()> { - let mut needed_access_keys = 
HashSet::new(); - for c in self.queued_blocks[0].chunks.iter_mut() { - for tx in c.txs.iter_mut() { - if let TargetChainTx::AwaitingNonce(t) = tx { - needed_access_keys.insert(( - t.target_tx.signer_id().clone(), - t.target_tx.public_key().clone(), - )); - } - } - } - for access_key in needed_access_keys.iter() { - self.try_set_nonces(target_view_client, db, access_key, None).await?; - } - let block = &mut self.queued_blocks[0]; - self.height_popped = Some(block.source_height); - for c in block.chunks.iter_mut() { - for (tx_idx, tx) in c.txs.iter_mut().enumerate() { - match tx { - TargetChainTx::AwaitingNonce(_) => { - let tx_ref = TxRef { - source_height: block.source_height, - shard_id: c.shard_id, - tx_idx, - }; - tx.try_set_nonce(None); - match tx { - TargetChainTx::Ready(t) => { - tracing::debug!( - target: "mirror", "Prepared {} for ({}, {:?}) with nonce {} even though there are still pending outcomes that may affect the access key", - &t.provenance, t.target_tx.transaction.signer_id(), t.target_tx.transaction.public_key(), t.target_tx.transaction.nonce() - ); - self.nonces - .get_mut(&( - t.target_tx.transaction.signer_id().clone(), - t.target_tx.transaction.public_key().clone(), - )) - .unwrap() - .txs_awaiting_nonce - .remove(&tx_ref); - } - TargetChainTx::AwaitingNonce(t) => { - tracing::warn!( - target: "mirror", "Could not prepare {} for ({}, {:?}). Nonce unknown", - &t.provenance, t.target_tx.signer_id(), t.target_tx.public_key(), - ); - self.nonces - .get_mut(&( - t.target_tx.signer_id().clone(), - t.target_tx.public_key().clone(), - )) - .unwrap() - .txs_awaiting_nonce - .remove(&tx_ref); - } - }; - } - TargetChainTx::Ready(_) => {} - }; - } + let key_updates = Self::queue_txs(lock, &block)?; + for (tx_ref, nonce_updates) in key_updates { + Self::insert_access_key_updates( + lock, + target_view_client, + db, + &tx_ref, + nonce_updates, + block.source_height, + ) + .await?; } + tx_block_queue.lock().unwrap().push_back(block); Ok(()) } - pub(crate) async fn next_batch( - &mut self, - target_view_client: &Addr, - db: &DB, - ) -> anyhow::Result { - // sleep until 20 milliseconds before we want to send transactions before we check for nonces - // in the target chain. In the second or so between now and then, we might process another block - // that will set the nonces. 
- tokio::time::sleep_until( - self.send_time.as_ref().deadline() - std::time::Duration::from_millis(20), - ) - .await; - self.try_set_batch_nonces(target_view_client, db).await?; - (&mut self.send_time).await; - let block = self.queued_blocks.pop_front().unwrap(); - let b = TxBatch { - source_height: block.source_height, - source_hash: block.source_hash, - txs: block - .chunks - .into_iter() - .flat_map(|c| { - c.txs.into_iter().enumerate().map(move |(tx_idx, tx)| { - ( - TxRef { - source_height: block.source_height, - shard_id: c.shard_id, - tx_idx, - }, - tx, - ) - }) - }) - .collect(), - }; - Ok(b) - } - fn remove_tx(&mut self, tx: &IndexerTransactionWithOutcome) { let k = (tx.transaction.signer_id.clone(), tx.transaction.public_key.clone()); match self.txs_by_signer.entry(k.clone()) { @@ -693,6 +604,7 @@ impl TxTracker { fn tx_to_receipt( &mut self, + tx_block_queue: &Mutex>, db: &DB, tx_hash: &CryptoHash, receipt_id: &CryptoHash, @@ -716,16 +628,19 @@ impl TxTracker { if let Some(info) = self.nonces.get(access_key) { let txs_awaiting_nonce = info.txs_awaiting_nonce.clone(); - for r in txs_awaiting_nonce.iter() { - let tx = self.get_tx(r); + if !txs_awaiting_nonce.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for r in txs_awaiting_nonce.iter() { + let tx = Self::get_tx(&mut tx_block_queue, r); - match tx { - TargetChainTx::AwaitingNonce(t) => { - assert!(t.target_nonce.pending_outcomes.remove(&updater)); - t.target_nonce.pending_outcomes.insert(new_updater.clone()); - } - TargetChainTx::Ready(_) => unreachable!(), - }; + match tx { + TargetChainTx::AwaitingNonce(t) => { + assert!(t.target_nonce.pending_outcomes.remove(&updater)); + t.target_nonce.pending_outcomes.insert(new_updater.clone()); + } + TargetChainTx::Ready(_) => unreachable!(), + }; + } } let info = self.nonces.get_mut(access_key).unwrap(); @@ -740,63 +655,61 @@ impl TxTracker { Ok(()) } - async fn try_set_nonces( + pub(crate) fn try_set_nonces( &mut self, - target_view_client: &Addr, + tx_block_queue: &Mutex>, db: &DB, - access_key: &(AccountId, PublicKey), - id: Option<&CryptoHash>, + updated_key: UpdatedKey, + mut nonce: Option, ) -> anyhow::Result<()> { - let mut n = crate::read_target_nonce(db, &access_key.0, &access_key.1)?.unwrap(); - if let Some(id) = id { - n.pending_outcomes.remove(id); - } - let mut nonce = - crate::fetch_access_key_nonce(target_view_client, &access_key.0, &access_key.1).await?; + let mut n = crate::read_target_nonce(db, &updated_key.account_id, &updated_key.public_key)? 
+ .unwrap(); + n.pending_outcomes.remove(&updated_key.id); n.nonce = std::cmp::max(n.nonce, nonce); - crate::put_target_nonce(db, &access_key.0, &access_key.1, &n)?; + crate::put_target_nonce(db, &updated_key.account_id, &updated_key.public_key, &n)?; - let updater = id.map(|id| NonceUpdater::ChainObjectId(*id)); - if let Some(info) = self.nonces.get_mut(access_key) { - if let Some(updater) = &updater { - info.target_nonce.pending_outcomes.remove(updater); - } + let updater = NonceUpdater::ChainObjectId(updated_key.id); + let access_key = (updated_key.account_id.clone(), updated_key.public_key.clone()); + + if let Some(info) = self.nonces.get_mut(&access_key) { + info.target_nonce.pending_outcomes.remove(&updater); let txs_awaiting_nonce = info.txs_awaiting_nonce.clone(); let mut to_remove = Vec::new(); - for r in txs_awaiting_nonce.iter() { - let tx = self.get_tx(r); + if !txs_awaiting_nonce.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for r in txs_awaiting_nonce.iter() { + let tx = Self::get_tx(&mut tx_block_queue, r); - match tx { - TargetChainTx::AwaitingNonce(t) => { - if let Some(updater) = &updater { - t.target_nonce.pending_outcomes.remove(updater); - } - if let Some(nonce) = &mut nonce { - *nonce += 1; - } + match tx { + TargetChainTx::AwaitingNonce(t) => { + t.target_nonce.pending_outcomes.remove(&updater); + if let Some(nonce) = &mut nonce { + *nonce += 1; + } - if t.target_nonce.pending_outcomes.is_empty() { - to_remove.push(r.clone()); - tx.try_set_nonce(nonce); - match tx { - TargetChainTx::Ready(t) => { - tracing::debug!(target: "mirror", "set nonce for {:?}'s {} to {}", access_key, r, t.target_tx.transaction.nonce()); - } - _ => { - tracing::warn!(target: "mirror", "Couldn't set nonce for {:?}'s {}", access_key, r); + if t.target_nonce.pending_outcomes.is_empty() { + to_remove.push(r.clone()); + tx.try_set_nonce(nonce); + match tx { + TargetChainTx::Ready(t) => { + tracing::debug!(target: "mirror", "set nonce for {:?}'s {} to {}", &access_key, r, t.target_tx.transaction.nonce()); + } + _ => { + tracing::warn!(target: "mirror", "Couldn't set nonce for {:?}'s {}", &access_key, r); + } } + } else { + t.target_nonce.nonce = std::cmp::max(t.target_nonce.nonce, nonce); } - } else { - t.target_nonce.nonce = std::cmp::max(t.target_nonce.nonce, nonce); } - } - TargetChainTx::Ready(_) => unreachable!(), - }; + TargetChainTx::Ready(_) => unreachable!(), + }; + } } - let info = self.nonces.get_mut(access_key).unwrap(); + let info = self.nonces.get_mut(&access_key).unwrap(); for r in to_remove.iter() { info.txs_awaiting_nonce.remove(r); } @@ -805,27 +718,23 @@ impl TxTracker { Ok(()) } - async fn on_outcome_finished( + fn on_outcome_finished( &mut self, - target_view_client: &Addr, db: &DB, id: &CryptoHash, - access_keys: HashSet<(AccountId, PublicKey)>, + access_keys: &HashSet<(AccountId, PublicKey)>, ) -> anyhow::Result<()> { let updater = NonceUpdater::ChainObjectId(*id); if let Some(keys) = self.updater_to_keys.remove(&updater) { - assert!(access_keys == keys); + assert!(access_keys == &keys); } - for access_key in access_keys.iter() { - self.try_set_nonces(target_view_client, db, &access_key, Some(id)).await?; - } crate::delete_pending_outcome(db, id) } - async fn on_target_block_tx( + fn on_target_block_tx( &mut self, - target_view_client: &Addr, + tx_block_queue: &Mutex>, db: &DB, tx: IndexerTransactionWithOutcome, ) -> anyhow::Result<()> { @@ -839,44 +748,42 @@ impl TxTracker { if let Some(access_keys) = crate::read_pending_outcome(db, 
&tx.transaction.hash)? { match tx.outcome.execution_outcome.outcome.status { ExecutionStatusView::SuccessReceiptId(receipt_id) => { - self.tx_to_receipt(db, &tx.transaction.hash, &receipt_id, access_keys)? + self.tx_to_receipt( + tx_block_queue, + db, + &tx.transaction.hash, + &receipt_id, + access_keys, + )?; } ExecutionStatusView::SuccessValue(_) | ExecutionStatusView::Unknown => { unreachable!() } ExecutionStatusView::Failure(_) => { - self.on_outcome_finished( - target_view_client, - db, - &tx.transaction.hash, - access_keys, - ) - .await? + self.on_outcome_finished(db, &tx.transaction.hash, &access_keys)?; } - } + }; } Ok(()) } - async fn on_target_block_applied_receipt( + fn on_target_block_applied_receipt( &mut self, - target_view_client: &Addr, db: &DB, outcome: IndexerExecutionOutcomeWithReceipt, staked_accounts: &mut HashMap<(AccountId, PublicKey), AccountId>, + access_key_updates: &mut Vec, ) -> anyhow::Result<()> { let access_keys = match crate::read_pending_outcome(db, &outcome.execution_outcome.id)? { Some(a) => a, None => return Ok(()), }; - self.on_outcome_finished( - target_view_client, - db, - &outcome.execution_outcome.id, - access_keys, - ) - .await?; + self.on_outcome_finished(db, &outcome.execution_outcome.id, &access_keys)?; + access_key_updates.extend(access_keys.into_iter().map(|(account_id, public_key)| { + UpdatedKey { account_id, public_key, id: outcome.execution_outcome.id } + })); + for receipt_id in outcome.execution_outcome.outcome.receipt_ids { // we don't carry over the access keys here, because we set pending access keys when we send a tx with // an add key action, which should be applied after one receipt. Setting empty access keys here allows us @@ -911,37 +818,38 @@ impl TxTracker { // receipt for any receipts that contain stake actions (w/ nonzero stake) that were // generated by our transactions. Then the caller will send extra stake transactions // to reverse those. 
- pub(crate) async fn on_target_block( + pub(crate) fn on_target_block( &mut self, - target_view_client: &Addr, + tx_block_queue: &Mutex>, db: &DB, msg: StreamerMessage, - ) -> anyhow::Result> { + ) -> anyhow::Result { self.record_block_timestamp(&msg); self.log_target_block(&msg); + let mut access_key_updates = Vec::new(); let mut staked_accounts = HashMap::new(); for s in msg.shards { if let Some(c) = s.chunk { for tx in c.transactions { - self.on_target_block_tx(target_view_client, db, tx).await?; + self.on_target_block_tx(tx_block_queue, db, tx)?; } for outcome in s.receipt_execution_outcomes { self.on_target_block_applied_receipt( - target_view_client, db, outcome, &mut staked_accounts, - ) - .await?; + &mut access_key_updates, + )?; } } } - Ok(staked_accounts) + Ok(TargetBlockInfo { staked_accounts, access_key_updates }) } - async fn on_tx_sent( + fn on_tx_sent( &mut self, + tx_block_queue: &Mutex>, db: &DB, tx_ref: Option, tx: MappedTx, @@ -1007,16 +915,19 @@ impl TxTracker { access_keys_to_remove.insert(access_key.clone()); } - for r in txs_awaiting_nonce.iter() { - let t = self.get_tx(r); + if !txs_awaiting_nonce.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for r in txs_awaiting_nonce.iter() { + let t = Self::get_tx(&mut tx_block_queue, r); - match t { - TargetChainTx::AwaitingNonce(t) => { - assert!(t.target_nonce.pending_outcomes.remove(&updater)); - t.target_nonce.pending_outcomes.insert(new_updater.clone()); - } - TargetChainTx::Ready(_) => unreachable!(), - }; + match t { + TargetChainTx::AwaitingNonce(t) => { + assert!(t.target_nonce.pending_outcomes.remove(&updater)); + t.target_nonce.pending_outcomes.insert(new_updater.clone()); + } + TargetChainTx::Ready(_) => unreachable!(), + }; + } } } } @@ -1107,6 +1018,7 @@ impl TxTracker { fn on_tx_skipped( &mut self, + tx_block_queue: &Mutex>, tx_ref: &Option, tx: &Transaction, nonce_updates: &HashSet<(AccountId, PublicKey)>, @@ -1124,25 +1036,28 @@ impl TxTracker { if let Some(info) = self.nonces.get(&access_key) { let txs_awaiting_nonce = info.txs_awaiting_nonce.clone(); let mut to_remove = Vec::new(); - for r in txs_awaiting_nonce.iter() { - let target_tx = self.get_tx(r); - match target_tx { - TargetChainTx::AwaitingNonce(tx) => { - assert!(tx.target_nonce.pending_outcomes.remove(&updater)); - if tx.target_nonce.pending_outcomes.is_empty() { - target_tx.try_set_nonce(None); - match target_tx { - TargetChainTx::Ready(t) => { - tracing::debug!(target: "mirror", "After skipping {} setting nonce for {:?}'s {} to {}", tx_ref, &access_key, r, t.target_tx.transaction.nonce()); - } - _ => { - tracing::warn!(target: "mirror", "After skipping {} could not set nonce for {:?}'s {}", tx_ref, &access_key, r); + if !txs_awaiting_nonce.is_empty() { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + for r in txs_awaiting_nonce.iter() { + let target_tx = Self::get_tx(&mut tx_block_queue, r); + match target_tx { + TargetChainTx::AwaitingNonce(tx) => { + assert!(tx.target_nonce.pending_outcomes.remove(&updater)); + if tx.target_nonce.pending_outcomes.is_empty() { + target_tx.try_set_nonce(None); + match target_tx { + TargetChainTx::Ready(t) => { + tracing::debug!(target: "mirror", "After skipping {} setting nonce for {:?}'s {} to {}", tx_ref, &access_key, r, t.target_tx.transaction.nonce()); + } + _ => { + tracing::warn!(target: "mirror", "After skipping {} could not set nonce for {:?}'s {}", tx_ref, &access_key, r); + } } + to_remove.push(r.clone()); } - to_remove.push(r.clone()); } + TargetChainTx::Ready(_) 
=> unreachable!(), } - TargetChainTx::Ready(_) => unreachable!(), } } @@ -1167,24 +1082,36 @@ impl TxTracker { } // We just successfully sent some transactions. Remember them so we can see if they really show up on chain. - pub(crate) async fn on_txs_sent( + // Returns the new amount that we should wait before sending transactions + pub(crate) fn on_txs_sent( &mut self, + tx_block_queue: &Mutex>, db: &DB, sent_batch: SentBatch, target_height: BlockHeight, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let mut total_sent = 0; let now = Instant::now(); let mut access_keys_to_remove = HashSet::new(); let (txs_sent, provenance) = match sent_batch { SentBatch::MappedBlock(b) => { - let block_delay = self.tx_batch_interval.unwrap_or_else(|| { - self.second_longest_recent_block_delay() - .unwrap_or(self.min_block_production_delay + Duration::from_millis(100)) - }); - self.send_time.as_mut().reset(tokio::time::Instant::now() + block_delay); - crate::set_last_source_height(db, b.source_height)?; + self.height_popped = Some(b.source_height); + for (tx_ref, tx) in b.txs.iter() { + match tx { + TargetChainTx::AwaitingNonce(t) => { + self.nonces + .get_mut(&( + t.target_tx.signer_id().clone(), + t.target_tx.public_key().clone(), + )) + .unwrap() + .txs_awaiting_nonce + .remove(&tx_ref); + } + TargetChainTx::Ready(_) => {} + }; + } let txs = b.txs.into_iter().map(|(tx_ref, tx)| (Some(tx_ref), tx)).collect::>(); (txs, format!("source #{}", b.source_height)) @@ -1199,17 +1126,18 @@ impl TxTracker { crate::TargetChainTx::Ready(t) => { if t.sent_successfully { self.on_tx_sent( + tx_block_queue, db, tx_ref, t, target_height, now, &mut access_keys_to_remove, - ) - .await?; + )?; total_sent += 1; } else { self.on_tx_skipped( + tx_block_queue, &tx_ref, &t.target_tx.transaction, &t.nonce_updates, @@ -1219,6 +1147,7 @@ impl TxTracker { } crate::TargetChainTx::AwaitingNonce(t) => { self.on_tx_skipped( + tx_block_queue, &tx_ref, &t.target_tx, &t.nonce_updates, @@ -1236,6 +1165,10 @@ impl TxTracker { total_sent, provenance, target_height ); - Ok(()) + let next_delay = self.tx_batch_interval.unwrap_or_else(|| { + self.second_longest_recent_block_delay() + .unwrap_or(self.min_block_production_delay + Duration::from_millis(100)) + }); + Ok(next_delay) } } diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs index 2632fe455ab..a131c66f5bb 100644 --- a/tools/mirror/src/lib.rs +++ b/tools/mirror/src/lib.rs @@ -29,10 +29,11 @@ use near_primitives_core::account::id::AccountType; use near_primitives_core::account::{AccessKey, AccessKeyPermission}; use near_primitives_core::types::{Nonce, ShardId}; use rocksdb::DB; -use std::collections::{HashMap, HashSet}; -use std::path::Path; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; use strum::IntoEnumIterator; use tokio::sync::mpsc; @@ -408,17 +409,14 @@ struct MirrorConfig { const CREATE_ACCOUNT_DELTA: usize = 5; +// TODO: separate out the code that uses the target chain clients, and +// make it an option to send the transactions to some RPC node. +// that way it would be possible to run this code and send transactions with an +// old binary not caught up to the current protocol version, since the +// transactions we're constructing should stay valid. 
struct TxMirror { - target_stream: mpsc::Receiver, source_chain_access: T, - // TODO: separate out the code that uses the target chain clients, and - // make it an option to send the transactions to some RPC node. - // that way it would be possible to run this code and send transactions with an - // old binary not caught up to the current protocol version, since the - // transactions we're constructing should stay valid. - target_view_client: Addr, - target_client: Addr, - db: DB, + db: Arc, target_genesis_height: BlockHeight, target_min_block_production_delay: Duration, secret: Option<[u8; crate::secret::SECRET_LEN]>, @@ -502,7 +500,7 @@ impl std::fmt::Display for MappedTxProvenance { // what nonce to use because the public key was added in an AddKey // action that we haven't seen on chain yet. The target_tx field is complete // except for the nonce field. -#[derive(Debug)] +#[derive(Clone, Debug)] struct TxAwaitingNonce { source_signer_id: AccountId, source_receiver_id: AccountId, @@ -522,7 +520,7 @@ impl TxAwaitingNonce { target_secret_key: SecretKey, target_public_key: PublicKey, actions: Vec, - target_nonce: &TargetNonce, + target_nonce: TargetNonce, ref_hash: &CryptoHash, provenance: MappedTxProvenance, nonce_updates: HashSet<(AccountId, PublicKey)>, @@ -542,7 +540,7 @@ impl TxAwaitingNonce { target_secret_key, target_tx, nonce_updates, - target_nonce: target_nonce.clone(), + target_nonce, } } } @@ -550,7 +548,7 @@ impl TxAwaitingNonce { // A transaction meant for the target chain that is complete/ready to send. // We keep some extra info about the transaction for the purposes of logging // later on when we find it on chain. -#[derive(Debug)] +#[derive(Clone, Debug)] struct MappedTx { source_signer_id: AccountId, source_receiver_id: AccountId, @@ -604,7 +602,7 @@ impl MappedTx { } } -#[derive(Debug)] +#[derive(Clone, Debug)] enum TargetChainTx { Ready(MappedTx), AwaitingNonce(TxAwaitingNonce), @@ -681,7 +679,7 @@ impl TargetChainTx { target_secret_key: &SecretKey, target_public_key: PublicKey, actions: Vec, - target_nonce: &TargetNonce, + target_nonce: TargetNonce, ref_hash: &CryptoHash, provenance: MappedTxProvenance, nonce_updates: HashSet<(AccountId, PublicKey)>, @@ -744,6 +742,31 @@ struct TxBatch { txs: Vec<(TxRef, TargetChainTx)>, } +impl From<&MappedBlock> for TxBatch { + fn from(block: &MappedBlock) -> Self { + Self { + source_height: block.source_height, + source_hash: block.source_hash, + txs: block + .chunks + .iter() + .flat_map(|c| { + c.txs.iter().enumerate().map(move |(tx_idx, tx)| { + ( + TxRef { + source_height: block.source_height, + shard_id: c.shard_id, + tx_idx, + }, + tx.clone(), + ) + }) + }) + .collect(), + } + } +} + async fn account_exists( view_client: &Addr, account_id: &AccountId, @@ -804,31 +827,29 @@ async fn fetch_access_key_nonce( } impl TxMirror { - fn new>( + fn new( source_chain_access: T, - target_home: P, - mirror_db_path: Option

, + target_home: &Path, + mirror_db_path: Option<&Path>, secret: Option<[u8; crate::secret::SECRET_LEN]>, config: MirrorConfig, ) -> anyhow::Result { let target_config = - nearcore::config::load_config(target_home.as_ref(), GenesisValidationMode::UnsafeFast) - .with_context(|| { - format!("Error loading target config from {:?}", target_home.as_ref()) - })?; + nearcore::config::load_config(target_home, GenesisValidationMode::UnsafeFast) + .with_context(|| format!("Error loading target config from {:?}", target_home))?; if !target_config.client_config.archive { // this is probably not going to come up, but we want to avoid a situation where // we go offline for a long time and then come back online, and we state sync to // the head of the target chain without looking for our outcomes that made it on // chain right before we went offline - anyhow::bail!("config file in {} has archive: false, but archive must be set to true for the target chain", target_home.as_ref().display()); + anyhow::bail!("config file in {} has archive: false, but archive must be set to true for the target chain", target_home.display()); } let db = match mirror_db_path { Some(mirror_db_path) => open_db(mirror_db_path), None => { // keep backward compatibility let mirror_db_path = near_store::NodeStorage::opener( - target_home.as_ref(), + target_home, target_config.config.archive, &target_config.config.store, None, @@ -839,22 +860,11 @@ impl TxMirror { } }; let db = db.context("failed to open mirror DB")?; - let target_indexer = Indexer::new(near_indexer::IndexerConfig { - home_dir: target_home.as_ref().to_path_buf(), - sync_mode: near_indexer::SyncModeEnum::FromInterruption, - await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::StreamWhileSyncing, - validate_genesis: false, - }) - .context("failed to start target chain indexer")?; - let (target_view_client, target_client) = target_indexer.client_actors(); - let target_stream = target_indexer.streamer(); + let db = Arc::new(db); let default_extra_key = crate::key_mapping::default_extra_key(secret.as_ref()); Ok(Self { source_chain_access, - target_client, - target_view_client, - target_stream, db, target_genesis_height: target_config.genesis.config.genesis_height, target_min_block_production_delay: target_config @@ -868,14 +878,13 @@ impl TxMirror { } async fn send_transactions<'a, I: Iterator>( - &mut self, + target_client: &Addr, txs: I, ) -> anyhow::Result<()> { for tx in txs { match tx { TargetChainTx::Ready(tx) => { - match self - .target_client + match target_client .send( ProcessTxRequest { transaction: tx.target_tx.clone(), @@ -925,6 +934,7 @@ impl TxMirror { async fn map_actions( &self, + target_view_client: &Addr, tx: &SignedTransaction, ) -> anyhow::Result<(Vec, HashSet<(AccountId, PublicKey)>)> { let mut actions = Vec::new(); @@ -969,12 +979,9 @@ impl TxMirror { &tx.transaction.receiver_id(), self.secret.as_ref(), ); - if !account_exists(&self.target_view_client, &target_account) - .await - .with_context(|| { - format!("failed checking existence for account {}", &target_account) - })? - { + if !account_exists(target_view_client, &target_account).await.with_context( + || format!("failed checking existence for account {}", &target_account), + )? 
{ if target_account.get_account_type() == AccountType::NearImplicitAccount { let public_key = @@ -1015,7 +1022,9 @@ impl TxMirror { async fn prepare_tx( &self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, source_signer_id: AccountId, source_receiver_id: AccountId, target_signer_id: AccountId, @@ -1028,33 +1037,35 @@ impl TxMirror { nonce_updates: HashSet<(AccountId, PublicKey)>, ) -> anyhow::Result { let target_public_key = target_secret_key.public_key(); + // TODO: clean up this function. The logic is hard to follow let target_nonce = match source_height.as_ref() { Some(_) => None, None => Some( - tracker - .insert_nonce( - &self.target_view_client, - &self.db, - &target_signer_id, - &target_public_key, - target_secret_key, - ) - .await?, + crate::chain_tracker::TxTracker::insert_nonce( + tracker, + tx_block_queue, + target_view_client, + &self.db, + &target_signer_id, + &target_public_key, + target_secret_key, + ) + .await?, ), }; let target_nonce = match source_height { Some(source_height) => { - tracker - .next_nonce( - &self.target_view_client, - &self.db, - &target_signer_id, - &target_public_key, - source_height, - ) - .await? + crate::chain_tracker::TxTracker::next_nonce( + tracker, + target_view_client, + &self.db, + &target_signer_id, + &target_public_key, + source_height, + ) + .await? } - None => target_nonce.as_ref().unwrap(), + None => target_nonce.unwrap(), }; if target_nonce.pending_outcomes.is_empty() && target_nonce.nonce.is_some() { @@ -1092,7 +1103,9 @@ impl TxMirror { // then the only keys we will have mapped are the ones added by regular AddKey transactions. async fn push_extra_tx( &self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, block_hash: CryptoHash, txs: &mut Vec, predecessor_id: AccountId, @@ -1116,7 +1129,7 @@ impl TxMirror { for k in keys.iter() { let target_secret_key = crate::key_mapping::map_key(k, self.secret.as_ref()); if fetch_access_key_nonce( - &self.target_view_client, + target_view_client, &target_signer_id, &target_secret_key.public_key(), ) @@ -1213,6 +1226,8 @@ impl TxMirror { let target_tx = self .prepare_tx( tracker, + tx_block_queue, + target_view_client, predecessor_id, receiver_id, target_signer_id, @@ -1231,7 +1246,9 @@ impl TxMirror { async fn add_function_call_keys( &self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, txs: &mut Vec, receipt_id: &CryptoHash, receiver_id: &AccountId, @@ -1314,6 +1331,8 @@ impl TxMirror { self.push_extra_tx( tracker, + tx_block_queue, + target_view_client, outcome.block_hash, txs, receipt.predecessor_id().clone(), @@ -1336,7 +1355,9 @@ impl TxMirror { provenance: MappedTxProvenance, source_height: BlockHeight, ref_hash: &CryptoHash, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, txs: &mut Vec, ) -> anyhow::Result<()> { // if signer and receiver are the same then the resulting local receipt @@ -1354,6 +1375,8 @@ impl TxMirror { { self.add_function_call_keys( tracker, + tx_block_queue, + target_view_client, txs, &receipt_id, &tx.transaction.receiver_id(), @@ -1376,13 +1399,17 @@ impl TxMirror { provenance: MappedTxProvenance, source_height: BlockHeight, ref_hash: &CryptoHash, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, txs: 
&mut Vec, ) -> anyhow::Result<()> { if let ReceiptEnum::Action(r) | ReceiptEnum::PromiseYield(r) = receipt.receipt() { if r.actions.iter().any(|a| matches!(a, Action::FunctionCall(_))) { self.add_function_call_keys( tracker, + tx_block_queue, + target_view_client, txs, receipt.receipt_id(), receipt.receiver_id(), @@ -1403,7 +1430,9 @@ impl TxMirror { &self, create_account_height: BlockHeight, ref_hash: CryptoHash, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, txs: &mut Vec, ) -> anyhow::Result<()> { let source_block = @@ -1418,6 +1447,8 @@ impl TxMirror { create_account_height, &ref_hash, tracker, + tx_block_queue, + target_view_client, txs, ) .await?; @@ -1435,6 +1466,8 @@ impl TxMirror { create_account_height, &ref_hash, tracker, + tx_block_queue, + target_view_client, txs, ) .await?; @@ -1451,7 +1484,9 @@ impl TxMirror { source_height: BlockHeight, create_account_height: Option, ref_hash: CryptoHash, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, ) -> anyhow::Result { let source_block = self.source_chain_access.get_txs(source_height).await.with_context(|| { @@ -1463,7 +1498,8 @@ impl TxMirror { let mut txs = Vec::new(); for (idx, source_tx) in ch.transactions.into_iter().enumerate() { - let (actions, nonce_updates) = self.map_actions(&source_tx).await?; + let (actions, nonce_updates) = + self.map_actions(target_view_client, &source_tx).await?; if actions.is_empty() { // If this is a tx containing only stake actions, skip it. continue; @@ -1485,6 +1521,8 @@ impl TxMirror { let target_tx = self .prepare_tx( tracker, + tx_block_queue, + target_view_client, source_tx.transaction.signer_id().clone(), source_tx.transaction.receiver_id().clone(), target_signer_id, @@ -1504,6 +1542,8 @@ impl TxMirror { source_height, &ref_hash, tracker, + tx_block_queue, + target_view_client, &mut txs, ) .await?; @@ -1515,6 +1555,8 @@ impl TxMirror { source_height, &ref_hash, tracker, + tx_block_queue, + target_view_client, &mut txs, ) .await?; @@ -1533,6 +1575,8 @@ impl TxMirror { create_account_height, ref_hash, tracker, + tx_block_queue, + target_view_client, &mut chunks[0].txs, ) .await?; @@ -1546,21 +1590,28 @@ impl TxMirror { // Up to a certain capacity, prepare and queue up batches of // transactions that we want to send to the target chain. + // Returns the number of blocks worth of txs queued at the end. 
+ // `have_stop_height` refers to whether we're going to stop sending transactions and exit after a particular height async fn queue_txs( &mut self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_view_client: &Addr, ref_hash: CryptoHash, - check_send_time: bool, + have_stop_height: bool, ) -> anyhow::Result<()> { - if tracker.num_blocks_queued() > 100 { + let mut num_blocks_queued = { + let tx_block_queue = tx_block_queue.lock().unwrap(); + tx_block_queue.len() + }; + if num_blocks_queued > 100 { return Ok(()); } - let next_batch_time = tracker.next_batch_time(); - loop { let (next_height, create_account_height) = - tracker.next_heights(&self.source_chain_access).await?; + crate::chain_tracker::TxTracker::next_heights(&tracker, &self.source_chain_access) + .await?; let next_height = match next_height { Some(h) => h, @@ -1568,26 +1619,34 @@ impl TxMirror { }; // if we have a stop height, just send the last few blocks without worrying about // extra create account txs, otherwise wait until we get more blocks - if !tracker.has_stop_height() && create_account_height.is_none() { + if !have_stop_height && create_account_height.is_none() { return Ok(()); } let b = self - .fetch_txs(next_height, create_account_height, ref_hash, tracker) + .fetch_txs( + next_height, + create_account_height, + ref_hash, + tracker, + tx_block_queue, + target_view_client, + ) .await .with_context(|| format!("Can't fetch source #{} transactions", next_height))?; - tracker.queue_block(b, &self.target_view_client, &self.db).await?; - if tracker.num_blocks_queued() > 100 { - break; - } + crate::chain_tracker::TxTracker::queue_block( + tracker, + &tx_block_queue, + b, + target_view_client, + &self.db, + ) + .await?; - if check_send_time - && tracker.num_blocks_queued() > 0 - && Instant::now() + Duration::from_millis(20) > next_batch_time - { - break; + num_blocks_queued += 1; + if num_blocks_queued > 100 { + return Ok(()); } } - Ok(()) } // send stake txs for zero stake for each of the stake actions we just saw in @@ -1597,7 +1656,10 @@ impl TxMirror { // retry later if the tx got lost for some reason async fn unstake( &mut self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_client: &Addr, + target_view_client: &Addr, stakes: HashMap<(AccountId, PublicKey), AccountId>, source_hash: &CryptoHash, target_hash: &CryptoHash, @@ -1607,6 +1669,8 @@ impl TxMirror { for ((receiver_id, public_key), predecessor_id) in stakes { self.push_extra_tx( tracker, + tx_block_queue, + target_view_client, *source_hash, &mut txs, predecessor_id, @@ -1619,61 +1683,206 @@ impl TxMirror { .await?; } if !txs.is_empty() { - self.send_transactions(txs.iter_mut()).await?; - tracker - .on_txs_sent( - &self.db, - crate::chain_tracker::SentBatch::ExtraTxs(txs), - target_height, + Self::send_transactions(target_client, txs.iter_mut()).await?; + let mut tracker = tracker.lock().unwrap(); + tracker.on_txs_sent( + tx_block_queue, + &self.db, + crate::chain_tracker::SentBatch::ExtraTxs(txs), + target_height, + )?; + } + Ok(()) + } + + async fn send_txs_loop( + db: Arc, + blocks_sent: mpsc::Sender, + tx_block_queue: Arc>>, + mut send_time: Pin>, + send_delay: Arc>, + target_client: Addr, + ) -> anyhow::Result<()> { + let mut sent_source_height = None; + + loop { + (&mut send_time).await; + + let tx_batch = { + let tx_block_queue = tx_block_queue.lock().unwrap(); + let b = match sent_source_height { + Some(sent_source_height) => { + let mut 
block_idx = None; + for (idx, b) in tx_block_queue.iter().enumerate() { + if b.source_height > sent_source_height { + block_idx = Some(idx); + break; + } + } + match block_idx { + Some(idx) => tx_block_queue.get(idx), + None => None, + } + } + None => tx_block_queue.get(0), + }; + b.map(|b| TxBatch::from(b)) + }; + + let mut tx_batch = match tx_batch { + Some(b) => b, + None => { + tokio::time::sleep(Duration::from_millis(200)).await; + continue; + } + }; + + let start_time = tokio::time::Instant::now(); + + tracing::debug!(target: "mirror", "Sending transactions for source block #{}", tx_batch.source_height); + Self::send_transactions( + &target_client, + tx_batch.txs.iter_mut().map(|(_tx_ref, tx)| tx), + ) + .await?; + set_last_source_height(&db, tx_batch.source_height)?; + sent_source_height = Some(tx_batch.source_height); + + blocks_sent.send(tx_batch).await.unwrap(); + + let send_delay = *send_delay.lock().unwrap(); + tracing::debug!(target: "mirror", "Sleeping for {:?} until sending more transactions", &send_delay); + let next_send_time = start_time + send_delay; + send_time.as_mut().reset(next_send_time); + } + } + + async fn index_target_loop( + tracker: Arc>, + tx_block_queue: Arc>>, + home_dir: PathBuf, + db: Arc, + clients_tx: tokio::sync::oneshot::Sender<(Addr, Addr)>, + accounts_to_unstake: mpsc::Sender>, + target_height: Arc>, + target_head: Arc>, + ) -> anyhow::Result<()> { + let target_indexer = Indexer::new(near_indexer::IndexerConfig { + home_dir, + sync_mode: near_indexer::SyncModeEnum::FromInterruption, + await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::StreamWhileSyncing, + validate_genesis: false, + }) + .context("failed to start target chain indexer")?; + let (target_view_client, target_client) = target_indexer.client_actors(); + let mut target_stream = target_indexer.streamer(); + let (first_target_height, first_target_head) = Self::index_target_chain( + &tracker, + &tx_block_queue, + &mut target_stream, + db.as_ref(), + &target_view_client, + &target_client, + ) + .await?; + *target_height.write().unwrap() = first_target_height; + *target_head.write().unwrap() = first_target_head; + clients_tx.send((target_client.clone(), target_view_client.clone())).unwrap(); + + loop { + let msg = target_stream.recv().await.unwrap(); + *target_head.write().unwrap() = msg.block.header.hash; + *target_height.write().unwrap() = msg.block.header.height; + let target_block_info = { + let mut tracker = tracker.lock().unwrap(); + tracker.on_target_block(&tx_block_queue, db.as_ref(), msg)? 
+ }; + if !target_block_info.staked_accounts.is_empty() { + accounts_to_unstake.send(target_block_info.staked_accounts).await.unwrap(); + } + for access_key_update in target_block_info.access_key_updates { + let nonce = crate::fetch_access_key_nonce( + &target_view_client, + &access_key_update.account_id, + &access_key_update.public_key, ) .await?; + let mut tracker = tracker.lock().unwrap(); + tracker.try_set_nonces(&tx_block_queue, db.as_ref(), access_key_update, nonce)?; + } } - Ok(()) } - async fn main_loop( + async fn queue_txs_loop( &mut self, - mut tracker: crate::chain_tracker::TxTracker, - mut target_height: BlockHeight, - mut target_head: CryptoHash, + tracker: Arc>, + tx_block_queue: Arc>>, + target_client: Addr, + target_view_client: Addr, + mut blocks_sent: mpsc::Receiver, + mut accounts_to_unstake: mpsc::Receiver>, + send_delay: Arc>, + target_height: Arc>, + target_head: Arc>, mut source_hash: CryptoHash, + have_stop_height: bool, ) -> anyhow::Result<()> { + let mut queue_txs_time = tokio::time::interval(Duration::from_millis(100)); + loop { tokio::select! { // time to send a batch of transactions - tx_batch = tracker.next_batch(&self.target_view_client, &self.db), if tracker.num_blocks_queued() > 0 => { - let mut tx_batch = tx_batch?; - source_hash = tx_batch.source_hash; - self.send_transactions(tx_batch.txs.iter_mut().map(|(_tx_ref, tx)| tx)).await?; - tracker.on_txs_sent(&self.db, crate::chain_tracker::SentBatch::MappedBlock(tx_batch), target_height).await?; - - // now we have one second left until we need to send more transactions. In the - // meantime, we might as well prepare some more batches of transactions. - // TODO: continue in best effort fashion on error - self.queue_txs(&mut tracker, target_head, true).await?; + _ = queue_txs_time.tick() => { + let target_head = *target_head.read().unwrap(); + self.queue_txs(&tracker, &tx_block_queue, &target_view_client, target_head, have_stop_height).await?; } - msg = self.target_stream.recv() => { - let msg = msg.unwrap(); - target_head = msg.block.header.hash; - target_height = msg.block.header.height; - let staked_accounts = tracker.on_target_block(&self.target_view_client, &self.db, msg).await?; - self.unstake(&mut tracker, staked_accounts, &source_hash, &target_head, target_height).await?; + tx_batch = blocks_sent.recv() => { + let tx_batch = tx_batch.unwrap(); + source_hash = tx_batch.source_hash; + // lock the tracker before removing the block from the queue so that + // we don't call on_target_block() in the other thread between removing the block + // and calling on_txs_sent(), because that could lead to a bug looking up transactions + // in TxTracker::get_tx() + let mut tracker = tracker.lock().unwrap(); + { + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + let b = tx_block_queue.pop_front().unwrap(); + assert!(b.source_height == tx_batch.source_height); + }; + let target_height = *target_height.read().unwrap(); + let new_delay = tracker.on_txs_sent( + &tx_block_queue, + &self.db, + crate::chain_tracker::SentBatch::MappedBlock(tx_batch), + target_height, + )?; + *send_delay.lock().unwrap() = new_delay; } - // If we don't have any upcoming sets of transactions to send already built, we probably fell behind in the source - // chain and can't fetch the transactions. Check if we have them now here. 
- _ = tokio::time::sleep(std::time::Duration::from_millis(200)), if tracker.num_blocks_queued() == 0 => { - self.queue_txs(&mut tracker, target_head, true).await?; + msg = accounts_to_unstake.recv() => { + let staked_accounts = msg.unwrap(); + let target_head = *target_head.read().unwrap(); + let target_height = *target_height.read().unwrap(); + self.unstake( + &tracker, &tx_block_queue, &target_client, + &target_view_client, staked_accounts, &source_hash, + &target_head, target_height + ).await?; } }; - if tracker.finished() { - tracing::info!(target: "mirror", "finished sending all transactions"); - return Ok(()); + // TODO: this locking of the mutex before continuing the loop is kind of unnecessary since we should be able to tell + // exactly when we've done the thing that makes finished() return true, usually after a call to on_target_block() + { + let tracker = tracker.lock().unwrap(); + if tracker.finished() { + tracing::info!(target: "mirror", "finished sending all transactions"); + return Ok(()); + } } } } - async fn target_chain_syncing(&self) -> bool { - self.target_client + async fn target_chain_syncing(target_client: &Addr) -> bool { + target_client .send(Status { is_health_check: false, detailed: false }.with_span_context()) .await .unwrap() @@ -1681,9 +1890,10 @@ impl TxMirror { .unwrap_or(true) } - async fn target_chain_head(&self) -> anyhow::Result<(BlockHeight, CryptoHash)> { - let header = self - .target_view_client + async fn target_chain_head( + target_view_client: &Addr, + ) -> anyhow::Result<(BlockHeight, CryptoHash)> { + let header = target_view_client .send(GetBlock(BlockReference::Finality(Finality::Final)).with_span_context()) .await .unwrap() @@ -1694,16 +1904,25 @@ impl TxMirror { // call tracker.on_target_block() on each target chain block until that client is synced async fn index_target_chain( - &mut self, - tracker: &mut crate::chain_tracker::TxTracker, + tracker: &Mutex, + tx_block_queue: &Mutex>, + target_stream: &mut mpsc::Receiver, + db: &DB, + target_view_client: &Addr, + target_client: &Addr, ) -> anyhow::Result<(BlockHeight, CryptoHash)> { let mut head = None; loop { - let msg = self.target_stream.recv().await.unwrap(); + let msg = target_stream.recv().await.unwrap(); let height = msg.block.header.height; - tracker.on_target_block(&self.target_view_client, &self.db, msg).await?; + { + let mut tracker = tracker.lock().unwrap(); + // TODO: handle the return value. it is possible we want to unstake or update nonces + // after a restart. 
+ tracker.on_target_block(&tx_block_queue, db, msg)?; + } match head { Some((head_height, head_hash)) => { @@ -1712,15 +1931,19 @@ impl TxMirror { } } None => { - if !self.target_chain_syncing().await { - head = Some(self.target_chain_head().await?); + if !Self::target_chain_syncing(target_client).await { + head = Some(Self::target_chain_head(target_view_client).await?); } } } } } - async fn run(mut self, stop_height: Option) -> anyhow::Result<()> { + async fn run( + mut self, + stop_height: Option, + target_home: PathBuf, + ) -> anyhow::Result<()> { let last_stored_height = get_last_source_height(&self.db)?; let last_height = last_stored_height.unwrap_or(self.target_genesis_height - 1); @@ -1747,13 +1970,56 @@ impl TxMirror { tracing::debug!(target: "mirror", "source chain initialized with first heights: {:?}", &next_heights); - let mut tracker = crate::chain_tracker::TxTracker::new( + let tracker = Arc::new(Mutex::new(crate::chain_tracker::TxTracker::new( self.target_min_block_production_delay, self.config.tx_batch_interval, next_heights.iter(), stop_height, - ); - let (target_height, target_head) = self.index_target_chain(&mut tracker).await?; + ))); + let target_height = Arc::new(RwLock::new(0)); + let target_head = Arc::new(RwLock::new(CryptoHash::default())); + let (clients_tx, clients_rx) = tokio::sync::oneshot::channel(); + let (target_indexer_done_tx, target_indexer_done_rx) = + tokio::sync::oneshot::channel::>(); + let (unstake_tx, unstake_rx) = mpsc::channel(10); + + let db = self.db.clone(); + let target_height2 = target_height.clone(); + let target_head2 = target_head.clone(); + let tracker2 = tracker.clone(); + let index_target_thread = actix::Arbiter::new(); + + let tx_block_queue = Arc::new(Mutex::new(VecDeque::new())); + + let tx_block_queue2 = tx_block_queue.clone(); + index_target_thread.spawn(async move { + let res = Self::index_target_loop( + tracker2, + tx_block_queue2, + target_home, + db, + clients_tx, + unstake_tx, + target_height2, + target_head2, + ) + .await; + target_indexer_done_tx.send(res).unwrap(); + }); + + // wait til we set the values in target_height and target_head after receiving a message from the indexer + let (target_client, target_view_client) = clients_rx.await.unwrap(); + + // Wait at least 15 seconds before sending any transactions because for + // a few seconds after the node starts, transaction routing requests + // will be silently dropped by the peer manager. 
+ let mut send_time = Box::pin(tokio::time::sleep(std::time::Duration::from_secs(15))); + let mut send_delay = self + .config + .tx_batch_interval + .unwrap_or(self.target_min_block_production_delay + Duration::from_millis(100)); + + let initial_target_head = *target_head.read().unwrap(); if last_stored_height.is_none() { // send any extra function call-initiated create accounts for the first few blocks right now // we set source_hash to 0 because we don't actually care about it here, and it doesn't even exist since these are @@ -1763,35 +2029,101 @@ impl TxMirror { source_height: last_height, chunks: vec![MappedChunk { shard_id: 0, txs: Vec::new() }], }; + for h in next_heights { - self.add_create_account_txs(h, target_head, &mut tracker, &mut block.chunks[0].txs) - .await?; + self.add_create_account_txs( + h, + initial_target_head, + &tracker, + &tx_block_queue, + &target_view_client, + &mut block.chunks[0].txs, + ) + .await?; } if block.chunks.iter().any(|c| !c.txs.is_empty()) { tracing::debug!(target: "mirror", "sending extra create account transactions for the first {} blocks", CREATE_ACCOUNT_DELTA); - tracker.queue_block(block, &self.target_view_client, &self.db).await?; - let mut b = tracker.next_batch(&self.target_view_client, &self.db).await?; - self.send_transactions(b.txs.iter_mut().map(|(_tx_ref, tx)| tx)).await?; - tracker - .on_txs_sent( + let mut b = { + crate::chain_tracker::TxTracker::queue_block( + &tracker, + &tx_block_queue, + block, + &target_view_client, &self.db, - crate::chain_tracker::SentBatch::MappedBlock(b), - target_height, ) .await?; + (&mut send_time).await; + let mut tx_block_queue = tx_block_queue.lock().unwrap(); + TxBatch::from(&tx_block_queue.pop_front().unwrap()) + }; + Self::send_transactions(&target_client, b.txs.iter_mut().map(|(_tx_ref, tx)| tx)) + .await?; + let mut tracker = tracker.lock().unwrap(); + send_delay = tracker.on_txs_sent( + &tx_block_queue, + &self.db, + crate::chain_tracker::SentBatch::MappedBlock(b), + *target_height.read().unwrap(), + )?; + } + } + self.queue_txs( + &tracker, + &tx_block_queue, + &target_view_client, + initial_target_head, + stop_height.is_some(), + ) + .await?; + + let send_delay = Arc::new(Mutex::new(send_delay)); + let send_delay2 = send_delay.clone(); + let (blocks_sent_tx, blocks_sent_rx) = mpsc::channel(10); + let tx_block_queue2 = tx_block_queue.clone(); + let target_client2 = target_client.clone(); + let db = self.db.clone(); + let send_txs_thread = actix::Arbiter::new(); + let (send_txs_done_tx, send_txs_done_rx) = + tokio::sync::oneshot::channel::>(); + send_txs_thread.spawn(async move { + let res = Self::send_txs_loop( + db, + blocks_sent_tx, + tx_block_queue2, + send_time, + send_delay2, + target_client2, + ) + .await; + send_txs_done_tx.send(res).unwrap(); + }); + tokio::select! 
{ + res = self.queue_txs_loop( + tracker, tx_block_queue, target_client, target_view_client, + blocks_sent_rx, unstake_rx, send_delay, target_height, target_head, + source_hash, stop_height.is_some(), + ) => { + // TODO: cancel other threads + res + } + res = target_indexer_done_rx => { + let res = res.unwrap(); + tracing::error!("target indexer thread exited"); + res.context("target indexer thread failure") + } + res = send_txs_done_rx => { + let res = res.unwrap(); + tracing::error!("transaction sending thread exited"); + res.context("transaction sending thread failure") + } } - } - - self.queue_txs(&mut tracker, target_head, false).await?; - - self.main_loop(tracker, target_height, target_head, source_hash).await } } async fn run>( source_home: P, target_home: P, - mirror_db_path: Option

, + mirror_db_path: Option, secret: Option<[u8; crate::secret::SECRET_LEN]>, stop_height: Option, online_source: bool, @@ -1811,18 +2143,24 @@ async fn run>( let stop_height = stop_height.unwrap_or( source_chain_access.head_height().await.context("could not fetch source chain head")?, ); - TxMirror::new(source_chain_access, target_home, mirror_db_path, secret, config)? - .run(Some(stop_height)) - .await + TxMirror::new( + source_chain_access, + target_home.as_ref(), + mirror_db_path.as_deref(), + secret, + config, + )? + .run(Some(stop_height), target_home.as_ref().to_path_buf()) + .await } else { TxMirror::new( crate::online::ChainAccess::new(source_home)?, - target_home, - mirror_db_path, + target_home.as_ref(), + mirror_db_path.as_deref(), secret, config, )? - .run(stop_height) + .run(stop_height, target_home.as_ref().to_path_buf()) .await } }