Skip to content

Commit

Permalink
metrics: add metrics to ananlze bottleneck
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau committed Dec 3, 2024
1 parent fd3d73c commit 3e8cc66
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 34 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/grevm-fmt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ jobs:

- name: Run cargo fmt
run: cargo +nightly fmt --check

- name: Run cargo check
run: RUSTFLAGS="-D warnings" cargo check
30 changes: 22 additions & 8 deletions src/hint.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::{fork_join_util, LocationAndType, SharedTxStates, TxState};
use revm::primitives::{
alloy_primitives::U160, keccak256, ruint::UintTryFrom, Address, Bytes, TxEnv, TxKind, B256,
U256,
};
use std::sync::Arc;

use crate::{fork_join_util, LocationAndType, SharedTxStates, TxState};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

/// This module provides functionality for parsing and handling execution hints
/// for parallel transaction execution in the context of Ethereum-like blockchains.
Expand Down Expand Up @@ -80,11 +82,13 @@ pub(crate) struct ParallelExecutionHints {
/// Shared transaction states that will be updated with read/write sets
/// based on the contract interactions.
tx_states: SharedTxStates,
pub unknown_contract_tx_cnt: u64,
pub raw_tx_cnt: u64,
}

impl ParallelExecutionHints {
pub(crate) fn new(tx_states: SharedTxStates) -> Self {
Self { tx_states }
Self { tx_states, unknown_contract_tx_cnt: 0, raw_tx_cnt: 0 }
}

/// Obtain a mutable reference to shared transaction states, and parse execution hints for each
Expand All @@ -98,7 +102,9 @@ impl ParallelExecutionHints {
/// no conflicts between transactions, making the `Mutex` approach unnecessarily verbose and
/// cumbersome.
#[fastrace::trace]
pub(crate) fn parse_hints(&self, txs: Arc<Vec<TxEnv>>) {
pub(crate) fn parse_hints(&mut self, txs: Arc<Vec<TxEnv>>) {
let num_unknown_contract_tx = AtomicU64::new(0);
let num_raw_tx = AtomicU64::new(0);
// Utilize fork-join utility to process transactions in parallel
fork_join_util(txs.len(), None, |start_tx, end_tx, _| {
#[allow(invalid_reference_casting)]
Expand All @@ -122,18 +128,26 @@ impl ParallelExecutionHints {
&tx_env.data,
rw_set,
) {
num_unknown_contract_tx.fetch_add(1, Ordering::Relaxed);
rw_set.insert_location(
LocationAndType::Basic(to_address),
RWType::WriteOnly,
);
}
} else if to_address != tx_env.caller {
rw_set
.insert_location(LocationAndType::Basic(to_address), RWType::ReadWrite);
} else {
num_raw_tx.fetch_add(1, Ordering::Relaxed);
if to_address != tx_env.caller {
rw_set.insert_location(
LocationAndType::Basic(to_address),
RWType::ReadWrite,
);
}
}
}
}
});
self.unknown_contract_tx_cnt = num_unknown_contract_tx.load(Ordering::Acquire);
self.raw_tx_cnt = num_raw_tx.load(Ordering::Acquire);
}

/// This function computes the storage slot using the provided slot number and a vector of
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ pub mod storage;
mod tx_dependency;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};

lazy_static! {
static ref DEBUG_BOTTLENECK: bool =
std::env::var("DEBUG_BOTTLENECK").map_or(false, |v| v == "on");
}

lazy_static! {
static ref CPU_CORES: usize = thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
}
Expand Down
7 changes: 2 additions & 5 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,10 @@ where
*evm.tx_mut() = tx.clone();
evm.db_mut().current_txid = txid;
evm.db_mut().raw_transfer = true; // no need to wait miner rewards
let mut raw_transfer = true;
if let Ok(Some(info)) = evm.db_mut().basic(tx.caller) {
raw_transfer = info.is_empty_code_hash();
}
let mut raw_transfer = false;
if let TxKind::Call(to) = tx.transact_to {
if let Ok(Some(info)) = evm.db_mut().basic(to) {
raw_transfer &= info.is_empty_code_hash();
raw_transfer = info.is_empty_code_hash();
}
}
evm.db_mut().raw_transfer = raw_transfer;
Expand Down
49 changes: 35 additions & 14 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
storage::{SchedulerDB, State},
tx_dependency::{DependentTxsVec, TxDependency},
GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES,
GREVM_RUNTIME, MAX_NUM_ROUND,
DEBUG_BOTTLENECK, GREVM_RUNTIME, MAX_NUM_ROUND,
};
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use atomic::Atomic;
Expand Down Expand Up @@ -75,6 +75,10 @@ struct ExecuteMetrics {
commit_transition_time: metrics::Histogram,
/// Time taken to execute transactions in sequential(in nanoseconds).
sequential_execute_time: metrics::Histogram,
/// Number of transactions that failed to parse contract
unknown_contract_tx_cnt: metrics::Histogram,
/// Number of raw transactions
raw_tx_cnt: metrics::Histogram,
}

impl Default for ExecuteMetrics {
Expand All @@ -100,6 +104,8 @@ impl Default for ExecuteMetrics {
merge_write_set_time: histogram!("grevm.merge_write_set_time"),
commit_transition_time: histogram!("grevm.commit_transition_time"),
sequential_execute_time: histogram!("grevm.sequential_execute_time"),
unknown_contract_tx_cnt: histogram!("grevm.unknown_contract_tx_cnt"),
raw_tx_cnt: histogram!("grevm.raw_tx_cnt"),
}
}
}
Expand Down Expand Up @@ -127,6 +133,8 @@ struct ExecuteMetricsCollector {
merge_write_set_time: u64,
commit_transition_time: u64,
sequential_execute_time: u64,
unknown_contract_tx_cnt: u64,
raw_tx_cnt: u64,
}

impl ExecuteMetricsCollector {
Expand All @@ -152,6 +160,8 @@ impl ExecuteMetricsCollector {
execute_metrics.merge_write_set_time.record(self.merge_write_set_time as f64);
execute_metrics.commit_transition_time.record(self.commit_transition_time as f64);
execute_metrics.sequential_execute_time.record(self.sequential_execute_time as f64);
execute_metrics.unknown_contract_tx_cnt.record(self.unknown_contract_tx_cnt as f64);
execute_metrics.raw_tx_cnt.record(self.raw_tx_cnt as f64);
}
}

Expand Down Expand Up @@ -348,9 +358,10 @@ where

/// Get the partitioned transactions by dependencies.
#[fastrace::trace]
pub(crate) fn partition_transactions(&mut self) {
pub(crate) fn partition_transactions(&mut self, round: usize) {
// compute and assign partitioned_txs
let start = Instant::now();
self.tx_dependencies.round = Some(round);
self.partitioned_txs = self.tx_dependencies.fetch_best_partitions(self.num_partitions);
self.num_partitions = self.partitioned_txs.len();
let mut max = 0;
Expand Down Expand Up @@ -412,13 +423,15 @@ where
let start = Instant::now();
let mut merged_write_set: HashMap<LocationAndType, BTreeSet<TxId>> = HashMap::new();
let mut end_skip_id = self.num_finality_txs;
for txid in self.num_finality_txs..self.tx_states.len() {
if self.tx_states[txid].tx_status == TransactionStatus::SkipValidation &&
end_skip_id == txid
{
end_skip_id += 1;
} else {
break;
if !(*DEBUG_BOTTLENECK) {
for txid in self.num_finality_txs..self.tx_states.len() {
if self.tx_states[txid].tx_status == TransactionStatus::SkipValidation &&
end_skip_id == txid
{
end_skip_id += 1;
} else {
break;
}
}
}
if end_skip_id != self.tx_states.len() {
Expand All @@ -438,8 +451,7 @@ where
/// and there is no need to record the dependency and dependent relationships of these
/// transactions. Thus achieving the purpose of pruning.
#[fastrace::trace]
fn update_and_pruning_dependency(&mut self) {
let num_finality_txs = self.num_finality_txs;
fn update_and_pruning_dependency(&mut self, num_finality_txs: usize) {
if num_finality_txs == self.txs.len() {
return;
}
Expand Down Expand Up @@ -524,6 +536,13 @@ where
}
}
});
if *DEBUG_BOTTLENECK && self.num_finality_txs == 0 {
// Use the read-write set to build accurate dependencies,
// and try to find the bottleneck
self.update_and_pruning_dependency(0);
self.tx_dependencies.round = None;
self.tx_dependencies.fetch_best_partitions(self.num_partitions);
}
miner_involved_txs.into_iter().collect()
}

Expand Down Expand Up @@ -651,7 +670,7 @@ where
let miner_involved_txs = self.generate_unconfirmed_txs();
let finality_tx_cnt = self.find_continuous_min_txid()?;
// update and pruning tx dependencies
self.update_and_pruning_dependency();
self.update_and_pruning_dependency(self.num_finality_txs);
self.commit_transition(finality_tx_cnt)?;
let mut rewards_accumulators = RewardsAccumulators::new();
for txid in miner_involved_txs {
Expand Down Expand Up @@ -721,10 +740,12 @@ where
#[fastrace::trace]
fn parse_hints(&mut self) {
let start = Instant::now();
let hints = ParallelExecutionHints::new(self.tx_states.clone());
let mut hints = ParallelExecutionHints::new(self.tx_states.clone());
hints.parse_hints(self.txs.clone());
self.tx_dependencies.init_tx_dependency(self.tx_states.clone());
self.metrics.parse_hints_time += start.elapsed().as_nanos() as u64;
self.metrics.unknown_contract_tx_cnt += hints.unknown_contract_tx_cnt;
self.metrics.raw_tx_cnt += hints.raw_tx_cnt;
}

#[fastrace::trace]
Expand Down Expand Up @@ -754,7 +775,7 @@ where
let mut round = 0;
while round < MAX_NUM_ROUND {
if self.num_finality_txs < self.txs.len() {
self.partition_transactions();
self.partition_transactions(round);
if self.num_partitions == 1 && !force_parallel {
break;
}
Expand Down
82 changes: 76 additions & 6 deletions src/tx_dependency.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{fork_join_util, LocationAndType, SharedTxStates, TxId, DEBUG_BOTTLENECK};
use smallvec::SmallVec;
use std::{
cmp::{min, Reverse},
cmp::{max, min, Reverse},
collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque},
};

use crate::{fork_join_util, LocationAndType, SharedTxStates, TxId};

pub(crate) type DependentTxsVec = SmallVec<[TxId; 1]>;

use ahash::AHashMap as HashMap;
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use metrics::counter;

const RAW_TRANSFER_WEIGHT: usize = 1;

Expand All @@ -31,6 +31,8 @@ pub(crate) struct TxDependency {
// type, while in the second round, weights can be assigned based on tx_running_time.
#[allow(dead_code)]
tx_weight: Option<Vec<usize>>,

pub round: Option<usize>,
}

impl TxDependency {
Expand All @@ -40,19 +42,21 @@ impl TxDependency {
num_finality_txs: 0,
tx_running_time: None,
tx_weight: None,
round: None,
}
}

#[fastrace::trace]
pub fn init_tx_dependency(&mut self, tx_states: SharedTxStates) {
let mut last_write_tx: HashMap<LocationAndType, TxId> = HashMap::new();
for (txid, rw_set) in tx_states.iter().enumerate() {
let dependencies = &mut self.tx_dependency[txid];
let mut dependencies = HashSet::new();
for (location, _) in rw_set.read_set.iter() {
if let Some(previous) = last_write_tx.get(location) {
dependencies.push(*previous);
dependencies.insert(*previous);
}
}
self.tx_dependency[txid] = dependencies.into_iter().collect();
for location in rw_set.write_set.iter() {
last_write_tx.insert(location.clone(), txid);
}
Expand Down Expand Up @@ -139,6 +143,7 @@ impl TxDependency {
}
txid -= 1;
}
self.skew_analyze(&weighted_group);

let num_partitions = min(partition_count, num_group);
if num_partitions == 0 {
Expand Down Expand Up @@ -204,4 +209,69 @@ impl TxDependency {
self.tx_dependency = tx_dependency;
self.num_finality_txs = num_finality_txs;
}

fn skew_analyze(&self, weighted_group: &BTreeMap<usize, Vec<DependentTxsVec>>) {
if !(*DEBUG_BOTTLENECK) {
return;
}
if let Some(0) = self.round {
counter!("grevm.total_block_cnt").increment(1);
}
let num_finality_txs = self.num_finality_txs;
let num_txs = num_finality_txs + self.tx_dependency.len();
let num_remaining = self.tx_dependency.len();
if num_txs < 64 || num_remaining < num_txs / 3 {
return;
}
let mut subgraph = BTreeSet::new();
if let Some((_, groups)) = weighted_group.last_key_value() {
if groups[0].len() >= num_remaining / 3 {
subgraph.extend(groups[0].clone());
}
}
if subgraph.is_empty() {
return;
}

// ChainLength -> ChainNumber
let mut chains = BTreeMap::new();
let mut visited = HashSet::new();
for txid in subgraph.iter().rev() {
if !visited.contains(txid) {
let mut txid = *txid;
let mut chain_len = 0;
while !visited.contains(&txid) {
chain_len += 1;
visited.insert(txid);
let dep: BTreeSet<TxId> =
self.tx_dependency[txid - num_finality_txs].clone().into_iter().collect();
for dep_id in dep.into_iter().rev() {
if !visited.contains(&dep_id) {
txid = dep_id;
break;
}
}
}
let chain_num = chains.get(&chain_len).cloned().unwrap_or(0) + 1;
chains.insert(chain_len, chain_num);
}
}

let graph_len = subgraph.len();
let tip = self.round.map(|r| format!("round{}", r)).unwrap_or(String::from("none"));
counter!("grevm.large_graph_block_cnt", "tip" => tip.clone()).increment(1);
if let Some((chain_len, _)) = chains.last_key_value() {
let chain_len = *chain_len;
if chain_len > graph_len * 2 / 3 {
// Long chain
counter!("grevm.large_graph", "type" => "chain", "tip" => tip.clone()).increment(1);
} else if chain_len < max(3, graph_len / 8) {
// Star Graph
counter!("grevm.large_graph", "type" => "star", "tip" => tip.clone()).increment(1);
} else {
// Fork Graph
counter!("grevm.large_graph", "type" => "fork", "tip" => tip.clone()).increment(1);
}
}
}
}
2 changes: 1 addition & 1 deletion tests/common/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub(crate) fn compare_evm_execute<DB>(

let snapshot = recorder.snapshotter().snapshot();
for (key, _, _, value) in snapshot.into_vec() {
println!("metrics: {} => value: {:?}", key.key().name(), value);
println!("metrics: {} => value: {:?}", key.key(), value);
if let Some(metric) = parallel_metrics.get(key.key().name()) {
let v = match value {
DebugValue::Counter(v) => v as usize,
Expand Down

0 comments on commit 3e8cc66

Please sign in to comment.