Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stage-extensions): New abstraction for non-core stages and tables #4325

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ members = [
"crates/rpc/rpc-types",
"crates/rpc/rpc-testing-util",
"crates/stages",
"crates/stage-extensions",
"crates/storage/codecs",
"crates/storage/db",
"crates/storage/libmdbx-rs",
Expand Down
1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reth-provider = { workspace = true, features = ["test-utils"] }
reth-revm = { path = "../../crates/revm" }
reth-revm-inspectors = { path = "../../crates/revm/revm-inspectors" }
reth-stages = { path = "../../crates/stages" }
reth-stage-extensions = { path = "../../crates/stage-extensions" }
reth-interfaces = { workspace = true, features = ["test-utils", "clap"] }
reth-transaction-pool.workspace = true
reth-beacon-consensus = { path = "../../crates/consensus/beacon" }
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/chain/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_provider::{ProviderFactory, StageCheckpointReader};

use crate::args::{utils::genesis_value_parser, DatabaseArgs};
use reth_config::Config;
use reth_db::{database::Database, init_db};
use reth_db::{database::Database, init_db, NO_TABLES};
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
Expand Down Expand Up @@ -90,7 +90,7 @@ impl ImportCommand {
let db_path = data_dir.db_path();

info!(target: "reth::cli", path = ?db_path, "Opening database");
let db = Arc::new(init_db(db_path, self.db.log_level)?);
let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?);
info!(target: "reth::cli", "Database opened");

debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/chain/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
init::init_genesis,
};
use clap::Parser;
use reth_db::init_db;
use reth_db::{init_db, NO_TABLES};
use reth_primitives::ChainSpec;
use std::sync::Arc;
use tracing::info;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl InitCommand {
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let db_path = data_dir.db_path();
info!(target: "reth::cli", path = ?db_path, "Opening database");
let db = Arc::new(init_db(&db_path, self.db.log_level)?);
let db = Arc::new(init_db(&db_path, self.db.log_level, NO_TABLES)?);
info!(target: "reth::cli", "Database opened");

info!(target: "reth::cli", "Writing genesis block");
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/db/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use reth_db::{
database::Database,
table::Table,
transaction::{DbTx, DbTxMut},
TableViewer, Tables,
TableViewer, Tables, TableMetadata,
};

/// The arguments for the `reth db clear` command
Expand Down
9 changes: 5 additions & 4 deletions bin/reth/src/db/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use reth_db::{
AccountChangeSet, AccountHistory, AccountsTrie, BlockBodyIndices, BlockOmmers,
BlockWithdrawals, Bytecodes, CanonicalHeaders, DatabaseEnvRO, HashedAccount, HashedStorage,
HeaderNumbers, HeaderTD, Headers, PlainAccountState, PlainStorageState, PruneCheckpoints,
Receipts, StorageChangeSet, StorageHistory, StoragesTrie, SyncStage, SyncStageProgress, Tables,
TransactionBlock, Transactions, TxHashNumber, TxSenders,
Receipts, StorageChangeSet, StorageHistory, StoragesTrie, SyncStage, SyncStageProgress,
TableMetadata, Tables, TransactionBlock, Transactions, TxHashNumber, TxSenders,
NO_TABLES,
};
use tracing::info;

Expand Down Expand Up @@ -61,11 +62,11 @@ impl Command {
pub fn execute(self, tool: &DbTool<'_, DatabaseEnvRO>) -> eyre::Result<()> {
// open second db
let second_db_path: PathBuf = self.secondary_datadir.join("db").into();
let second_db = open_db_read_only(&second_db_path, self.second_db.log_level)?;
let second_db = open_db_read_only(&second_db_path, self.second_db.log_level, NO_TABLES)?;

let tables = match self.table {
Some(table) => vec![table],
None => Tables::ALL.to_vec(),
None => Tables::all_tables_in_group(),
};

for table in tables {
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/db/get.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::utils::DbTool;
use clap::Parser;

use reth_db::{database::Database, table::Table, TableType, TableViewer, Tables};
use reth_db::{database::Database, table::Table, TableType, TableViewer, Tables, TableMetadata};
use tracing::error;

/// The arguments for the `reth db get` command
Expand All @@ -13,7 +13,7 @@ pub struct Command {
#[arg()]
pub table: Tables,

/// The key to get content for
/// The key to get content for
#[arg(value_parser = maybe_json_value_parser)]
pub key: String,
}
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/db/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::tui::DbListTUI;
use crate::utils::{DbTool, ListFilter};
use clap::Parser;
use eyre::WrapErr;
use reth_db::{database::Database, table::Table, DatabaseEnvRO, TableType, TableViewer, Tables};
use reth_db::{database::Database, table::Table, DatabaseEnvRO, TableType, TableViewer, Tables, TableMetadata};
use std::cell::RefCell;
use tracing::error;

Expand Down
20 changes: 11 additions & 9 deletions bin/reth/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_db::{
database::Database,
open_db, open_db_read_only,
version::{get_db_version, DatabaseVersionError, DB_VERSION},
Tables,
TableMetadata, Tables, NO_TABLES,
};
use reth_primitives::ChainSpec;
use std::{
Expand Down Expand Up @@ -100,7 +100,7 @@ impl Command {
match self.command {
// TODO: We'll need to add this on the DB trait.
Subcommands::Stats { .. } => {
let db = open_db_read_only(&db_path, self.db.log_level)?;
let db = open_db_read_only(&db_path, self.db.log_level, NO_TABLES)?;
let tool = DbTool::new(&db, self.chain.clone())?;
let mut stats_table = ComfyTable::new();
stats_table.load_preset(comfy_table::presets::ASCII_MARKDOWN);
Expand All @@ -114,8 +114,10 @@ impl Command {
]);

tool.db.view(|tx| {
let mut tables =
Tables::ALL.iter().map(|table| table.name()).collect::<Vec<_>>();
let mut tables = Tables::all_tables_in_group()
.into_iter()
.map(|table| table.name())
.collect::<Vec<_>>();
tables.sort();
let mut total_size = 0;
for table in tables {
Expand Down Expand Up @@ -171,17 +173,17 @@ impl Command {
println!("{stats_table}");
}
Subcommands::List(command) => {
let db = open_db_read_only(&db_path, self.db.log_level)?;
let db = open_db_read_only(&db_path, self.db.log_level, NO_TABLES)?;
let tool = DbTool::new(&db, self.chain.clone())?;
command.execute(&tool)?;
}
Subcommands::Diff(command) => {
let db = open_db_read_only(&db_path, self.db.log_level)?;
let db = open_db_read_only(&db_path, self.db.log_level, NO_TABLES)?;
let tool = DbTool::new(&db, self.chain.clone())?;
command.execute(&tool)?;
}
Subcommands::Get(command) => {
let db = open_db_read_only(&db_path, self.db.log_level)?;
let db = open_db_read_only(&db_path, self.db.log_level, NO_TABLES)?;
let tool = DbTool::new(&db, self.chain.clone())?;
command.execute(&tool)?;
}
Expand All @@ -201,12 +203,12 @@ impl Command {
}
}

let db = open_db(&db_path, self.db.log_level)?;
let db = open_db(&db_path, self.db.log_level, NO_TABLES)?;
let mut tool = DbTool::new(&db, self.chain.clone())?;
tool.drop(db_path)?;
}
Subcommands::Clear(command) => {
let db = open_db(&db_path, self.db.log_level)?;
let db = open_db(&db_path, self.db.log_level, NO_TABLES)?;
command.execute(&db)?;
}
Subcommands::Version => {
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use clap::Parser;
use futures::{stream::select as stream_select, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_db::{database::Database, init_db, DatabaseEnv};
use reth_db::{database::Database, init_db, DatabaseEnv, NO_TABLES};
use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
Expand Down Expand Up @@ -204,7 +204,7 @@ impl Command {
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let db_path = data_dir.db_path();
fs::create_dir_all(&db_path)?;
let db = Arc::new(init_db(db_path, self.db.log_level)?);
let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?);

debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis(db.clone(), self.chain.clone())?;
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
use backon::{ConstantBuilder, Retryable};
use clap::Parser;
use reth_config::Config;
use reth_db::{init_db, DatabaseEnv};
use reth_db::{init_db, DatabaseEnv, NO_TABLES};
use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Command {
fs::create_dir_all(&db_path)?;

// initialize the database
let db = Arc::new(init_db(db_path, self.db.log_level)?);
let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?);
let factory = ProviderFactory::new(&db, self.chain.clone());
let provider = factory.provider()?;

Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use backon::{ConstantBuilder, Retryable};
use clap::Parser;
use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx, DatabaseEnv};
use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx, DatabaseEnv, NO_TABLES};
use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_interfaces::{consensus::Consensus, p2p::full_block::FullBlockClient};
use reth_network::NetworkHandle;
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Command {
fs::create_dir_all(&db_path)?;

// initialize the database
let db = Arc::new(init_db(db_path, self.db.log_level)?);
let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?);
let factory = ProviderFactory::new(&db, self.chain.clone());
let provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;

Expand Down
21 changes: 14 additions & 7 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
use reth_config::{config::PruneConfig, Config};
use reth_db::{database::Database, init_db, DatabaseEnv};
use reth_db::{database::Database, init_db, DatabaseEnv, TableMetadata};
use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
Expand Down Expand Up @@ -55,12 +55,13 @@ use reth_prune::BatchSizes;
use reth_revm::Factory;
use reth_revm_inspectors::stack::Hook;
use reth_rpc_engine_api::EngineApi;
use reth_stage_extensions::{AddressStage, NonCoreTable};
use reth_stages::{
prelude::*,
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, HeaderSyncMode,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TotalDifficultyStage, TransactionLookupStage,
StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, NonCoreStage,
},
MetricEventsSender, MetricsListener,
};
Expand Down Expand Up @@ -212,7 +213,8 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {

let db_path = data_dir.db_path();
info!(target: "reth::cli", path = ?db_path, "Opening database");
let db = Arc::new(init_db(&db_path, self.db.log_level)?);
let optional_tables = Some(NonCoreTable::all_tables_in_group());
let db = Arc::new(init_db(&db_path, self.db.log_level, optional_tables)?);
info!(target: "reth::cli", "Database opened");

self.start_metrics_endpoint(Arc::clone(&db)).await?;
Expand Down Expand Up @@ -767,7 +769,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {

let header_mode =
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };
let pipeline = builder
let mut pipeline_builder = builder
.with_tip_sender(tip_tx)
.with_metrics_tx(metrics_tx.clone())
.add_stages(
Expand Down Expand Up @@ -816,9 +818,14 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
))
.set(IndexStorageHistoryStage::new(
stage_config.index_storage_history.commit_threshold,
)),
)
.build(db, self.chain.clone());
))
);
// TODO: Get non-core stages from CLI args or config file.
let non_core_stages = vec![AddressStage::new()];
for stage in non_core_stages {
pipeline_builder = pipeline_builder.add_stage(stage);
};
let pipeline = pipeline_builder.build(db, self.chain.clone());

Ok(pipeline)
}
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
use backon::{ConstantBuilder, Retryable};
use clap::{Parser, Subcommand};
use reth_config::Config;
use reth_db::open_db;
use reth_db::{open_db, NO_TABLES};
use reth_discv4::NatResolver;
use reth_interfaces::p2p::bodies::client::BodiesClient;
use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord};
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Command {
/// Execute `p2p` command
pub async fn execute(&self) -> eyre::Result<()> {
let tempdir = tempfile::TempDir::new()?;
let noop_db = Arc::new(open_db(&tempdir.into_path(), self.db.log_level)?);
let noop_db = Arc::new(open_db(&tempdir.into_path(), self.db.log_level, NO_TABLES)?);

// add network name to data dir
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
Expand Down
5 changes: 3 additions & 2 deletions bin/reth/src/prometheus_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use hyper::{
};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
use reth_db::{database::Database, tables, DatabaseEnv};
use reth_db::{database::Database, tables, DatabaseEnv, TableMetadata};
use reth_metrics::metrics::{self, absolute_counter, describe_counter, Unit};
use std::{convert::Infallible, net::SocketAddr, sync::Arc};

Expand Down Expand Up @@ -76,7 +76,8 @@ pub(crate) async fn initialize(
// TODO: A generic stats abstraction for other DB types to deduplicate this and `reth db
// stats`
let _ = db.view(|tx| {
for table in tables::Tables::ALL.iter().map(|table| table.name()) {
for t in &tables::Tables::all_tables_in_group() {
let table = t.name();
let table_db =
tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;

Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/recover/storage_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use clap::Parser;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRW},
init_db, tables,
transaction::DbTx,
transaction::DbTx, NO_TABLES,
};
use reth_primitives::ChainSpec;
use reth_provider::{BlockNumReader, HeaderProvider, ProviderError, ProviderFactory};
Expand Down Expand Up @@ -53,7 +53,7 @@ impl Command {
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let db_path = data_dir.db_path();
fs::create_dir_all(&db_path)?;
let db = Arc::new(init_db(db_path, None)?);
let db = Arc::new(init_db(db_path, None, NO_TABLES)?);

debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis(db.clone(), self.chain.clone())?;
Expand Down
Loading