From 91331d0f6bca30cdf850a6903c89b23c8c77b8fe Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 23 Aug 2023 10:41:15 +1000 Subject: [PATCH 01/15] feat(storage): add optional non-core tables --- .../storage/db/src/implementation/mdbx/mod.rs | 52 +++++-- .../storage/db/src/implementation/mdbx/tx.rs | 20 +-- crates/storage/db/src/lib.rs | 71 +++++++--- crates/storage/db/src/tables/mod.rs | 130 ++++++++++++------ .../provider/src/providers/database/mod.rs | 12 +- 5 files changed, 195 insertions(+), 90 deletions(-) diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index b9cce246938c..78844bd2fec3 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -4,7 +4,7 @@ use crate::{ database::{Database, DatabaseGAT}, tables::{TableType, Tables}, utils::default_page_size, - DatabaseError, + DatabaseError, TableMetadata, }; use reth_interfaces::db::LogLevel; use reth_libmdbx::{ @@ -66,6 +66,7 @@ impl Env { path: &Path, kind: EnvKind, log_level: Option, + max_tables: usize, ) -> Result, DatabaseError> { let mode = match kind { EnvKind::RO => Mode::ReadOnly, @@ -73,7 +74,7 @@ impl Env { }; let mut inner_env = Environment::new(); - inner_env.set_max_dbs(Tables::ALL.len()); + inner_env.set_max_dbs(max_tables); inner_env.set_geometry(Geometry { // Maximum database size of 4 terabytes size: Some(0..(4 * TERABYTE)), @@ -127,19 +128,32 @@ impl Env { } /// Creates all the defined tables, if necessary. 
- pub fn create_tables(&self) -> Result<(), DatabaseError> { + pub fn create_tables( + &self, + non_core_tables: Option>, + ) -> Result<(), DatabaseError> { let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTransaction(e.into()))?; - - for table in Tables::ALL { + // Core + for table in Tables::all_tables_in_group() { let flags = match table.table_type() { TableType::Table => DatabaseFlags::default(), TableType::DupSort => DatabaseFlags::DUP_SORT, }; - tx.create_db(Some(table.name()), flags) .map_err(|e| DatabaseError::TableCreation(e.into()))?; } - + // Non-core + if let Some(tables) = non_core_tables { + for table in tables { + let flags = match table.table_type() { + TableType::Table => DatabaseFlags::default(), + TableType::DupSort => DatabaseFlags::DUP_SORT, + }; + + tx.create_db(Some(table.name()), flags) + .map_err(|e| DatabaseError::TableCreation(e.into()))?; + } + } tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?; Ok(()) @@ -165,7 +179,7 @@ mod tests { tables::{AccountHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState}, test_utils::*, transaction::{DbTx, DbTxMut}, - AccountChangeSet, DatabaseError, + AccountChangeSet, DatabaseError, NUM_TABLES, }; use reth_interfaces::db::DatabaseWriteOperation; use reth_libmdbx::{NoWriteMap, WriteMap}; @@ -175,16 +189,26 @@ mod tests { /// Create database for testing fn create_test_db(kind: EnvKind) -> Arc> { + let non_core_tables: Option> = None; Arc::new(create_test_db_with_path( kind, &tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), + non_core_tables, )) } /// Create database for testing with specified path - fn create_test_db_with_path(kind: EnvKind, path: &Path) -> Env { - let env = Env::::open(path, kind, None).expect(ERROR_DB_CREATION); - env.create_tables().expect(ERROR_TABLE_CREATION); + fn create_test_db_with_path( + kind: EnvKind, + path: &Path, + non_core_tables: Option>, + ) -> Env { + let mut max_tables = NUM_TABLES; + if let Some(non_core) = 
&non_core_tables { + max_tables += non_core.len(); + }; + let env = Env::::open(path, kind, None, max_tables).expect(ERROR_DB_CREATION); + env.create_tables(non_core_tables).expect(ERROR_TABLE_CREATION); env } @@ -784,7 +808,6 @@ mod tests { #[test] fn db_closure_put_get() { let path = TempDir::new().expect(ERROR_TEMPDIR).into_path(); - let value = Account { nonce: 18446744073709551615, bytecode_hash: Some(H256::random()), @@ -794,7 +817,7 @@ mod tests { .expect(ERROR_ETH_ADDRESS); { - let env = create_test_db_with_path::(EnvKind::RW, &path); + let env = create_test_db_with_path::(EnvKind::RW, &path, None); // PUT let result = env.update(|tx| { @@ -804,7 +827,8 @@ mod tests { assert!(result.expect(ERROR_RETURN_VALUE) == 200); } - let env = Env::::open(&path, EnvKind::RO, None).expect(ERROR_DB_CREATION); + let env = + Env::::open(&path, EnvKind::RO, None, NUM_TABLES).expect(ERROR_DB_CREATION); // GET let result = diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 6e8558726c80..ab7281173dc8 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -3,7 +3,7 @@ use super::cursor::Cursor; use crate::{ table::{Compress, DupSort, Encode, Table, TableImporter}, - tables::{utils::decode_one, Tables, NUM_TABLES}, + tables::{utils::decode_one, Tables}, transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, DatabaseError, }; @@ -19,7 +19,7 @@ pub struct Tx<'a, K: TransactionKind, E: EnvironmentKind> { /// Libmdbx-sys transaction. 
pub inner: Transaction<'a, K, E>, /// Database table handle cache - pub db_handles: Arc; NUM_TABLES]>>, + pub db_handles: Arc>>>, } impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { @@ -42,17 +42,17 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { let table = Tables::from_str(T::NAME).expect("Requested table should be part of `Tables`."); - let dbi_handle = handles.get_mut(table as usize).expect("should exist"); - if dbi_handle.is_none() { - *dbi_handle = Some( - self.inner + match handles.get_mut(table as usize) { + Some(Some(index)) => return Ok(*index), + Some(None) | None => { + // Either guard contains no table handle, or no available handle guard. + return Ok(self + .inner .open_db(Some(T::NAME)) .map_err(|e| DatabaseError::InitCursor(e.into()))? - .dbi(), - ); + .dbi()) + } } - - Ok(dbi_handle.expect("is some; qed")) } /// Create db Cursor diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index 725b2c9dbf6b..eb2cbb3f75c4 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -105,7 +105,11 @@ use std::path::Path; /// Opens up an existing database or creates a new one at the specified path. Creates tables if /// necessary. Read/Write mode. 
-pub fn init_db>(path: P, log_level: Option) -> eyre::Result { +pub fn init_db, T: TableMetadata>( + path: P, + log_level: Option, + non_core_tables: Option>, +) -> eyre::Result { use crate::version::{check_db_version_file, create_db_version_file, DatabaseVersionError}; let rpath = path.as_ref(); @@ -122,8 +126,12 @@ pub fn init_db>(path: P, log_level: Option) -> eyre::Re } #[cfg(feature = "mdbx")] { - let db = DatabaseEnv::open(rpath, EnvKind::RW, log_level)?; - db.create_tables()?; + let mut max_tables = Tables::NUM_TABLES; + if let Some(non_core) = &non_core_tables { + max_tables += non_core.len(); + }; + let db = DatabaseEnv::open(rpath, EnvKind::RW, log_level, max_tables)?; + db.create_tables(non_core_tables)?; Ok(db) } #[cfg(not(feature = "mdbx"))] @@ -133,10 +141,18 @@ pub fn init_db>(path: P, log_level: Option) -> eyre::Re } /// Opens up an existing database. Read only mode. It doesn't create it or create tables if missing. -pub fn open_db_read_only(path: &Path, log_level: Option) -> eyre::Result { +pub fn open_db_read_only( + path: &Path, + log_level: Option, + non_core_tables: Option>, +) -> eyre::Result { #[cfg(feature = "mdbx")] { - Env::::open(path, EnvKind::RO, log_level) + let mut max_tables = NUM_TABLES; + if let Some(non_core) = non_core_tables { + max_tables += non_core.len(); + }; + Env::::open(path, EnvKind::RO, log_level, max_tables) .with_context(|| format!("Could not open database at path: {}", path.display())) } #[cfg(not(feature = "mdbx"))] @@ -147,10 +163,18 @@ pub fn open_db_read_only(path: &Path, log_level: Option) -> eyre::Resu /// Opens up an existing database. Read/Write mode. It doesn't create it or create tables if /// missing. 
-pub fn open_db(path: &Path, log_level: Option) -> eyre::Result { +pub fn open_db( + path: &Path, + log_level: Option, + non_core_tables: Option>, +) -> eyre::Result { #[cfg(feature = "mdbx")] { - Env::::open(path, EnvKind::RW, log_level) + let mut max_tables = NUM_TABLES; + if let Some(non_core) = non_core_tables { + max_tables += non_core.len(); + }; + Env::::open(path, EnvKind::RW, log_level, max_tables) .with_context(|| format!("Could not open database at path: {}", path.display())) } #[cfg(not(feature = "mdbx"))] @@ -163,7 +187,7 @@ pub fn open_db(path: &Path, log_level: Option) -> eyre::Result Arc { Arc::new( - init_db(tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), None) - .expect(ERROR_DB_CREATION), + init_db::<_, Tables>( + tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), + None, + None, + ) + .expect(ERROR_DB_CREATION), ) } /// Create read/write database for testing pub fn create_test_rw_db_with_path>(path: P) -> Arc { - Arc::new(init_db(path.as_ref(), None).expect(ERROR_DB_CREATION)) + Arc::new(init_db::<&Path, Tables>(path.as_ref(), None, None).expect(ERROR_DB_CREATION)) } /// Create read only database for testing - pub fn create_test_ro_db() -> Arc { + pub fn create_test_ro_db() -> Arc { let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(); { - init_db(path.as_path(), None).expect(ERROR_DB_CREATION); + init_db::<&Path, T>(path.as_path(), None, None).expect(ERROR_DB_CREATION); } - Arc::new(open_db_read_only(path.as_path(), None).expect(ERROR_DB_OPEN)) + Arc::new(open_db_read_only::(path.as_path(), None, None).expect(ERROR_DB_OPEN)) } } #[cfg(test)] mod tests { + use super::*; + use crate::{ init_db, version::{db_version_file_path, DatabaseVersionError}, @@ -209,16 +239,17 @@ mod tests { #[test] fn db_version() { let path = tempdir().unwrap(); - // Database is empty { - let db = init_db(&path, None); + let non_core_tables: Option> = None; + let db = init_db(&path, None, non_core_tables); assert_matches!(db, Ok(_)); 
} // Database is not empty, current version is the same as in the file { - let db = init_db(&path, None); + let non_core_tables: Option> = None; + let db = init_db(&path, None, non_core_tables); assert_matches!(db, Ok(_)); } @@ -226,7 +257,8 @@ mod tests { { std::fs::write(path.path().join(db_version_file_path(&path)), "invalid-version") .unwrap(); - let db = init_db(&path, None); + let non_core_tables: Option> = None; + let db = init_db(&path, None, non_core_tables); assert!(db.is_err()); assert_matches!( db.unwrap_err().downcast_ref::(), @@ -237,7 +269,8 @@ mod tests { // Database is not empty, version file contains not matching version { std::fs::write(path.path().join(db_version_file_path(&path)), "0").unwrap(); - let db = init_db(&path, None); + let non_core_tables: Option> = None; + let db = init_db(&path, None, non_core_tables); assert!(db.is_err()); assert_matches!( db.unwrap_err().downcast_ref::(), diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index 1c3c148c23c3..632e04120c3a 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -19,7 +19,6 @@ pub(crate) mod utils; use crate::abstraction::table::Table; pub use raw::{RawDupSort, RawKey, RawTable, RawValue, TableRawRow}; -use std::{fmt::Display, str::FromStr}; /// Declaration of all Database tables. use crate::{ @@ -89,65 +88,101 @@ pub trait TableViewer { fn view(&self) -> Result; } +/// Information about a table. +pub trait TableMetadata { + /// Number of tables + const NUM_TABLES: usize; + + /// Gets every variant in the enum where the table is defined. + /// Note that other table-containing enums may exist. 
+ fn all_tables_in_group() -> Vec + where + Self: Sized; + + /// The name of the given table in database + fn name(&self) -> &str; + + /// The type of the given table in database + fn table_type(&self) -> TableType; + + /// Allows to operate on specific table type + fn view(&self, visitor: &T) -> Result + where + T: TableViewer; +} + +/// Helper methods for table documentation +/// +/// Useage: `tables!(tables_enum_name, num_tables, [(table_struct, table_type), ...])` +#[macro_export] macro_rules! tables { - ([$(($table:ident, $type:expr)),*]) => { + + ($enum_name:ident, $count:expr, [$(($table:ident, $type:expr)),*]) => { + use std::fmt::Debug; + + #[derive(Debug, PartialEq, Copy, Clone)] /// Default tables that should be present inside database. - pub enum Tables { + pub enum $enum_name { $( #[doc = concat!("Represents a ", stringify!($table), " table")] $table, )* } - impl Tables { + impl $crate::TableMetadata for $enum_name { + const NUM_TABLES: usize = $count; + /// Array of all tables in database - pub const ALL: [Tables; NUM_TABLES] = [$(Tables::$table,)*]; + fn all_tables_in_group() -> Vec<$enum_name> { + vec![$($enum_name::$table,)*] + } /// The name of the given table in database - pub const fn name(&self) -> &str { + fn name(&self) -> &str { match self { - $(Tables::$table => { + $($enum_name::$table => { $table::NAME },)* } } /// The type of the given table in database - pub const fn table_type(&self) -> TableType { + fn table_type(&self) -> $crate::TableType { match self { - $(Tables::$table => { + $($enum_name::$table => { $type },)* } } /// Allows to operate on specific table type - pub fn view(&self, visitor: &T) -> Result + fn view(&self, visitor: &T) -> Result where - T: TableViewer, + T: $crate::TableViewer, { match self { - $(Tables::$table => { + $($enum_name::$table => { visitor.view::<$table>() },)* } } } - impl Display for Tables { + + impl std::fmt::Display for $enum_name { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result 
{ write!(f, "{}", self.name()) } } - impl FromStr for Tables { + impl std::str::FromStr for $enum_name { type Err = String; fn from_str(s: &str) -> Result { match s { $($table::NAME => { - return Ok(Tables::$table) + return Ok($enum_name::$table) },)* _ => { return Err("Unknown table".to_string()) @@ -158,34 +193,38 @@ macro_rules! tables { }; } -tables!([ - (CanonicalHeaders, TableType::Table), - (HeaderTD, TableType::Table), - (HeaderNumbers, TableType::Table), - (Headers, TableType::Table), - (BlockBodyIndices, TableType::Table), - (BlockOmmers, TableType::Table), - (BlockWithdrawals, TableType::Table), - (TransactionBlock, TableType::Table), - (Transactions, TableType::Table), - (TxHashNumber, TableType::Table), - (Receipts, TableType::Table), - (PlainAccountState, TableType::Table), - (PlainStorageState, TableType::DupSort), - (Bytecodes, TableType::Table), - (AccountHistory, TableType::Table), - (StorageHistory, TableType::Table), - (AccountChangeSet, TableType::DupSort), - (StorageChangeSet, TableType::DupSort), - (HashedAccount, TableType::Table), - (HashedStorage, TableType::DupSort), - (AccountsTrie, TableType::Table), - (StoragesTrie, TableType::DupSort), - (TxSenders, TableType::Table), - (SyncStage, TableType::Table), - (SyncStageProgress, TableType::Table), - (PruneCheckpoints, TableType::Table) -]); +tables!( + Tables, + NUM_TABLES, + [ + (CanonicalHeaders, TableType::Table), + (HeaderTD, TableType::Table), + (HeaderNumbers, TableType::Table), + (Headers, TableType::Table), + (BlockBodyIndices, TableType::Table), + (BlockOmmers, TableType::Table), + (BlockWithdrawals, TableType::Table), + (TransactionBlock, TableType::Table), + (Transactions, TableType::Table), + (TxHashNumber, TableType::Table), + (Receipts, TableType::Table), + (PlainAccountState, TableType::Table), + (PlainStorageState, TableType::DupSort), + (Bytecodes, TableType::Table), + (AccountHistory, TableType::Table), + (StorageHistory, TableType::Table), + (AccountChangeSet, 
TableType::DupSort), + (StorageChangeSet, TableType::DupSort), + (HashedAccount, TableType::Table), + (HashedStorage, TableType::DupSort), + (AccountsTrie, TableType::Table), + (StoragesTrie, TableType::DupSort), + (TxSenders, TableType::Table), + (SyncStage, TableType::Table), + (SyncStageProgress, TableType::Table), + (PruneCheckpoints, TableType::Table) + ] +); #[macro_export] /// Macro to declare key value table. @@ -473,4 +512,9 @@ mod tests { assert_eq!(table.name(), table_name); } } + + #[test] + fn check_tables_length() { + assert_eq!(Tables::all_tables_in_group().len(), NUM_TABLES); + } } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 506cc59ebab3..86dcf15a00ca 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -5,7 +5,9 @@ use crate::{ HeaderProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProviderBox, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv}; +use reth_db::{ + database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv, TableMetadata, +}; use reth_interfaces::Result; use reth_primitives::{ stage::{StageCheckpoint, StageId}, @@ -60,13 +62,14 @@ impl ProviderFactory { impl ProviderFactory { /// create new database provider by passing a path. [`ProviderFactory`] will own the database /// instance. 
- pub fn new_with_database_path>( + pub fn new_with_database_path, T: TableMetadata>( path: P, chain_spec: Arc, log_level: Option, + non_core_tables: Option>, ) -> Result> { Ok(ProviderFactory:: { - db: init_db(path, log_level) + db: init_db(path, log_level, non_core_tables) .map_err(|e| reth_interfaces::Error::Custom(e.to_string()))?, chain_spec, }) @@ -392,7 +395,7 @@ mod tests { use crate::{BlockHashReader, BlockNumReader}; use reth_db::{ test_utils::{create_test_rw_db, ERROR_TEMPDIR}, - DatabaseEnv, + DatabaseEnv, Tables, }; use reth_primitives::{ChainSpecBuilder, H256}; use std::sync::Arc; @@ -436,6 +439,7 @@ mod tests { tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), Arc::new(chain_spec), None, + None::>, ) .unwrap(); From a534ee7a73790b8ae038cc11681e5957528de896 Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 23 Aug 2023 18:15:55 +1000 Subject: [PATCH 02/15] feat(reth): use alias for non-core table None --- bin/reth/Cargo.toml | 1 + bin/reth/src/chain/import.rs | 4 ++-- bin/reth/src/chain/init.rs | 4 ++-- bin/reth/src/db/clear.rs | 2 +- bin/reth/src/db/diff.rs | 9 +++++---- bin/reth/src/db/get.rs | 4 ++-- bin/reth/src/db/list.rs | 2 +- bin/reth/src/db/mod.rs | 20 ++++++++++--------- bin/reth/src/debug_cmd/execution.rs | 4 ++-- bin/reth/src/debug_cmd/in_memory_merkle.rs | 4 ++-- bin/reth/src/debug_cmd/merkle.rs | 4 ++-- bin/reth/src/p2p/mod.rs | 4 ++-- bin/reth/src/prometheus_exporter.rs | 5 +++-- bin/reth/src/recover/storage_tries.rs | 4 ++-- bin/reth/src/stage/drop.rs | 4 ++-- bin/reth/src/stage/dump/mod.rs | 6 +++--- bin/reth/src/stage/run.rs | 4 ++-- bin/reth/src/stage/unwind.rs | 4 ++-- .../storage/db/src/implementation/mdbx/mod.rs | 5 ++--- crates/storage/db/src/lib.rs | 12 ++++------- crates/storage/db/src/tables/mod.rs | 13 ++++++++---- examples/db-access.rs | 4 ++-- examples/rpc-db.rs | 4 ++-- 23 files changed, 66 insertions(+), 61 deletions(-) diff --git a/bin/reth/Cargo.toml 
b/bin/reth/Cargo.toml index 24e35172428a..a73924cad554 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -17,6 +17,7 @@ reth-provider = { workspace = true, features = ["test-utils"] } reth-revm = { path = "../../crates/revm" } reth-revm-inspectors = { path = "../../crates/revm/revm-inspectors" } reth-stages = { path = "../../crates/stages" } +reth-stage-extensions = { path = "../../crates/stage-extensions" } reth-interfaces = { workspace = true, features = ["test-utils", "clap"] } reth-transaction-pool.workspace = true reth-beacon-consensus = { path = "../../crates/consensus/beacon" } diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 429e5c796194..c191af16d11b 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -12,7 +12,7 @@ use reth_provider::{ProviderFactory, StageCheckpointReader}; use crate::args::{utils::genesis_value_parser, DatabaseArgs}; use reth_config::Config; -use reth_db::{database::Database, init_db}; +use reth_db::{database::Database, init_db, NO_TABLES}; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient, @@ -90,7 +90,7 @@ impl ImportCommand { let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(db_path, self.db.log_level)?); + let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?); info!(target: "reth::cli", "Database opened"); debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); diff --git a/bin/reth/src/chain/init.rs b/bin/reth/src/chain/init.rs index e25cbf8a2cfb..d822db6d47a0 100644 --- a/bin/reth/src/chain/init.rs +++ b/bin/reth/src/chain/init.rs @@ -4,7 +4,7 @@ use crate::{ init::init_genesis, }; use clap::Parser; -use reth_db::init_db; +use reth_db::{init_db, NO_TABLES}; use reth_primitives::ChainSpec; use std::sync::Arc; use 
tracing::info; @@ -52,7 +52,7 @@ impl InitCommand { let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(&db_path, self.db.log_level)?); + let db = Arc::new(init_db(&db_path, self.db.log_level, NO_TABLES)?); info!(target: "reth::cli", "Database opened"); info!(target: "reth::cli", "Writing genesis block"); diff --git a/bin/reth/src/db/clear.rs b/bin/reth/src/db/clear.rs index 0bd816db8b45..c04bbb5ec6ec 100644 --- a/bin/reth/src/db/clear.rs +++ b/bin/reth/src/db/clear.rs @@ -4,7 +4,7 @@ use reth_db::{ database::Database, table::Table, transaction::{DbTx, DbTxMut}, - TableViewer, Tables, + TableViewer, Tables, TableMetadata, }; /// The arguments for the `reth db clear` command diff --git a/bin/reth/src/db/diff.rs b/bin/reth/src/db/diff.rs index b7527b0ab188..46fc35a518bf 100644 --- a/bin/reth/src/db/diff.rs +++ b/bin/reth/src/db/diff.rs @@ -19,8 +19,9 @@ use reth_db::{ AccountChangeSet, AccountHistory, AccountsTrie, BlockBodyIndices, BlockOmmers, BlockWithdrawals, Bytecodes, CanonicalHeaders, DatabaseEnvRO, HashedAccount, HashedStorage, HeaderNumbers, HeaderTD, Headers, PlainAccountState, PlainStorageState, PruneCheckpoints, - Receipts, StorageChangeSet, StorageHistory, StoragesTrie, SyncStage, SyncStageProgress, Tables, - TransactionBlock, Transactions, TxHashNumber, TxSenders, + Receipts, StorageChangeSet, StorageHistory, StoragesTrie, SyncStage, SyncStageProgress, + TableMetadata, Tables, TransactionBlock, Transactions, TxHashNumber, TxSenders, + NO_TABLES, }; use tracing::info; @@ -61,11 +62,11 @@ impl Command { pub fn execute(self, tool: &DbTool<'_, DatabaseEnvRO>) -> eyre::Result<()> { // open second db let second_db_path: PathBuf = self.secondary_datadir.join("db").into(); - let second_db = open_db_read_only(&second_db_path, self.second_db.log_level)?; + let second_db = open_db_read_only(&second_db_path, 
self.second_db.log_level, NO_TABLES)?; let tables = match self.table { Some(table) => vec![table], - None => Tables::ALL.to_vec(), + None => Tables::all_tables_in_group(), }; for table in tables { diff --git a/bin/reth/src/db/get.rs b/bin/reth/src/db/get.rs index 5e4b127b37f6..4d386251184b 100644 --- a/bin/reth/src/db/get.rs +++ b/bin/reth/src/db/get.rs @@ -1,7 +1,7 @@ use crate::utils::DbTool; use clap::Parser; -use reth_db::{database::Database, table::Table, TableType, TableViewer, Tables}; +use reth_db::{database::Database, table::Table, TableType, TableViewer, Tables, TableMetadata}; use tracing::error; /// The arguments for the `reth db get` command @@ -13,7 +13,7 @@ pub struct Command { #[arg()] pub table: Tables, - /// The key to get content for + /// The key to get content for #[arg(value_parser = maybe_json_value_parser)] pub key: String, } diff --git a/bin/reth/src/db/list.rs b/bin/reth/src/db/list.rs index 4fc1fb7cad65..f8973a861b42 100644 --- a/bin/reth/src/db/list.rs +++ b/bin/reth/src/db/list.rs @@ -2,7 +2,7 @@ use super::tui::DbListTUI; use crate::utils::{DbTool, ListFilter}; use clap::Parser; use eyre::WrapErr; -use reth_db::{database::Database, table::Table, DatabaseEnvRO, TableType, TableViewer, Tables}; +use reth_db::{database::Database, table::Table, DatabaseEnvRO, TableType, TableViewer, Tables, TableMetadata}; use std::cell::RefCell; use tracing::error; diff --git a/bin/reth/src/db/mod.rs b/bin/reth/src/db/mod.rs index 9671f8d795ba..7c12bb2e8f1b 100644 --- a/bin/reth/src/db/mod.rs +++ b/bin/reth/src/db/mod.rs @@ -12,7 +12,7 @@ use reth_db::{ database::Database, open_db, open_db_read_only, version::{get_db_version, DatabaseVersionError, DB_VERSION}, - Tables, + TableMetadata, Tables, NO_TABLES, }; use reth_primitives::ChainSpec; use std::{ @@ -100,7 +100,7 @@ impl Command { match self.command { // TODO: We'll need to add this on the DB trait. Subcommands::Stats { .. 
} => { - let db = open_db_read_only(&db_path, self.db.log_level)?; + let db = open_db_read_only(&db_path, self.db.log_level, NO_TABLES)?; let tool = DbTool::new(&db, self.chain.clone())?; let mut stats_table = ComfyTable::new(); stats_table.load_preset(comfy_table::presets::ASCII_MARKDOWN); @@ -114,8 +114,10 @@ impl Command { ]); tool.db.view(|tx| { - let mut tables = - Tables::ALL.iter().map(|table| table.name()).collect::>(); + let mut tables = Tables::all_tables_in_group() + .into_iter() + .map(|table| table.name()) + .collect::>(); tables.sort(); let mut total_size = 0; for table in tables { @@ -171,17 +173,17 @@ impl Command { println!("{stats_table}"); } Subcommands::List(command) => { - let db = open_db_read_only(&db_path, self.db.log_level)?; + let db = open_db_read_only(&db_path, self.db.log_level, NO_TABLES)?; let tool = DbTool::new(&db, self.chain.clone())?; command.execute(&tool)?; } Subcommands::Diff(command) => { - let db = open_db_read_only(&db_path, self.db.log_level)?; + let db = open_db_read_only(&db_path, self.db.log_level, NO_TABLES)?; let tool = DbTool::new(&db, self.chain.clone())?; command.execute(&tool)?; } Subcommands::Get(command) => { - let db = open_db_read_only(&db_path, self.db.log_level)?; + let db = open_db_read_only(&db_path, self.db.log_level, NO_TABLES)?; let tool = DbTool::new(&db, self.chain.clone())?; command.execute(&tool)?; } @@ -201,12 +203,12 @@ impl Command { } } - let db = open_db(&db_path, self.db.log_level)?; + let db = open_db(&db_path, self.db.log_level, NO_TABLES)?; let mut tool = DbTool::new(&db, self.chain.clone())?; tool.drop(db_path)?; } Subcommands::Clear(command) => { - let db = open_db(&db_path, self.db.log_level)?; + let db = open_db(&db_path, self.db.log_level, NO_TABLES)?; command.execute(&db)?; } Subcommands::Version => { diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index 3550413897ef..54bbd4fcbebb 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ 
b/bin/reth/src/debug_cmd/execution.rs @@ -11,7 +11,7 @@ use clap::Parser; use futures::{stream::select as stream_select, StreamExt}; use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; -use reth_db::{database::Database, init_db, DatabaseEnv}; +use reth_db::{database::Database, init_db, DatabaseEnv, NO_TABLES}; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -204,7 +204,7 @@ impl Command { let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let db_path = data_dir.db_path(); fs::create_dir_all(&db_path)?; - let db = Arc::new(init_db(db_path, self.db.log_level)?); + let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?); debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); init_genesis(db.clone(), self.chain.clone())?; diff --git a/bin/reth/src/debug_cmd/in_memory_merkle.rs b/bin/reth/src/debug_cmd/in_memory_merkle.rs index 701b2196fa57..bd6e35b4c539 100644 --- a/bin/reth/src/debug_cmd/in_memory_merkle.rs +++ b/bin/reth/src/debug_cmd/in_memory_merkle.rs @@ -8,7 +8,7 @@ use crate::{ use backon::{ConstantBuilder, Retryable}; use clap::Parser; use reth_config::Config; -use reth_db::{init_db, DatabaseEnv}; +use reth_db::{init_db, DatabaseEnv, NO_TABLES}; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_network::NetworkHandle; use reth_network_api::NetworkInfo; @@ -114,7 +114,7 @@ impl Command { fs::create_dir_all(&db_path)?; // initialize the database - let db = Arc::new(init_db(db_path, self.db.log_level)?); + let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?); let factory = ProviderFactory::new(&db, self.chain.clone()); let provider = factory.provider()?; diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index 47e37c36996e..d26ffa3a0236 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -9,7 +9,7 @@ use 
backon::{ConstantBuilder, Retryable}; use clap::Parser; use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; -use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx, DatabaseEnv}; +use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx, DatabaseEnv, NO_TABLES}; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_interfaces::{consensus::Consensus, p2p::full_block::FullBlockClient}; use reth_network::NetworkHandle; @@ -124,7 +124,7 @@ impl Command { fs::create_dir_all(&db_path)?; // initialize the database - let db = Arc::new(init_db(db_path, self.db.log_level)?); + let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?); let factory = ProviderFactory::new(&db, self.chain.clone()); let provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?; diff --git a/bin/reth/src/p2p/mod.rs b/bin/reth/src/p2p/mod.rs index 4fc87b294cac..e0fbc55f5dc1 100644 --- a/bin/reth/src/p2p/mod.rs +++ b/bin/reth/src/p2p/mod.rs @@ -11,7 +11,7 @@ use crate::{ use backon::{ConstantBuilder, Retryable}; use clap::{Parser, Subcommand}; use reth_config::Config; -use reth_db::open_db; +use reth_db::{open_db, NO_TABLES}; use reth_discv4::NatResolver; use reth_interfaces::p2p::bodies::client::BodiesClient; use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord}; @@ -104,7 +104,7 @@ impl Command { /// Execute `p2p` command pub async fn execute(&self) -> eyre::Result<()> { let tempdir = tempfile::TempDir::new()?; - let noop_db = Arc::new(open_db(&tempdir.into_path(), self.db.log_level)?); + let noop_db = Arc::new(open_db(&tempdir.into_path(), self.db.log_level, NO_TABLES)?); // add network name to data dir let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); diff --git a/bin/reth/src/prometheus_exporter.rs b/bin/reth/src/prometheus_exporter.rs index c1cea799d918..74bfa9ae0204 100644 --- a/bin/reth/src/prometheus_exporter.rs +++ b/bin/reth/src/prometheus_exporter.rs @@ -6,7 +6,7 @@ use hyper::{ }; use 
metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; use metrics_util::layers::{PrefixLayer, Stack}; -use reth_db::{database::Database, tables, DatabaseEnv}; +use reth_db::{database::Database, tables, DatabaseEnv, TableMetadata}; use reth_metrics::metrics::{self, absolute_counter, describe_counter, Unit}; use std::{convert::Infallible, net::SocketAddr, sync::Arc}; @@ -76,7 +76,8 @@ pub(crate) async fn initialize( // TODO: A generic stats abstraction for other DB types to deduplicate this and `reth db // stats` let _ = db.view(|tx| { - for table in tables::Tables::ALL.iter().map(|table| table.name()) { + for t in &tables::Tables::all_tables_in_group() { + let table = t.name(); let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?; diff --git a/bin/reth/src/recover/storage_tries.rs b/bin/reth/src/recover/storage_tries.rs index 564410eca06f..235364e72540 100644 --- a/bin/reth/src/recover/storage_tries.rs +++ b/bin/reth/src/recover/storage_tries.rs @@ -8,7 +8,7 @@ use clap::Parser; use reth_db::{ cursor::{DbCursorRO, DbDupCursorRW}, init_db, tables, - transaction::DbTx, + transaction::DbTx, NO_TABLES, }; use reth_primitives::ChainSpec; use reth_provider::{BlockNumReader, HeaderProvider, ProviderError, ProviderFactory}; @@ -53,7 +53,7 @@ impl Command { let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let db_path = data_dir.db_path(); fs::create_dir_all(&db_path)?; - let db = Arc::new(init_db(db_path, None)?); + let db = Arc::new(init_db(db_path, None, NO_TABLES)?); debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); init_genesis(db.clone(), self.chain.clone())?; diff --git a/bin/reth/src/stage/drop.rs b/bin/reth/src/stage/drop.rs index 33298d287fd5..4b947578dd6a 100644 --- a/bin/reth/src/stage/drop.rs +++ b/bin/reth/src/stage/drop.rs @@ -6,7 +6,7 @@ use crate::{ utils::DbTool, }; use clap::Parser; -use reth_db::{database::Database, open_db, tables, 
transaction::DbTxMut, DatabaseEnv}; +use reth_db::{database::Database, open_db, tables, transaction::DbTxMut, DatabaseEnv, NO_TABLES}; use reth_primitives::{fs, stage::StageId, ChainSpec}; use std::sync::Arc; use tracing::info; @@ -55,7 +55,7 @@ impl Command { let db_path = data_dir.db_path(); fs::create_dir_all(&db_path)?; - let db = open_db(db_path.as_ref(), self.db.log_level)?; + let db = open_db(db_path.as_ref(), self.db.log_level, NO_TABLES)?; let tool = DbTool::new(&db, self.chain.clone())?; diff --git a/bin/reth/src/stage/dump/mod.rs b/bin/reth/src/stage/dump/mod.rs index 792777d1a672..57926e2ae8b8 100644 --- a/bin/reth/src/stage/dump/mod.rs +++ b/bin/reth/src/stage/dump/mod.rs @@ -6,7 +6,7 @@ use crate::{ use clap::Parser; use reth_db::{ cursor::DbCursorRO, database::Database, init_db, table::TableImporter, tables, - transaction::DbTx, DatabaseEnv, + transaction::DbTx, DatabaseEnv, NO_TABLES, }; use reth_primitives::ChainSpec; use std::{path::PathBuf, sync::Arc}; @@ -101,7 +101,7 @@ impl Command { let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(db_path, self.db.log_level)?); + let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?); info!(target: "reth::cli", "Database opened"); let tool = DbTool::new(&db, self.chain.clone())?; @@ -137,7 +137,7 @@ pub(crate) fn setup( info!(target: "reth::cli", ?output_db, "Creating separate db"); - let output_db = init_db(output_db, None)?; + let output_db = init_db(output_db, None, NO_TABLES)?; output_db.update(|tx| { tx.import_table_with_range::( diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 0ec5d7d4fccc..a593081ed60b 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -10,7 +10,7 @@ use crate::{ use clap::Parser; use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; -use reth_db::init_db; +use 
reth_db::{init_db, NO_TABLES}; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_primitives::ChainSpec; use reth_provider::{ProviderFactory, StageCheckpointReader}; @@ -122,7 +122,7 @@ impl Command { let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(db_path, self.db.log_level)?); + let db = Arc::new(init_db(db_path, self.db.log_level, NO_TABLES)?); info!(target: "reth::cli", "Database opened"); let factory = ProviderFactory::new(&db, self.chain.clone()); diff --git a/bin/reth/src/stage/unwind.rs b/bin/reth/src/stage/unwind.rs index d88346bc714c..c805d0da0be8 100644 --- a/bin/reth/src/stage/unwind.rs +++ b/bin/reth/src/stage/unwind.rs @@ -5,7 +5,7 @@ use crate::{ dirs::{DataDirPath, MaybePlatformPath}, }; use clap::{Parser, Subcommand}; -use reth_db::{cursor::DbCursorRO, database::Database, open_db, tables, transaction::DbTx}; +use reth_db::{cursor::DbCursorRO, database::Database, open_db, tables, transaction::DbTx, NO_TABLES}; use reth_primitives::{BlockHashOrNumber, ChainSpec}; use reth_provider::{BlockExecutionWriter, ProviderFactory}; use std::{ops::RangeInclusive, sync::Arc}; @@ -58,7 +58,7 @@ impl Command { eyre::bail!("Database {db_path:?} does not exist.") } - let db = open_db(db_path.as_ref(), self.db.log_level)?; + let db = open_db(db_path.as_ref(), self.db.log_level, NO_TABLES)?; let range = self.command.unwind_range(&db)?; diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 78844bd2fec3..fd92bc6ba736 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -179,7 +179,7 @@ mod tests { tables::{AccountHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState}, test_utils::*, transaction::{DbTx, DbTxMut}, - AccountChangeSet, DatabaseError, NUM_TABLES, + AccountChangeSet, DatabaseError, NUM_TABLES, NO_TABLES, }; use 
reth_interfaces::db::DatabaseWriteOperation; use reth_libmdbx::{NoWriteMap, WriteMap}; @@ -189,11 +189,10 @@ mod tests { /// Create database for testing fn create_test_db(kind: EnvKind) -> Arc> { - let non_core_tables: Option> = None; Arc::new(create_test_db_with_path( kind, &tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), - non_core_tables, + NO_TABLES, )) } diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index eb2cbb3f75c4..ab831ed22377 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -241,15 +241,13 @@ mod tests { let path = tempdir().unwrap(); // Database is empty { - let non_core_tables: Option> = None; - let db = init_db(&path, None, non_core_tables); + let db = init_db(&path, None, NO_TABLES); assert_matches!(db, Ok(_)); } // Database is not empty, current version is the same as in the file { - let non_core_tables: Option> = None; - let db = init_db(&path, None, non_core_tables); + let db = init_db(&path, None, NO_TABLES); assert_matches!(db, Ok(_)); } @@ -257,8 +255,7 @@ mod tests { { std::fs::write(path.path().join(db_version_file_path(&path)), "invalid-version") .unwrap(); - let non_core_tables: Option> = None; - let db = init_db(&path, None, non_core_tables); + let db = init_db(&path, None, NO_TABLES); assert!(db.is_err()); assert_matches!( db.unwrap_err().downcast_ref::(), @@ -269,8 +266,7 @@ mod tests { // Database is not empty, version file contains not matching version { std::fs::write(path.path().join(db_version_file_path(&path)), "0").unwrap(); - let non_core_tables: Option> = None; - let db = init_db(&path, None, non_core_tables); + let db = init_db(&path, None, NO_TABLES); assert!(db.is_err()); assert_matches!( db.unwrap_err().downcast_ref::(), diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index 632e04120c3a..e30d9cfcb736 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -52,6 +52,11 @@ pub enum 
TableType { /// Number of tables that should be present inside database. pub const NUM_TABLES: usize = 26; +/// An alias used in core stages to represent non-core tables (which aren't required). +/// +/// This allows core stages to be unaware of non-core table types. +pub const NO_TABLES: Option> = None; + /// The general purpose of this is to use with a combination of Tables enum, /// by implementing a `TableViewer` trait you can operate on db tables in an abstract way. /// @@ -89,7 +94,7 @@ pub trait TableViewer { } /// Information about a table. -pub trait TableMetadata { +pub trait TableMetadata: PartialEq { /// Number of tables const NUM_TABLES: usize; @@ -97,10 +102,10 @@ pub trait TableMetadata { /// Note that other table-containing enums may exist. fn all_tables_in_group() -> Vec where - Self: Sized; + Self: Sized + 'static; /// The name of the given table in database - fn name(&self) -> &str; + fn name(&self) -> &'static str; /// The type of the given table in database fn table_type(&self) -> TableType; @@ -139,7 +144,7 @@ macro_rules! tables { } /// The name of the given table in database - fn name(&self) -> &str { + fn name(&self) -> &'static str { match self { $($enum_name::$table => { $table::NAME diff --git a/examples/db-access.rs b/examples/db-access.rs index 60089dba75b2..03f567995ad9 100644 --- a/examples/db-access.rs +++ b/examples/db-access.rs @@ -1,4 +1,4 @@ -use reth_db::open_db_read_only; +use reth_db::{open_db_read_only, NO_TABLES}; use reth_primitives::{Address, ChainSpecBuilder, H256, U256}; use reth_provider::{ AccountReader, BlockReader, BlockSource, HeaderProvider, ProviderFactory, ReceiptProvider, @@ -18,7 +18,7 @@ fn main() -> eyre::Result<()> { // Opens a RO handle to the database file. // TODO: Should be able to do `ProviderFactory::new_with_db_path_ro(...)` instead of // doing in 2 steps. 
- let db = open_db_read_only(Path::new(&std::env::var("RETH_DB_PATH")?), None)?; + let db = open_db_read_only(Path::new(&std::env::var("RETH_DB_PATH")?), None, NO_TABLES)?; // Instantiate a provider factory for Ethereum mainnet using the provided DB. // TODO: Should the DB version include the spec so that you do not need to specify it here? diff --git a/examples/rpc-db.rs b/examples/rpc-db.rs index 5ff672f82ced..35888302d890 100644 --- a/examples/rpc-db.rs +++ b/examples/rpc-db.rs @@ -1,5 +1,5 @@ // Talking to the DB -use reth_db::open_db_read_only; +use reth_db::{open_db_read_only, NO_TABLES}; use reth_primitives::ChainSpecBuilder; use reth_provider::{providers::BlockchainProvider, ProviderFactory}; @@ -29,7 +29,7 @@ use std::{path::Path, sync::Arc}; #[tokio::main] async fn main() -> eyre::Result<()> { // 1. Setup the DB - let db = Arc::new(open_db_read_only(Path::new(&std::env::var("RETH_DB_PATH")?), None)?); + let db = Arc::new(open_db_read_only(Path::new(&std::env::var("RETH_DB_PATH")?), None, NO_TABLES)?); let spec = Arc::new(ChainSpecBuilder::mainnet().build()); let factory = ProviderFactory::new(db.clone(), spec.clone()); From a31ea82ce001579d2b048b30f6335b0c6e0a849a Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 23 Aug 2023 19:17:15 +1000 Subject: [PATCH 03/15] feat(stage-extension): add non-core stage via trait --- Cargo.lock | 18 +++ Cargo.toml | 1 + bin/reth/src/node/mod.rs | 21 ++-- crates/stage-extensions/Cargo.toml | 25 ++++ crates/stage-extensions/src/address.rs | 159 +++++++++++++++++++++++++ crates/stage-extensions/src/lib.rs | 23 ++++ crates/stages/src/stages/mod.rs | 3 + crates/stages/src/stages/non_core.rs | 25 ++++ crates/stages/src/test_utils/mod.rs | 2 +- crates/stages/src/test_utils/runner.rs | 8 +- 10 files changed, 273 insertions(+), 12 deletions(-) create mode 100644 crates/stage-extensions/Cargo.toml create mode 100644 crates/stage-extensions/src/address.rs create mode 100644 
crates/stage-extensions/src/lib.rs create mode 100644 crates/stages/src/stages/non_core.rs diff --git a/Cargo.lock b/Cargo.lock index 4a8d706e779a..cb4c0613f6a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5265,6 +5265,7 @@ dependencies = [ "reth-rpc-builder", "reth-rpc-engine-api", "reth-rpc-types", + "reth-stage-extensions", "reth-stages", "reth-tasks", "reth-tracing", @@ -6098,6 +6099,23 @@ dependencies = [ "reth-rpc-types", ] +[[package]] +name = "reth-stage-extensions" +version = "0.1.0-alpha.6" +dependencies = [ + "assert_matches", + "async-trait", + "reth-db", + "reth-interfaces", + "reth-primitives", + "reth-provider", + "reth-revm", + "reth-rlp", + "reth-stages", + "serde", + "tokio", +] + [[package]] name = "reth-stages" version = "0.1.0-alpha.6" diff --git a/Cargo.toml b/Cargo.toml index 80438385205d..fee55d15095f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "crates/rpc/rpc-types", "crates/rpc/rpc-testing-util", "crates/stages", + "crates/stage-extensions", "crates/storage/codecs", "crates/storage/db", "crates/storage/libmdbx-rs", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 0fd37caac6b0..ee139dc3d8e3 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -27,7 +27,7 @@ use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; use reth_config::{config::PruneConfig, Config}; -use reth_db::{database::Database, init_db, DatabaseEnv}; +use reth_db::{database::Database, init_db, DatabaseEnv, TableMetadata}; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -55,12 +55,13 @@ use reth_prune::BatchSizes; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; use reth_rpc_engine_api::EngineApi; +use reth_stage_extensions::{AddressStage, NonCoreTable}; use reth_stages::{ prelude::*, stages::{ AccountHashingStage, ExecutionStage, ExecutionStageThresholds, 
HeaderSyncMode, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, - StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, + StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, NonCoreStage, }, MetricEventsSender, MetricsListener, }; @@ -212,7 +213,8 @@ impl NodeCommand { let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(&db_path, self.db.log_level)?); + let optional_tables = Some(NonCoreTable::all_tables_in_group()); + let db = Arc::new(init_db(&db_path, self.db.log_level, optional_tables)?); info!(target: "reth::cli", "Database opened"); self.start_metrics_endpoint(Arc::clone(&db)).await?; @@ -767,7 +769,7 @@ impl NodeCommand { let header_mode = if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; - let pipeline = builder + let mut pipeline_builder = builder .with_tip_sender(tip_tx) .with_metrics_tx(metrics_tx.clone()) .add_stages( @@ -816,9 +818,14 @@ impl NodeCommand { )) .set(IndexStorageHistoryStage::new( stage_config.index_storage_history.commit_threshold, - )), - ) - .build(db, self.chain.clone()); + )) + ); + // TODO: Get non-core stages from CLI args or config file. 
+ let non_core_stages = vec![AddressStage::new()]; + for stage in non_core_stages { + pipeline_builder = pipeline_builder.add_stage(stage); + }; + let pipeline = pipeline_builder.build(db, self.chain.clone()); Ok(pipeline) } diff --git a/crates/stage-extensions/Cargo.toml b/crates/stage-extensions/Cargo.toml new file mode 100644 index 000000000000..b001f82cda4c --- /dev/null +++ b/crates/stage-extensions/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "reth-stage-extensions" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +[dependencies] +# reth +reth-db = { path = "../../crates/storage/db", features = ["mdbx", "test-utils"] } +reth-interfaces = { workspace = true, features = ["test-utils"] } +reth-primitives = { workspace = true, features = ["arbitrary"] } +reth-provider = { workspace = true, features = ["test-utils"] } +reth-rlp.workspace = true +reth-revm = { path = "../revm" } +reth-stages = { path = "../../crates/stages", features = ["test-utils"] } + +# misc +assert_matches = "1.5.0" +async-trait.workspace = true +serde.workspace = true +tokio = { workspace = true, features = ["rt", "sync", "macros"] } \ No newline at end of file diff --git a/crates/stage-extensions/src/address.rs b/crates/stage-extensions/src/address.rs new file mode 100644 index 000000000000..d07944945cb9 --- /dev/null +++ b/crates/stage-extensions/src/address.rs @@ -0,0 +1,159 @@ +//! A stage for indexing the addresses that appear in historical transactions. 
+ +use async_trait::async_trait; +use reth_db::{ + cursor::{DbCursorRO, DbCursorRW}, + database::Database, + models::ShardedKey, + table, + transaction::DbTxMut, +}; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + Address, IntegerList, +}; +use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError}; +use reth_stages::{ + stages::NonCoreStage, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput, +}; +use serde::{Deserialize, Serialize}; + +// === Stage === +// For setting up a stage. + +/// A stage for indexing addresses that appear. +#[derive(Debug)] +pub struct AddressStage { + /// Threshold block number to commit to db after. + pub commit_threshold: u64, +} + +impl NonCoreStage for AddressStage { + type Config = AddressAppearancesConfig; + + fn new() -> Self { + Self {commit_threshold: Self::Config::default().commit_threshold} + } +} + +#[async_trait] +impl Stage for AddressStage { + /// Get the ID of the stage. + /// + /// Stage IDs must be unique. + fn id(&self) -> StageId { + StageId::Other("AddressAppearances") + } + + /// Execute the stage. + async fn execute( + &mut self, + provider: &DatabaseProviderRW<'_, &DB>, + input: ExecInput, + ) -> Result { + /* + Algorithm: + - Equivalent of debug_traceTransaction with CallTracer + - Repeat for every block in batch + - Filter for addresses, group together + - Write to addresses table. + */ + + let tx = provider.tx_ref(); + let block_number = input.next_block(); + + // todo: for all blocks in batch + let first_tx_num = provider + .block_body_indices(block_number)? + .ok_or_else(|| ProviderError::BlockNotFound(block_number.into()))? + .first_tx_num; + + let block = provider + .block_with_senders(block_number)? 
+ .ok_or_else(|| ProviderError::BlockNotFound(block_number.into()))?; + + let mut appearances = vec![]; + // todo: TraceCall + + let encoded_locations = vec![first_tx_num]; // todo: encode the tx_num upper bits for the block + appearances.push((block.beneficiary, encoded_locations)); + + let mut addresses_read_cursor = tx.cursor_write::()?; + let mut addresses_write_cursor = tx.cursor_write::()?; + + for (address, txs) in appearances { + // todo: Store as encoded Tx and use ShardedKey. + let txs_usize: Vec = txs.iter().map(|x| *x as usize).collect(); + let locations = IntegerList::new(txs_usize).expect("Could not create integerlist"); + + // todo: Address sharded key already has data and extend otherwise make new entry. + let key = ShardedKey::new(address, block_number); + let _ = match addresses_read_cursor.seek_exact(key.clone())? { + Some((_address, existing)) => { + // todo: Do this more efficiently. + let mut combined_vec: Vec = existing.iter(0).collect(); + for tx in txs { + combined_vec.push(tx as usize); + } + let int_list = + IntegerList::new(combined_vec).expect("Could not create integerlist"); + addresses_write_cursor.upsert(key, int_list) + } + None => addresses_write_cursor.upsert(key, locations), + }; + } + let is_done = true; // todo + Ok(ExecOutput { checkpoint: StageCheckpoint::new(block_number), done: is_done }) + } + + /// Unwind the stage. + async fn unwind( + &mut self, + _provider: &DatabaseProviderRW<'_, &DB>, + _input: UnwindInput, + ) -> Result { + todo!() + } +} + +// === Table === +// For setting up a database table + +table!( + /// Stores pointers to transactions for a particular address. + /// + /// Last shard key of the storage will contain `u64::MAX` `BlockNumber`, + /// this would allows us small optimization on db access when change is in plain state. + /// + /// Imagine having shards as: + /// * `Address | 100` + /// * `Address | u64::MAX` + /// + /// What we need to find is number that is one greater than N. 
Db `seek` function allows us to fetch + /// the shard that equal or more than asked. For example: + /// * For N=50 we would get first shard. + /// * for N=150 we would get second shard. + /// * If max block number is 200 and we ask for N=250 we would fetch last shard and + /// know that needed entry is in `AccountPlainState`. + /// * If there were no shard we would get `None` entry or entry of different storage key. + /// + /// Code example can be found in `reth_provider::HistoricalStateProviderRef` + ( AddressAppearances ) ShardedKey
| IntegerList +); + +// === Config === +// For setting up a stage. + +/// Address appearance stage configuration. +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +#[serde(default)] +pub struct AddressAppearancesConfig { + /// The maximum number of blocks to process before committing progress to the database. + pub commit_threshold: u64, +} + +impl Default for AddressAppearancesConfig { + fn default() -> Self { + Self { commit_threshold: 100_000 } + } +} diff --git a/crates/stage-extensions/src/lib.rs b/crates/stage-extensions/src/lib.rs new file mode 100644 index 000000000000..943027d042d5 --- /dev/null +++ b/crates/stage-extensions/src/lib.rs @@ -0,0 +1,23 @@ +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxzy/reth/issues/" +)] +#![warn(missing_debug_implementations, missing_docs, unreachable_pub)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] +#![allow(clippy::result_large_err)] +#![feature(assert_matches)] +//! Crate for non-core stages. New stages added MAY be used by reth. +pub mod address; + +pub use address::*; +use reth_db::{table::Table, tables, TableMetadata, TableType}; + +// Sets up a new NonCoreTable enum containing only these tables (no core reth tables). +tables!(NonCoreTable, 1, [(AddressAppearances, TableType::Table)]); + diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 777041fbcac1..7dc6ba5b7f82 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -16,6 +16,8 @@ mod index_account_history; mod index_storage_history; /// Stage for computing state root. mod merkle; +/// Non-core stage. 
+mod non_core; /// The sender recovery stage. mod sender_recovery; /// The total difficulty stage mod total_difficulty; /// The transaction lookup stage mod tx_lookup; @@ -32,6 +34,7 @@ pub use headers::*; pub use index_account_history::*; pub use index_storage_history::*; pub use merkle::*; +pub use non_core::*; pub use sender_recovery::*; pub use total_difficulty::*; pub use tx_lookup::*; diff --git a/crates/stages/src/stages/non_core.rs b/crates/stages/src/stages/non_core.rs new file mode 100644 index 000000000000..0943e8c2ffc6 --- /dev/null +++ b/crates/stages/src/stages/non_core.rs @@ -0,0 +1,25 @@ +//! A non-core stage is one that is not required for node functionality. +//! +//! Users may define additional stages and activate them via flags. +//! Implementation is achieved by implementing the NonCoreStage trait on a new struct. +//! +//! Non-core stages may also define new non-core database tables and associate them here. + +use async_trait::async_trait; +use reth_db::database::Database; + +use crate::Stage; + +/// A collection of methods and types that make up a non-core Stage. +#[async_trait] +pub trait NonCoreStage +where + Self: Stage, + DB: Database, +{ + /// A config struct used to set up the stage. + type Config; + +  /// Used by the pipeline builder to set the new stage. 
+ fn new() -> Self; +} diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index b9fe397a8873..ff9262d49d43 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -5,7 +5,7 @@ mod macros; pub(crate) use macros::*; mod runner; -pub(crate) use runner::{ +pub use runner::{ ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner, }; diff --git a/crates/stages/src/test_utils/runner.rs b/crates/stages/src/test_utils/runner.rs index 0626ca084863..e250f8d0096a 100644 --- a/crates/stages/src/test_utils/runner.rs +++ b/crates/stages/src/test_utils/runner.rs @@ -7,7 +7,7 @@ use std::{borrow::Borrow, sync::Arc}; use tokio::sync::oneshot; #[derive(thiserror::Error, Debug)] -pub(crate) enum TestRunnerError { +pub enum TestRunnerError { #[error("Database error occurred.")] Database(#[from] reth_interfaces::db::DatabaseError), #[error("Internal runner error occurred.")] @@ -18,7 +18,7 @@ pub(crate) enum TestRunnerError { /// A generic test runner for stages. #[async_trait::async_trait] -pub(crate) trait StageTestRunner { +pub trait StageTestRunner { type S: Stage + 'static; /// Return a reference to the database. 
@@ -29,7 +29,7 @@ pub(crate) trait StageTestRunner { } #[async_trait::async_trait] -pub(crate) trait ExecuteStageTestRunner: StageTestRunner { +pub trait ExecuteStageTestRunner: StageTestRunner { type Seed: Send + Sync; /// Seed database for stage execution @@ -64,7 +64,7 @@ pub(crate) trait ExecuteStageTestRunner: StageTestRunner { } #[async_trait::async_trait] -pub(crate) trait UnwindStageTestRunner: StageTestRunner { +pub trait UnwindStageTestRunner: StageTestRunner { /// Validate the unwind fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError>; From a283c4c4e65874d501fdd235a4fdfddc37c82874 Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Tue, 29 Aug 2023 16:10:01 +1000 Subject: [PATCH 04/15] remove stage-extensions crate --- bin/reth/Cargo.toml | 1 - crates/stage-extensions/Cargo.toml | 25 ---- crates/stage-extensions/src/address.rs | 159 ------------------------- crates/stage-extensions/src/lib.rs | 23 ---- 4 files changed, 208 deletions(-) delete mode 100644 crates/stage-extensions/Cargo.toml delete mode 100644 crates/stage-extensions/src/address.rs delete mode 100644 crates/stage-extensions/src/lib.rs diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index a73924cad554..24e35172428a 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -17,7 +17,6 @@ reth-provider = { workspace = true, features = ["test-utils"] } reth-revm = { path = "../../crates/revm" } reth-revm-inspectors = { path = "../../crates/revm/revm-inspectors" } reth-stages = { path = "../../crates/stages" } -reth-stage-extensions = { path = "../../crates/stage-extensions" } reth-interfaces = { workspace = true, features = ["test-utils", "clap"] } reth-transaction-pool.workspace = true reth-beacon-consensus = { path = "../../crates/consensus/beacon" } diff --git a/crates/stage-extensions/Cargo.toml b/crates/stage-extensions/Cargo.toml deleted file mode 100644 index b001f82cda4c..000000000000 --- 
a/crates/stage-extensions/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "reth-stage-extensions" -version.workspace = true -edition.workspace = true -rust-version.workspace = true -license.workspace = true -homepage.workspace = true -repository.workspace = true -exclude.workspace = true - -[dependencies] -# reth -reth-db = { path = "../../crates/storage/db", features = ["mdbx", "test-utils"] } -reth-interfaces = { workspace = true, features = ["test-utils"] } -reth-primitives = { workspace = true, features = ["arbitrary"] } -reth-provider = { workspace = true, features = ["test-utils"] } -reth-rlp.workspace = true -reth-revm = { path = "../revm" } -reth-stages = { path = "../../crates/stages", features = ["test-utils"] } - -# misc -assert_matches = "1.5.0" -async-trait.workspace = true -serde.workspace = true -tokio = { workspace = true, features = ["rt", "sync", "macros"] } \ No newline at end of file diff --git a/crates/stage-extensions/src/address.rs b/crates/stage-extensions/src/address.rs deleted file mode 100644 index d07944945cb9..000000000000 --- a/crates/stage-extensions/src/address.rs +++ /dev/null @@ -1,159 +0,0 @@ -//! A stage for indexing the addresses that appear in historical transactions. - -use async_trait::async_trait; -use reth_db::{ - cursor::{DbCursorRO, DbCursorRW}, - database::Database, - models::ShardedKey, - table, - transaction::DbTxMut, -}; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - Address, IntegerList, -}; -use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError}; -use reth_stages::{ - stages::NonCoreStage, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput, -}; -use serde::{Deserialize, Serialize}; - -// === Stage === -// For setting up a stage. - -/// A stage for indexing addresses that appear. -#[derive(Debug)] -pub struct AddressStage { - /// Threshold block number to commit to db after. 
- pub commit_threshold: u64, -} - -impl NonCoreStage for AddressStage { - type Config = AddressAppearancesConfig; - - fn new() -> Self { - Self {commit_threshold: Self::Config::default().commit_threshold} - } -} - -#[async_trait] -impl Stage for AddressStage { - /// Get the ID of the stage. - /// - /// Stage IDs must be unique. - fn id(&self) -> StageId { - StageId::Other("AddressAppearances") - } - - /// Execute the stage. - async fn execute( - &mut self, - provider: &DatabaseProviderRW<'_, &DB>, - input: ExecInput, - ) -> Result { - /* - Algorithm: - - Equivalent of debug_traceTransaction with CallTracer - - Repeat for every block in batch - - Filter for addresses, group together - - Write to addresses table. - */ - - let tx = provider.tx_ref(); - let block_number = input.next_block(); - - // todo: for all blocks in batch - let first_tx_num = provider - .block_body_indices(block_number)? - .ok_or_else(|| ProviderError::BlockNotFound(block_number.into()))? - .first_tx_num; - - let block = provider - .block_with_senders(block_number)? - .ok_or_else(|| ProviderError::BlockNotFound(block_number.into()))?; - - let mut appearances = vec![]; - // todo: TraceCall - - let encoded_locations = vec![first_tx_num]; // todo: encode the tx_num upper bits for the block - appearances.push((block.beneficiary, encoded_locations)); - - let mut addresses_read_cursor = tx.cursor_write::()?; - let mut addresses_write_cursor = tx.cursor_write::()?; - - for (address, txs) in appearances { - // todo: Store as encoded Tx and use ShardedKey. - let txs_usize: Vec = txs.iter().map(|x| *x as usize).collect(); - let locations = IntegerList::new(txs_usize).expect("Could not create integerlist"); - - // todo: Address sharded key already has data and extend otherwise make new entry. - let key = ShardedKey::new(address, block_number); - let _ = match addresses_read_cursor.seek_exact(key.clone())? { - Some((_address, existing)) => { - // todo: Do this more efficiently. 
- let mut combined_vec: Vec = existing.iter(0).collect(); - for tx in txs { - combined_vec.push(tx as usize); - } - let int_list = - IntegerList::new(combined_vec).expect("Could not create integerlist"); - addresses_write_cursor.upsert(key, int_list) - } - None => addresses_write_cursor.upsert(key, locations), - }; - } - let is_done = true; // todo - Ok(ExecOutput { checkpoint: StageCheckpoint::new(block_number), done: is_done }) - } - - /// Unwind the stage. - async fn unwind( - &mut self, - _provider: &DatabaseProviderRW<'_, &DB>, - _input: UnwindInput, - ) -> Result { - todo!() - } -} - -// === Table === -// For setting up a database table - -table!( - /// Stores pointers to transactions for a particular address. - /// - /// Last shard key of the storage will contain `u64::MAX` `BlockNumber`, - /// this would allows us small optimization on db access when change is in plain state. - /// - /// Imagine having shards as: - /// * `Address | 100` - /// * `Address | u64::MAX` - /// - /// What we need to find is number that is one greater than N. Db `seek` function allows us to fetch - /// the shard that equal or more than asked. For example: - /// * For N=50 we would get first shard. - /// * for N=150 we would get second shard. - /// * If max block number is 200 and we ask for N=250 we would fetch last shard and - /// know that needed entry is in `AccountPlainState`. - /// * If there were no shard we would get `None` entry or entry of different storage key. - /// - /// Code example can be found in `reth_provider::HistoricalStateProviderRef` - ( AddressAppearances ) ShardedKey
| IntegerList -); - -// === Config === -// For setting up a stage. - -/// Address appearance stage configuration. -#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] -#[serde(default)] -pub struct AddressAppearancesConfig { - /// The maximum number of blocks to process before committing progress to the database. - pub commit_threshold: u64, -} - -impl Default for AddressAppearancesConfig { - fn default() -> Self { - Self { commit_threshold: 100_000 } - } -} diff --git a/crates/stage-extensions/src/lib.rs b/crates/stage-extensions/src/lib.rs deleted file mode 100644 index 943027d042d5..000000000000 --- a/crates/stage-extensions/src/lib.rs +++ /dev/null @@ -1,23 +0,0 @@ -#![cfg_attr(docsrs, feature(doc_cfg))] -#![doc( - html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", - html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", - issue_tracker_base_url = "https://github.com/paradigmxzy/reth/issues/" -)] -#![warn(missing_debug_implementations, missing_docs, unreachable_pub)] -#![deny(unused_must_use, rust_2018_idioms)] -#![doc(test( - no_crate_inject, - attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) -))] -#![allow(clippy::result_large_err)] -#![feature(assert_matches)] -//! Crate for non-core stages. New stages added MAY be used by reth. -pub mod address; - -pub use address::*; -use reth_db::{table::Table, tables, TableMetadata, TableType}; - -// Sets up a new NonCoreTable enum containing only these tables (no core reth tables). 
-tables!(NonCoreTable, 1, [(AddressAppearances, TableType::Table)]); - From df858f302d45a41239f8d4e8309c113eaa731f0d Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Tue, 29 Aug 2023 16:11:24 +1000 Subject: [PATCH 05/15] feat(example): Stage extension CLI example --- Cargo.lock | 33 +++-- Cargo.toml | 2 +- bin/reth/src/cli/ext.rs | 22 ++++ bin/reth/src/node/mod.rs | 23 ++-- crates/storage/db/src/lib.rs | 12 +- examples/additional-stage/Cargo.toml | 20 +++ examples/additional-stage/src/main.rs | 163 +++++++++++++++++++++++++ examples/additional-stage/src/stage.rs | 61 +++++++++ examples/additional-stage/src/table.rs | 13 ++ 9 files changed, 309 insertions(+), 40 deletions(-) create mode 100644 examples/additional-stage/Cargo.toml create mode 100644 examples/additional-stage/src/main.rs create mode 100644 examples/additional-stage/src/stage.rs create mode 100644 examples/additional-stage/src/table.rs diff --git a/Cargo.lock b/Cargo.lock index cb4c0613f6a0..e2655d25f427 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,21 @@ dependencies = [ "reth-transaction-pool", ] +[[package]] +name = "additional-stage" +version = "0.0.0" +dependencies = [ + "async-trait", + "clap", + "eyre", + "jsonrpsee", + "reth", + "reth-db", + "reth-primitives", + "reth-stages", + "reth-transaction-pool", +] + [[package]] name = "addr2line" version = "0.20.0" @@ -5265,7 +5280,6 @@ dependencies = [ "reth-rpc-builder", "reth-rpc-engine-api", "reth-rpc-types", - "reth-stage-extensions", "reth-stages", "reth-tasks", "reth-tracing", @@ -6099,23 +6113,6 @@ dependencies = [ "reth-rpc-types", ] -[[package]] -name = "reth-stage-extensions" -version = "0.1.0-alpha.6" -dependencies = [ - "assert_matches", - "async-trait", - "reth-db", - "reth-interfaces", - "reth-primitives", - "reth-provider", - "reth-revm", - "reth-rlp", - "reth-stages", - "serde", - "tokio", -] - [[package]] name = "reth-stages" version = "0.1.0-alpha.6" diff --git a/Cargo.toml 
b/Cargo.toml index fee55d15095f..b6d14db5b09a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,6 @@ members = [ "crates/rpc/rpc-types", "crates/rpc/rpc-testing-util", "crates/stages", - "crates/stage-extensions", "crates/storage/codecs", "crates/storage/db", "crates/storage/libmdbx-rs", @@ -49,6 +48,7 @@ members = [ "crates/rpc/rpc-types-compat", "examples", "examples/additional-rpc-namespace-in-cli", + "examples/additional-stage", ] default-members = ["bin/reth"] diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index 663328cda025..941430a6afb2 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -3,6 +3,7 @@ use crate::cli::config::{PayloadBuilderConfig, RethRpcConfig}; use clap::Args; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; +use reth_db::{database::Database, TableMetadata, NO_TABLES}; use reth_network_api::{NetworkInfo, Peers}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_primitives::ChainSpec; @@ -11,6 +12,7 @@ use reth_provider::{ StateProviderFactory, }; use reth_rpc_builder::{RethModuleRegistry, TransportRpcModules}; +use reth_stages::PipelineBuilder; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; use std::{fmt, sync::Arc}; @@ -101,6 +103,26 @@ pub trait RethNodeCommandConfig: fmt::Debug { Ok(payload_builder) } + /// Add a new stage to be executed after native Reth stages. + /// + /// Usage: In an external binary, implement the RethNodeCommandConfig trait for a node command. + /// Define a stage set, and then add it to the pipeline builder. + fn add_custom_stage( + &self, + pipeline_builder: &mut PipelineBuilder, + ) -> eyre::Result<()> + where + DB: Database, + { + Ok(()) + } + /// Gets information about non-core tables so they can be instantiated. + fn get_custom_tables( + &self, + ) -> Option> + { + NO_TABLES + } } /// A trait that allows for extending parts of the CLI with additional functionality. 
diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index ee139dc3d8e3..991eb14e0b01 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -27,7 +27,7 @@ use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; use reth_config::{config::PruneConfig, Config}; -use reth_db::{database::Database, init_db, DatabaseEnv, TableMetadata}; +use reth_db::{database::Database, init_db, DatabaseEnv}; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -55,13 +55,12 @@ use reth_prune::BatchSizes; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; use reth_rpc_engine_api::EngineApi; -use reth_stage_extensions::{AddressStage, NonCoreTable}; use reth_stages::{ prelude::*, stages::{ AccountHashingStage, ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, - StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, NonCoreStage, + StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, }, MetricEventsSender, MetricsListener, }; @@ -213,7 +212,7 @@ impl NodeCommand { let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let optional_tables = Some(NonCoreTable::all_tables_in_group()); + let optional_tables = self.ext.get_custom_tables(); let db = Arc::new(init_db(&db_path, self.db.log_level, optional_tables)?); info!(target: "reth::cli", "Database opened"); @@ -769,10 +768,8 @@ impl NodeCommand { let header_mode = if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; - let mut pipeline_builder = builder - .with_tip_sender(tip_tx) - .with_metrics_tx(metrics_tx.clone()) - .add_stages( + let mut pipeline_builder = + builder.with_tip_sender(tip_tx).with_metrics_tx(metrics_tx.clone()).add_stages( DefaultStages::new( header_mode, 
Arc::clone(&consensus), @@ -818,14 +815,10 @@ impl NodeCommand { )) .set(IndexStorageHistoryStage::new( stage_config.index_storage_history.commit_threshold, - )) + )), ); - // TODO: Get non-core stages from CLI args or config file. - let non_core_stages = vec![AddressStage::new()]; - for stage in non_core_stages { - pipeline_builder = pipeline_builder.add_stage(stage); - }; - let pipeline = pipeline_builder.build(db, self.chain.clone()); + self.ext.add_custom_stage(&mut pipeline_builder)?; + let pipeline = pipeline_builder.build(db, self.chain.clone()); Ok(pipeline) } diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index ab831ed22377..646599183e62 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -201,10 +201,10 @@ pub mod test_utils { /// Create read/write database for testing pub fn create_test_rw_db() -> Arc { Arc::new( - init_db::<_, Tables>( + init_db( tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), None, - None, + NO_TABLES, ) .expect(ERROR_DB_CREATION), ) @@ -212,16 +212,16 @@ pub mod test_utils { /// Create read/write database for testing pub fn create_test_rw_db_with_path>(path: P) -> Arc { - Arc::new(init_db::<&Path, Tables>(path.as_ref(), None, None).expect(ERROR_DB_CREATION)) + Arc::new(init_db(path.as_ref(), None, NO_TABLES).expect(ERROR_DB_CREATION)) } /// Create read only database for testing - pub fn create_test_ro_db() -> Arc { + pub fn create_test_ro_db() -> Arc { let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(); { - init_db::<&Path, T>(path.as_path(), None, None).expect(ERROR_DB_CREATION); + init_db(path.as_path(), None, NO_TABLES).expect(ERROR_DB_CREATION); } - Arc::new(open_db_read_only::(path.as_path(), None, None).expect(ERROR_DB_OPEN)) + Arc::new(open_db_read_only(path.as_path(), None, NO_TABLES).expect(ERROR_DB_OPEN)) } } diff --git a/examples/additional-stage/Cargo.toml b/examples/additional-stage/Cargo.toml new file mode 100644 index 
000000000000..1f62090b0dd9
--- /dev/null
+++ b/examples/additional-stage/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "additional-stage"
+version = "0.0.0"
+publish = false
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+# reth
+reth.workspace = true
+reth-db = { workspace = true, features = ["mdbx", "test-utils"] }
+reth-primitives.workspace = true
+reth-stages = { path = "../../crates/stages" }
+reth-transaction-pool.workspace = true
+
+# misc
+async-trait.workspace = true
+clap = { version = "4", features = ["derive"] }
+jsonrpsee = { workspace = true, features = ["server", "macros"] }
+eyre = "0.6"
diff --git a/examples/additional-stage/src/main.rs b/examples/additional-stage/src/main.rs
new file mode 100644
index 000000000000..2c645aa8f560
--- /dev/null
+++ b/examples/additional-stage/src/main.rs
@@ -0,0 +1,163 @@
+//! Example of how to add an additional stage to Reth.
+//!
+//! Run with
+//!
+//! ```not_rust
+//! cargo run -p additional-stage -- node --http --ws --enable-mynamespace
+//! ```
+//!
+//! This example features an additional:
+//! - stage (./stage.rs)
+//! - database table (./table.rs)
+//! - namespace
+//!
+//! The example shows Reth being started with normal functionality, but with additional features:
+//! - `MyStage`, that executes after native Reth stages.
+//! - `MyTable`, that `MyStage` uses to store data.
+//! - `mynamespace_myMethod`, that uses MyTable data to respond to JSON-RPC requests.
+//!
+//! This installs an additional RPC method `mynamespace_myMethod` that can be queried via [cast](https://github.com/foundry-rs/foundry)
+//!
+//! ```sh
+//! cast rpc mynamespace_myMethod
+//! ```
+//!
+//! Specifically, the example adds a new stage that records the most recent block that a miner
+//! produced. A new table stores this data. The JSON-RPC method returns the latest block produced
+//! (if any) for any given address.
+use std::sync::Arc; + +use clap::Parser; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use reth::{ + cli::{ + config::RethRpcConfig, + ext::{RethCliExt, RethNodeCommandConfig}, + Cli, + }, + db, + network::{NetworkInfo, Peers}, + providers::{ + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, + EvmEnvProvider, StateProviderFactory, + }, + rpc::builder::{RethModuleRegistry, TransportRpcModules}, + tasks::TaskSpawner, +}; +use reth_db::{database::Database, transaction::DbTx, TableMetadata}; +use reth_primitives::{Address, BlockNumber}; +use reth_stages::{PipelineBuilder, StageSet}; +use reth_transaction_pool::TransactionPool; +use stage::{MyStage, MyStageSet}; +use table::{MyTable, NonCoreTable}; + +// Custom modules for the example. +mod stage; +mod table; + +fn main() { + // Start reth + Cli::::parse().run().unwrap(); +} + +/// The type that tells the reth CLI what extensions to use +struct MyRethCliExt; + +impl RethCliExt for MyRethCliExt { + /// This tells the reth CLI to install the `mynamespace` rpc namespace via `RethCliNamespaceExt` + type Node = RethCliNamespaceExt; +} + +/// Our custom cli args extension that adds one flag to reth default CLI. +#[derive(Debug, Clone, Copy, Default, clap::Args)] +struct RethCliNamespaceExt { + /// CLI flag to enable the mynamespace extension namespace + #[clap(long)] + pub enable_mynamespace: bool, +} + +// Note: For all the methods in this trait, the method arguments are provided by Reth. +impl RethNodeCommandConfig for RethCliNamespaceExt { + // This is the entrypoint for the CLI to extend the RPC server with custom rpc namespaces. 
+ fn extend_rpc_modules( + &mut self, + _config: &Conf, + registry: &mut RethModuleRegistry, + modules: &mut TransportRpcModules, + ) -> eyre::Result<()> + where + Conf: RethRpcConfig, + Provider: BlockReaderIdExt + + StateProviderFactory + + EvmEnvProvider + + ChainSpecProvider + + ChangeSetReader + + Clone + + Unpin + + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, + { + if !self.enable_mynamespace { + return Ok(()) + } + + let database = todo!("Get reference to database"); + let ext = MyNamespace { database }; + + // now we merge our extension namespace into all configured transports + modules.merge_configured(ext.into_rpc())?; + + println!("mynamespace extension enabled"); + Ok(()) + } + + fn add_custom_stage(&self, pipeline_builder: &mut PipelineBuilder) -> eyre::Result<()> + where + DB: Database, + { + // Make a new set of stages. + let set = MyStageSet::new().set(MyStage::new()); + // Add set to pipeline. + pipeline_builder.add_stages(set); + Ok(()) + } + fn get_custom_tables(&self) -> Option> { + Some(NonCoreTable::all_tables_in_group()) + } +} + +/// trait interface for a custom rpc namespace: `mynamespace` +/// +/// This defines an additional namespace where all methods are configured as trait functions. +#[rpc(server, namespace = "MyNamespace")] +pub trait MyNamespaceApi { + /// Returns the response to mynamespace_myMethod + #[method(name = "myMethod")] + fn my_method(&self, my_param: Address) -> RpcResult; +} + +/// The type that implements the `mynamespace` rpc namespace trait +/// +/// Usage: Members of this struct hold data for constructing the response. +pub struct MyNamespace { + /// A reference to the reth database. Used by MyNamespace to construct responses. 
+ database: Arc, +} + +impl MyNamespaceApiServer for MyNamespace { + fn my_method(&self, my_param: Address) -> RpcResult { + let response: Option = self + .database + .tx() + .expect("Couldn't get DB Tx") + .get::(my_param) + .expect("Couldn't read MyTable"); + Ok(response) + } +} + +/// Data for the response to the JSON-RPC method. +type MyMethodResponse = Option; diff --git a/examples/additional-stage/src/stage.rs b/examples/additional-stage/src/stage.rs new file mode 100644 index 000000000000..486d08115c5f --- /dev/null +++ b/examples/additional-stage/src/stage.rs @@ -0,0 +1,61 @@ +//! Example custom stage implementation. + +use async_trait::async_trait; +use reth::providers::{BlockReader, DatabaseProviderRW}; +use reth_db::{database::Database, transaction::DbTxMut}; +use reth_primitives::stage::StageId; +use reth_stages::{ + ExecInput, ExecOutput, Stage, StageError, StageSet, StageSetBuilder, UnwindInput, UnwindOutput, +}; + +/// A single stage. +#[derive(Debug, Default)] +pub struct MyStage {} + +impl MyStage { + pub fn new() -> Self { + MyStage {} + } +} + +#[async_trait] +impl Stage for MyStage { + fn id(&self) -> StageId { + StageId::Other("MyStage") + } + + async fn execute( + &mut self, + provider: &DatabaseProviderRW<'_, &DB>, + input: ExecInput, + ) -> Result { + // For demonstration, the stage stores the most recent block that a "miner" produces. + if let Some(block) = provider.block_by_number(input.next_block()?)? { + provider.tx_mut().put(block.beneficiary, block.number)?; + } + todo!("Return ExecOutput"); + } + + async fn unwind( + &mut self, + provider: &DatabaseProviderRW<'_, &DB>, + input: UnwindInput, + ) -> Result { + todo!("Remove entries from database as appropriate for the unwind.") + } +} + +/// A group of stages that are related. 
+pub struct MyStageSet {} + +impl StageSet for MyStageSet { + fn builder(self) -> StageSetBuilder { + StageSetBuilder::default().add_stage(MyStage::default()) + } +} + +impl MyStageSet { + pub fn new() -> Self { + todo!() + } +} diff --git a/examples/additional-stage/src/table.rs b/examples/additional-stage/src/table.rs new file mode 100644 index 000000000000..d074320906f4 --- /dev/null +++ b/examples/additional-stage/src/table.rs @@ -0,0 +1,13 @@ +//! Example custom table implementation. + +use reth_db::{table, table::Table, tables, TableMetadata, TableType}; +use reth_primitives::{Address, BlockNumber}; + +// Usage: (TableName) KeyType | ValueType +table!( + /// Stores the last block an address appears in (for demonstration purposes). + ( MyTable ) Address | BlockNumber +); + +// Sets up a new NonCoreTable enum containing only new tables (no core reth tables). +tables!(NonCoreTable, 1, [(MyTable, TableType::Table)]); From f52d0860678e59ce7a87395addbc469688e7ea67 Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Tue, 29 Aug 2023 16:19:43 +1000 Subject: [PATCH 06/15] fix: remove unused NonCoreStage --- crates/stages/src/stages/mod.rs | 3 --- crates/stages/src/stages/non_core.rs | 25 ------------------------- 2 files changed, 28 deletions(-) delete mode 100644 crates/stages/src/stages/non_core.rs diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 7dc6ba5b7f82..777041fbcac1 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -16,8 +16,6 @@ mod index_account_history; mod index_storage_history; /// Stage for computing state root. mod merkle; -/// Non-core stage. -mod non_core; /// The sender recovery stage. 
mod sender_recovery; /// The total difficulty stage @@ -34,7 +32,6 @@ pub use headers::*; pub use index_account_history::*; pub use index_storage_history::*; pub use merkle::*; -pub use non_core::*; pub use sender_recovery::*; pub use total_difficulty::*; pub use tx_lookup::*; diff --git a/crates/stages/src/stages/non_core.rs b/crates/stages/src/stages/non_core.rs deleted file mode 100644 index 0943e8c2ffc6..000000000000 --- a/crates/stages/src/stages/non_core.rs +++ /dev/null @@ -1,25 +0,0 @@ -//! A non core stage is one that is not required for node functionality. -//! -//! Users may define additional stages and activate them via flags. -//! Implementation is achieved by implementing the NonCoreStage trait on a new struct. -//! -//! Non core stages may also define new non-core database tables and associate them here. - -use async_trait::async_trait; -use reth_db::database::Database; - -use crate::Stage; - -/// A collection of methods and types that make up a non-core Stage. -#[async_trait] -pub trait NonCoreStage -where - Self: Stage, - DB: Database, -{ - /// A config struct used to set up the stage. - type Config; - - /// Used by the pipline builder to set the new stage. - fn new() -> Self; -} From e3b70673634884343e0b0f4c28e31289693db00d Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Tue, 29 Aug 2023 21:26:26 +1000 Subject: [PATCH 07/15] fix: Use assoc. 
type for table extensions --- bin/reth/src/cli/ext.rs | 48 ++++++++++++++++++++++++++++++++-------- bin/reth/src/cli/mod.rs | 4 +++- bin/reth/src/node/mod.rs | 9 +++++--- 3 files changed, 48 insertions(+), 13 deletions(-) diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index 941430a6afb2..afeccd0d1455 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -3,7 +3,9 @@ use crate::cli::config::{PayloadBuilderConfig, RethRpcConfig}; use clap::Args; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; -use reth_db::{database::Database, TableMetadata, NO_TABLES}; +use reth_db::{ + database::Database, table::Table, TableMetadata, TableType, TableViewer, Tables, NO_TABLES, +}; use reth_network_api::{NetworkInfo, Peers}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_primitives::ChainSpec; @@ -28,11 +30,14 @@ pub trait RethCliExt { /// /// If no additional CLI arguments are required, the [NoArgs] wrapper type can be used. type Node: RethNodeCommandExt; + /// Provides additional non-core tables for the node CLI extension. + type TableExt: TableMetadata; } /// The default CLI extension. impl RethCliExt for () { type Node = DefaultRethNodeCommandConfig; + type TableExt = DefaultRethTablesConfig; } /// A trait that allows for extending and customizing parts of the node command @@ -107,20 +112,14 @@ pub trait RethNodeCommandConfig: fmt::Debug { /// /// Usage: In an external binary, implement the RethNodeCommandConfig trait for a node command. /// Define a stage set, and then add it to the pipeline builder. - fn add_custom_stage( - &self, - pipeline_builder: &mut PipelineBuilder, - ) -> eyre::Result<()> + fn add_custom_stage(&self, pipeline_builder: &mut PipelineBuilder) -> eyre::Result<()> where DB: Database, { Ok(()) } /// Gets information about non-core tables so they can be instantiated. 
- fn get_custom_tables( - &self, - ) -> Option> - { + fn get_custom_tables(&self) -> Option> { NO_TABLES } } @@ -141,6 +140,37 @@ impl RethNodeCommandConfig for DefaultRethNodeCommandConfig {} impl RethNodeCommandConfig for () {} +/// The default configuration for starting Reth with no non-core tables. +#[derive(PartialEq)] +pub struct DefaultRethTablesConfig; + +impl TableMetadata for DefaultRethTablesConfig { + const NUM_TABLES: usize = 0; + + fn all_tables_in_group() -> Vec + where + Self: Sized + 'static, + { + vec![] + } + + fn name(&self) -> &'static str { + "no extra tables" + } + + fn table_type(&self) -> reth_db::TableType { + TableType::Table + } + + fn view(&self, visitor: &T) -> Result + where + T: TableViewer, + { + let no_table: R; + Ok(no_table) + } +} + /// A helper struct that allows for wrapping a [RethNodeCommandConfig] value without providing /// additional CLI arguments. /// diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index 50864f5bfa0a..3cce1ff026c9 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -68,7 +68,9 @@ impl Cli { let runner = CliRunner::default(); match self.command { - Commands::Node(command) => runner.run_command_until_exit(|ctx| command.execute(ctx)), + Commands::Node(command) => { + runner.run_command_until_exit(|ctx| command.execute::(ctx)) + }, Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()), diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 991eb14e0b01..99c40501fb85 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -27,7 +27,7 @@ use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; use reth_config::{config::PruneConfig, Config}; -use reth_db::{database::Database, init_db, 
DatabaseEnv}; +use reth_db::{database::Database, init_db, DatabaseEnv, TableMetadata}; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -194,7 +194,10 @@ impl NodeCommand { } /// Execute `node` command - pub async fn execute(mut self, ctx: CliContext) -> eyre::Result<()> { + pub async fn execute(mut self, ctx: CliContext) -> eyre::Result<()> + where + T: TableMetadata, + { info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); // Raise the fd limit of the process. @@ -212,7 +215,7 @@ impl NodeCommand { let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let optional_tables = self.ext.get_custom_tables(); + let optional_tables: Option> = self.ext.get_custom_tables(); let db = Arc::new(init_db(&db_path, self.db.log_level, optional_tables)?); info!(target: "reth::cli", "Database opened"); From 623358ed1682d3c6ac633f7384fcdb1e3e0e4616 Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 30 Aug 2023 12:03:26 +1000 Subject: [PATCH 08/15] feat(reth): Add assc. table type to config --- bin/reth/src/cli/ext.rs | 37 +++++++++++++------ bin/reth/src/node/mod.rs | 2 +- .../src/main.rs | 5 ++- examples/additional-stage/src/main.rs | 12 +++--- 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index afeccd0d1455..a62dd441edf4 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -31,18 +31,24 @@ pub trait RethCliExt { /// If no additional CLI arguments are required, the [NoArgs] wrapper type can be used. type Node: RethNodeCommandExt; /// Provides additional non-core tables for the node CLI extension. + /// + /// For no extra tables, use: `type TableExt = NoAdditionalTablesConfig` type TableExt: TableMetadata; } /// The default CLI extension. 
impl RethCliExt for () { type Node = DefaultRethNodeCommandConfig; - type TableExt = DefaultRethTablesConfig; + type TableExt = NoAdditionalTablesConfig; } /// A trait that allows for extending and customizing parts of the node command /// [NodeCommand](crate::node::NodeCommand). pub trait RethNodeCommandConfig: fmt::Debug { + /// Provides additional non-core tables for the node CLI extension. + /// + /// For no extra tables, use: `type TableExt = NoAdditionalTablesConfig` + type TableExt: TableMetadata + 'static; /// Allows for registering additional RPC modules for the transports. /// /// This is expected to call the merge functions of [TransportRpcModules], for example @@ -112,15 +118,19 @@ pub trait RethNodeCommandConfig: fmt::Debug { /// /// Usage: In an external binary, implement the RethNodeCommandConfig trait for a node command. /// Define a stage set, and then add it to the pipeline builder. - fn add_custom_stage(&self, pipeline_builder: &mut PipelineBuilder) -> eyre::Result<()> + fn add_custom_stage(&self, _pipeline_builder: &mut PipelineBuilder) -> eyre::Result<()> where DB: Database, { Ok(()) } /// Gets information about non-core tables so they can be instantiated. - fn get_custom_tables(&self) -> Option> { - NO_TABLES + fn get_custom_tables(&self) -> Option> { + let extra_tables = Self::TableExt::all_tables_in_group(); + if extra_tables.is_empty() { + return None + } + Some(extra_tables) } } @@ -136,15 +146,19 @@ impl RethNodeCommandExt for T where T: RethNodeCommandConfig + fmt::Debug + c #[derive(Debug, Clone, Copy, Default, Args)] pub struct DefaultRethNodeCommandConfig; -impl RethNodeCommandConfig for DefaultRethNodeCommandConfig {} +impl RethNodeCommandConfig for DefaultRethNodeCommandConfig { + type TableExt = NoAdditionalTablesConfig; +} -impl RethNodeCommandConfig for () {} +impl RethNodeCommandConfig for () { + type TableExt = NoAdditionalTablesConfig; +} /// The default configuration for starting Reth with no non-core tables. 
#[derive(PartialEq)] -pub struct DefaultRethTablesConfig; +pub struct NoAdditionalTablesConfig; -impl TableMetadata for DefaultRethTablesConfig { +impl TableMetadata for NoAdditionalTablesConfig { const NUM_TABLES: usize = 0; fn all_tables_in_group() -> Vec @@ -162,12 +176,11 @@ impl TableMetadata for DefaultRethTablesConfig { TableType::Table } - fn view(&self, visitor: &T) -> Result + fn view(&self, _visitor: &T) -> Result where T: TableViewer, { - let no_table: R; - Ok(no_table) + unimplemented!("Called view on helper implementation") } } @@ -215,6 +228,8 @@ impl NoArgs { } impl RethNodeCommandConfig for NoArgs { + type TableExt = NoAdditionalTablesConfig; + fn extend_rpc_modules( &mut self, config: &Conf, diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 99c40501fb85..ba7bb38d8956 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -215,7 +215,7 @@ impl NodeCommand { let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let optional_tables: Option> = self.ext.get_custom_tables(); + let optional_tables = self.ext.get_custom_tables(); let db = Arc::new(init_db(&db_path, self.db.log_level, optional_tables)?); info!(target: "reth::cli", "Database opened"); diff --git a/examples/additional-rpc-namespace-in-cli/src/main.rs b/examples/additional-rpc-namespace-in-cli/src/main.rs index e929fc5a7ec9..b388f1db280d 100644 --- a/examples/additional-rpc-namespace-in-cli/src/main.rs +++ b/examples/additional-rpc-namespace-in-cli/src/main.rs @@ -16,7 +16,7 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use reth::{ cli::{ config::RethRpcConfig, - ext::{RethCliExt, RethNodeCommandConfig}, + ext::{NoAdditionalTablesConfig, RethCliExt, RethNodeCommandConfig}, Cli, }, network::{NetworkInfo, Peers}, @@ -39,6 +39,8 @@ struct MyRethCliExt; impl RethCliExt for MyRethCliExt { /// This tells the reth CLI to install the `txpool` rpc namespace via `RethCliTxpoolExt` type Node = RethCliTxpoolExt; 
+ /// No additional tables are added in this example. + type TableExt = NoAdditionalTablesConfig; } /// Our custom cli args extension that adds one flag to reth default CLI. @@ -50,6 +52,7 @@ struct RethCliTxpoolExt { } impl RethNodeCommandConfig for RethCliTxpoolExt { + type TableExt = NoAdditionalTablesConfig; // This is the entrypoint for the CLI to extend the RPC server with custom rpc namespaces. fn extend_rpc_modules( &mut self, diff --git a/examples/additional-stage/src/main.rs b/examples/additional-stage/src/main.rs index 2c645aa8f560..bbcbdeb39dbd 100644 --- a/examples/additional-stage/src/main.rs +++ b/examples/additional-stage/src/main.rs @@ -1,4 +1,5 @@ -//! Example of how to add an additional stage to Reth. +//! Example of how to add a custom index to Reth. The index is populated by a custom stage, +//! and can be queried by a custom JSON-RPC endpoint. //! //! Run with //! @@ -32,7 +33,7 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use reth::{ cli::{ config::RethRpcConfig, - ext::{RethCliExt, RethNodeCommandConfig}, + ext::{NoAdditionalTablesConfig, RethCliExt, RethNodeCommandConfig}, Cli, }, db, @@ -66,6 +67,8 @@ struct MyRethCliExt; impl RethCliExt for MyRethCliExt { /// This tells the reth CLI to install the `mynamespace` rpc namespace via `RethCliNamespaceExt` type Node = RethCliNamespaceExt; + /// This tells the reth CLI to use additional non-core tables. + type TableExt = NonCoreTable; } /// Our custom cli args extension that adds one flag to reth default CLI. @@ -78,6 +81,8 @@ struct RethCliNamespaceExt { // Note: For all the methods in this trait, the method arguments are provided by Reth. impl RethNodeCommandConfig for RethCliNamespaceExt { + type TableExt = NonCoreTable; + // This is the entrypoint for the CLI to extend the RPC server with custom rpc namespaces. 
fn extend_rpc_modules( &mut self, @@ -124,9 +129,6 @@ impl RethNodeCommandConfig for RethCliNamespaceExt { pipeline_builder.add_stages(set); Ok(()) } - fn get_custom_tables(&self) -> Option> { - Some(NonCoreTable::all_tables_in_group()) - } } /// trait interface for a custom rpc namespace: `mynamespace` From 37039c896a890b8df98a9671db7437dff216a339 Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 30 Aug 2023 12:04:28 +1000 Subject: [PATCH 09/15] feat(example): Impl extra stage table use --- examples/additional-stage/src/stage.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/examples/additional-stage/src/stage.rs b/examples/additional-stage/src/stage.rs index 486d08115c5f..601a880e1a18 100644 --- a/examples/additional-stage/src/stage.rs +++ b/examples/additional-stage/src/stage.rs @@ -3,11 +3,13 @@ use async_trait::async_trait; use reth::providers::{BlockReader, DatabaseProviderRW}; use reth_db::{database::Database, transaction::DbTxMut}; -use reth_primitives::stage::StageId; +use reth_primitives::stage::{StageCheckpoint, StageId}; use reth_stages::{ ExecInput, ExecOutput, Stage, StageError, StageSet, StageSetBuilder, UnwindInput, UnwindOutput, }; +use crate::table::MyTable; + /// A single stage. #[derive(Debug, Default)] pub struct MyStage {} @@ -29,11 +31,19 @@ impl Stage for MyStage { provider: &DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } // For demonstration, the stage stores the most recent block that a "miner" produces. - if let Some(block) = provider.block_by_number(input.next_block()?)? { - provider.tx_mut().put(block.beneficiary, block.number)?; + let range = input.next_block_range(); + let Some(range_end) = range.last() else { return Ok(ExecOutput::done(input.checkpoint())) }; + for block_number in range { + if let Some(block) = provider.block_by_number(block_number)? 
{ + provider.tx_mut().put::(block.beneficiary, block.number)?; + provider.commit()?; + } } - todo!("Return ExecOutput"); + Ok(ExecOutput { checkpoint: StageCheckpoint::new(range_end), done: input.target_reached() }) } async fn unwind( From 2919007392774070bf85675fa93e0716258297af Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 30 Aug 2023 12:04:44 +1000 Subject: [PATCH 10/15] chore: fmt --- bin/reth/src/cli/mod.rs | 2 +- bin/reth/src/db/clear.rs | 2 +- bin/reth/src/db/diff.rs | 3 +-- bin/reth/src/db/get.rs | 2 +- bin/reth/src/db/list.rs | 4 +++- bin/reth/src/recover/storage_tries.rs | 3 ++- bin/reth/src/stage/unwind.rs | 4 +++- crates/stages/src/test_utils/mod.rs | 4 +--- crates/storage/db/src/implementation/mdbx/mod.rs | 2 +- crates/storage/db/src/lib.rs | 8 ++------ examples/rpc-db.rs | 3 ++- 11 files changed, 18 insertions(+), 19 deletions(-) diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index 3cce1ff026c9..7d33dde10287 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -70,7 +70,7 @@ impl Cli { match self.command { Commands::Node(command) => { runner.run_command_until_exit(|ctx| command.execute::(ctx)) - }, + } Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()), diff --git a/bin/reth/src/db/clear.rs b/bin/reth/src/db/clear.rs index c04bbb5ec6ec..beb5f20735cc 100644 --- a/bin/reth/src/db/clear.rs +++ b/bin/reth/src/db/clear.rs @@ -4,7 +4,7 @@ use reth_db::{ database::Database, table::Table, transaction::{DbTx, DbTxMut}, - TableViewer, Tables, TableMetadata, + TableMetadata, TableViewer, Tables, }; /// The arguments for the `reth db clear` command diff --git a/bin/reth/src/db/diff.rs b/bin/reth/src/db/diff.rs index 46fc35a518bf..606af9da835f 100644 --- a/bin/reth/src/db/diff.rs +++ 
b/bin/reth/src/db/diff.rs @@ -20,8 +20,7 @@ use reth_db::{ BlockWithdrawals, Bytecodes, CanonicalHeaders, DatabaseEnvRO, HashedAccount, HashedStorage, HeaderNumbers, HeaderTD, Headers, PlainAccountState, PlainStorageState, PruneCheckpoints, Receipts, StorageChangeSet, StorageHistory, StoragesTrie, SyncStage, SyncStageProgress, - TableMetadata, Tables, TransactionBlock, Transactions, TxHashNumber, TxSenders, - NO_TABLES, + TableMetadata, Tables, TransactionBlock, Transactions, TxHashNumber, TxSenders, NO_TABLES, }; use tracing::info; diff --git a/bin/reth/src/db/get.rs b/bin/reth/src/db/get.rs index 4d386251184b..23d2bb9fae5d 100644 --- a/bin/reth/src/db/get.rs +++ b/bin/reth/src/db/get.rs @@ -1,7 +1,7 @@ use crate::utils::DbTool; use clap::Parser; -use reth_db::{database::Database, table::Table, TableType, TableViewer, Tables, TableMetadata}; +use reth_db::{database::Database, table::Table, TableMetadata, TableType, TableViewer, Tables}; use tracing::error; /// The arguments for the `reth db get` command diff --git a/bin/reth/src/db/list.rs b/bin/reth/src/db/list.rs index f8973a861b42..c3a3e392f544 100644 --- a/bin/reth/src/db/list.rs +++ b/bin/reth/src/db/list.rs @@ -2,7 +2,9 @@ use super::tui::DbListTUI; use crate::utils::{DbTool, ListFilter}; use clap::Parser; use eyre::WrapErr; -use reth_db::{database::Database, table::Table, DatabaseEnvRO, TableType, TableViewer, Tables, TableMetadata}; +use reth_db::{ + database::Database, table::Table, DatabaseEnvRO, TableMetadata, TableType, TableViewer, Tables, +}; use std::cell::RefCell; use tracing::error; diff --git a/bin/reth/src/recover/storage_tries.rs b/bin/reth/src/recover/storage_tries.rs index 235364e72540..bcee1fc54145 100644 --- a/bin/reth/src/recover/storage_tries.rs +++ b/bin/reth/src/recover/storage_tries.rs @@ -8,7 +8,8 @@ use clap::Parser; use reth_db::{ cursor::{DbCursorRO, DbDupCursorRW}, init_db, tables, - transaction::DbTx, NO_TABLES, + transaction::DbTx, + NO_TABLES, }; use reth_primitives::ChainSpec; 
use reth_provider::{BlockNumReader, HeaderProvider, ProviderError, ProviderFactory}; diff --git a/bin/reth/src/stage/unwind.rs b/bin/reth/src/stage/unwind.rs index c805d0da0be8..942428c60526 100644 --- a/bin/reth/src/stage/unwind.rs +++ b/bin/reth/src/stage/unwind.rs @@ -5,7 +5,9 @@ use crate::{ dirs::{DataDirPath, MaybePlatformPath}, }; use clap::{Parser, Subcommand}; -use reth_db::{cursor::DbCursorRO, database::Database, open_db, tables, transaction::DbTx, NO_TABLES}; +use reth_db::{ + cursor::DbCursorRO, database::Database, open_db, tables, transaction::DbTx, NO_TABLES, +}; use reth_primitives::{BlockHashOrNumber, ChainSpec}; use reth_provider::{BlockExecutionWriter, ProviderFactory}; use std::{ops::RangeInclusive, sync::Arc}; diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index ff9262d49d43..edab0e576904 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -5,9 +5,7 @@ mod macros; pub(crate) use macros::*; mod runner; -pub use runner::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner, -}; +pub use runner::{ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner}; mod test_db; pub use test_db::TestTransaction; diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index fd92bc6ba736..72680f2f0b79 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -179,7 +179,7 @@ mod tests { tables::{AccountHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState}, test_utils::*, transaction::{DbTx, DbTxMut}, - AccountChangeSet, DatabaseError, NUM_TABLES, NO_TABLES, + AccountChangeSet, DatabaseError, NO_TABLES, NUM_TABLES, }; use reth_interfaces::db::DatabaseWriteOperation; use reth_libmdbx::{NoWriteMap, WriteMap}; diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index 
646599183e62..ca30b9289b53 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -201,12 +201,8 @@ pub mod test_utils { /// Create read/write database for testing pub fn create_test_rw_db() -> Arc { Arc::new( - init_db( - tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), - None, - NO_TABLES, - ) - .expect(ERROR_DB_CREATION), + init_db(tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), None, NO_TABLES) + .expect(ERROR_DB_CREATION), ) } diff --git a/examples/rpc-db.rs b/examples/rpc-db.rs index 35888302d890..db07a742e60f 100644 --- a/examples/rpc-db.rs +++ b/examples/rpc-db.rs @@ -29,7 +29,8 @@ use std::{path::Path, sync::Arc}; #[tokio::main] async fn main() -> eyre::Result<()> { // 1. Setup the DB - let db = Arc::new(open_db_read_only(Path::new(&std::env::var("RETH_DB_PATH")?), None, NO_TABLES)?); + let db = + Arc::new(open_db_read_only(Path::new(&std::env::var("RETH_DB_PATH")?), None, NO_TABLES)?); let spec = Arc::new(ChainSpecBuilder::mainnet().build()); let factory = ProviderFactory::new(db.clone(), spec.clone()); From bd92254ea7cd36e988886e74aee0d283cd4c2dbe Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 30 Aug 2023 18:17:26 +1000 Subject: [PATCH 11/15] feat(storage): add DatabaseProvider trait --- crates/storage/provider/src/lib.rs | 4 ++-- crates/storage/provider/src/providers/mod.rs | 23 ++++++++++++++----- .../storage/provider/src/traits/database.rs | 15 ++++++++++++ 3 files changed, 34 insertions(+), 8 deletions(-) create mode 100644 crates/storage/provider/src/traits/database.rs diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 4e7048cd4621..7d8316f32000 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -25,8 +25,8 @@ pub use traits::{ BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource, BlockWriter, BlockchainTreePendingStateProvider, CanonChainTracker, 
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, - ChainSpecProvider, ChangeSetReader, EvmEnvProvider, ExecutorFactory, HashingWriter, - HeaderProvider, HistoryWriter, PostStateDataProvider, PruneCheckpointReader, + ChainSpecProvider, ChangeSetReader, DatabaseReader, EvmEnvProvider, ExecutorFactory, + HashingWriter, HeaderProvider, HistoryWriter, PostStateDataProvider, PruneCheckpointReader, PruneCheckpointWriter, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StageCheckpointWriter, StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, StorageReader, TransactionsProvider, WithdrawalsProvider, diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 4a930e9c17bf..51aa651eb195 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -1,10 +1,10 @@ use crate::{ - BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, - BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications, - CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider, - PostStateDataProvider, ProviderError, PruneCheckpointReader, ReceiptProvider, - ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox, StateProviderFactory, - TransactionsProvider, WithdrawalsProvider, + traits::DatabaseReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, + BlockReaderIdExt, BlockchainTreePendingStateProvider, CanonChainTracker, + CanonStateNotifications, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, + EvmEnvProvider, HeaderProvider, PostStateDataProvider, ProviderError, PruneCheckpointReader, + ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox, + StateProviderFactory, TransactionsProvider, WithdrawalsProvider, }; use reth_db::{database::Database, 
models::StoredBlockBodyIndices}; use reth_interfaces::{ @@ -837,3 +837,14 @@ where self.database.provider()?.account_block_changeset(block_number) } } + +impl DatabaseReader for BlockchainProvider +where + DB: Database, + Tree: Sync + Send, +{ + fn database(&self) -> Result> { + let provider = self.database.provider()?; + Ok(provider) + } +} diff --git a/crates/storage/provider/src/traits/database.rs b/crates/storage/provider/src/traits/database.rs new file mode 100644 index 000000000000..085e36a5758b --- /dev/null +++ b/crates/storage/provider/src/traits/database.rs @@ -0,0 +1,15 @@ +use auto_impl::auto_impl; +use reth_db::database::Database; +use reth_interfaces::Result; + +use crate::DatabaseProviderRO; + +/// A type that provides read access to any part of the database. +#[auto_impl(&, Arc, Box)] +pub trait DatabaseReader: Send + Sync +where + DB: Database, +{ + /// Returns a readable database. + fn database(&self) -> Result>; +} From c96f67796ca39b00cfeab74c99c80f763f35015e Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 30 Aug 2023 18:20:03 +1000 Subject: [PATCH 12/15] fix(stages): Pass &mut to PipelineBuilder --- bin/reth/src/args/rpc_server_args.rs | 11 +- bin/reth/src/chain/import.rs | 7 +- bin/reth/src/cli/ext.rs | 27 +++-- bin/reth/src/debug_cmd/execution.rs | 55 +++++----- bin/reth/src/node/mod.rs | 100 +++++++++--------- crates/consensus/beacon/src/engine/sync.rs | 5 +- .../consensus/beacon/src/engine/test_utils.rs | 9 +- crates/stages/src/pipeline/builder.rs | 10 +- crates/stages/src/pipeline/mod.rs | 46 ++++---- 9 files changed, 142 insertions(+), 128 deletions(-) diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 3b2c463788c3..9a3567c0bf2d 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -9,10 +9,11 @@ use clap::{ Arg, Args, Command, }; use futures::TryFutureExt; +use reth_db::database::Database; use 
reth_network_api::{NetworkInfo, Peers}; use reth_provider::{ - BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, - HeaderProvider, StateProviderFactory, + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseReader, + EvmEnvProvider, HeaderProvider, StateProviderFactory, }; use reth_rpc::{ eth::{ @@ -223,7 +224,7 @@ impl RpcServerArgs { /// for the auth server that handles the `engine_` API that's accessed by the consensus /// layer. #[allow(clippy::too_many_arguments)] - pub async fn start_servers( + pub async fn start_servers( &self, provider: Provider, pool: Pool, @@ -235,7 +236,9 @@ impl RpcServerArgs { conf: &mut Conf, ) -> eyre::Result<(RpcServerHandle, AuthServerHandle)> where - Provider: BlockReaderIdExt + DB: Database + 'static, + Provider: DatabaseReader + + BlockReaderIdExt + HeaderProvider + StateProviderFactory + EvmEnvProvider diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index c191af16d11b..66293b535e09 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -160,7 +160,8 @@ impl ImportCommand { let factory = reth_revm::Factory::new(self.chain.clone()); let max_block = file_client.max_block().unwrap_or(0); - let mut pipeline = Pipeline::builder() + let mut pipeline = Pipeline::builder(); + pipeline .with_tip_sender(tip_tx) // we want to sync all blocks the file client provides or 0 if empty .with_max_block(max_block) @@ -193,8 +194,8 @@ impl ImportCommand { .max(config.stages.storage_hashing.clean_threshold), config.prune.map(|prune| prune.parts).unwrap_or_default(), )), - ) - .build(db, self.chain.clone()); + ); + let mut pipeline = pipeline.build(db, self.chain.clone()); let events = pipeline.events().map(Into::into); diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index a62dd441edf4..75bebb8deda8 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -3,15 +3,13 @@ use 
crate::cli::config::{PayloadBuilderConfig, RethRpcConfig}; use clap::Args; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; -use reth_db::{ - database::Database, table::Table, TableMetadata, TableType, TableViewer, Tables, NO_TABLES, -}; +use reth_db::{database::Database, TableMetadata, TableType, TableViewer}; use reth_network_api::{NetworkInfo, Peers}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_primitives::ChainSpec; use reth_provider::{ - BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, - StateProviderFactory, + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseReader, + EvmEnvProvider, StateProviderFactory, }; use reth_rpc_builder::{RethModuleRegistry, TransportRpcModules}; use reth_stages::PipelineBuilder; @@ -53,15 +51,17 @@ pub trait RethNodeCommandConfig: fmt::Debug { /// /// This is expected to call the merge functions of [TransportRpcModules], for example /// [TransportRpcModules::merge_configured] - fn extend_rpc_modules( + fn extend_rpc_modules( &mut self, _config: &Conf, _registry: &mut RethModuleRegistry, _modules: &mut TransportRpcModules, ) -> eyre::Result<()> where + DB: Database + 'static, Conf: RethRpcConfig, - Provider: BlockReaderIdExt + Provider: DatabaseReader + + BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + ChainSpecProvider @@ -118,11 +118,14 @@ pub trait RethNodeCommandConfig: fmt::Debug { /// /// Usage: In an external binary, implement the RethNodeCommandConfig trait for a node command. /// Define a stage set, and then add it to the pipeline builder. 
- fn add_custom_stage(&self, _pipeline_builder: &mut PipelineBuilder) -> eyre::Result<()> + fn add_custom_stage<'a, DB>( + &self, + pipeline_builder: &'a mut PipelineBuilder, + ) -> eyre::Result<&'a mut PipelineBuilder> where DB: Database, { - Ok(()) + Ok(pipeline_builder) } /// Gets information about non-core tables so they can be instantiated. fn get_custom_tables(&self) -> Option> { @@ -230,15 +233,17 @@ impl NoArgs { impl RethNodeCommandConfig for NoArgs { type TableExt = NoAdditionalTablesConfig; - fn extend_rpc_modules( + fn extend_rpc_modules( &mut self, config: &Conf, registry: &mut RethModuleRegistry, modules: &mut TransportRpcModules<()>, ) -> eyre::Result<()> where + DB: Database + 'static, Conf: RethRpcConfig, - Provider: BlockReaderIdExt + Provider: DatabaseReader + + BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + ChainSpecProvider diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index 54bbd4fcbebb..5bae873f7a47 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -116,35 +116,34 @@ impl Command { let factory = reth_revm::Factory::new(self.chain.clone()); let header_mode = HeaderSyncMode::Tip(tip_rx); - let pipeline = Pipeline::builder() - .with_tip_sender(tip_tx) - .add_stages( - DefaultStages::new( - header_mode, - Arc::clone(&consensus), - header_downloader, - body_downloader, - factory.clone(), - ) - .set( - TotalDifficultyStage::new(consensus) - .with_commit_threshold(stage_conf.total_difficulty.commit_threshold), - ) - .set(SenderRecoveryStage { - commit_threshold: stage_conf.sender_recovery.commit_threshold, - }) - .set(ExecutionStage::new( - factory, - ExecutionStageThresholds { max_blocks: None, max_changes: None }, - stage_conf - .merkle - .clean_threshold - .max(stage_conf.account_hashing.clean_threshold) - .max(stage_conf.storage_hashing.clean_threshold), - config.prune.map(|prune| prune.parts).unwrap_or_default(), - )), + let mut pipeline = 
Pipeline::builder(); + pipeline.with_tip_sender(tip_tx).add_stages( + DefaultStages::new( + header_mode, + Arc::clone(&consensus), + header_downloader, + body_downloader, + factory.clone(), ) - .build(db, self.chain.clone()); + .set( + TotalDifficultyStage::new(consensus) + .with_commit_threshold(stage_conf.total_difficulty.commit_threshold), + ) + .set(SenderRecoveryStage { + commit_threshold: stage_conf.sender_recovery.commit_threshold, + }) + .set(ExecutionStage::new( + factory, + ExecutionStageThresholds { max_blocks: None, max_changes: None }, + stage_conf + .merkle + .clean_threshold + .max(stage_conf.account_hashing.clean_threshold) + .max(stage_conf.storage_hashing.clean_threshold), + config.prune.map(|prune| prune.parts).unwrap_or_default(), + )), + ); + let pipeline = pipeline.build(db, self.chain.clone()); Ok(pipeline) } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index ba7bb38d8956..2d169466106b 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -747,7 +747,7 @@ impl NodeCommand { if let Some(max_block) = max_block { debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); - builder = builder.with_max_block(max_block) + builder.with_max_block(max_block); } let (tip_tx, tip_rx) = watch::channel(H256::zero()); @@ -771,57 +771,55 @@ impl NodeCommand { let header_mode = if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; - let mut pipeline_builder = - builder.with_tip_sender(tip_tx).with_metrics_tx(metrics_tx.clone()).add_stages( - DefaultStages::new( - header_mode, - Arc::clone(&consensus), - header_downloader, - body_downloader, - factory.clone(), - ) - .set( - TotalDifficultyStage::new(consensus) - .with_commit_threshold(stage_config.total_difficulty.commit_threshold), - ) - .set(SenderRecoveryStage { - commit_threshold: stage_config.sender_recovery.commit_threshold, - }) - .set( - ExecutionStage::new( - factory, - ExecutionStageThresholds { - max_blocks: 
stage_config.execution.max_blocks, - max_changes: stage_config.execution.max_changes, - }, - stage_config - .merkle - .clean_threshold - .max(stage_config.account_hashing.clean_threshold) - .max(stage_config.storage_hashing.clean_threshold), - prune_config.map(|prune| prune.parts).unwrap_or_default(), - ) - .with_metrics_tx(metrics_tx), + + builder.with_tip_sender(tip_tx).with_metrics_tx(metrics_tx.clone()).add_stages( + DefaultStages::new( + header_mode, + Arc::clone(&consensus), + header_downloader, + body_downloader, + factory.clone(), + ) + .set( + TotalDifficultyStage::new(consensus) + .with_commit_threshold(stage_config.total_difficulty.commit_threshold), + ) + .set(SenderRecoveryStage { + commit_threshold: stage_config.sender_recovery.commit_threshold, + }) + .set( + ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: stage_config.execution.max_blocks, + max_changes: stage_config.execution.max_changes, + }, + stage_config + .merkle + .clean_threshold + .max(stage_config.account_hashing.clean_threshold) + .max(stage_config.storage_hashing.clean_threshold), + prune_config.map(|prune| prune.parts).unwrap_or_default(), ) - .set(AccountHashingStage::new( - stage_config.account_hashing.clean_threshold, - stage_config.account_hashing.commit_threshold, - )) - .set(StorageHashingStage::new( - stage_config.storage_hashing.clean_threshold, - stage_config.storage_hashing.commit_threshold, - )) - .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) - .set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold)) - .set(IndexAccountHistoryStage::new( - stage_config.index_account_history.commit_threshold, - )) - .set(IndexStorageHistoryStage::new( - stage_config.index_storage_history.commit_threshold, - )), - ); - self.ext.add_custom_stage(&mut pipeline_builder)?; - let pipeline = pipeline_builder.build(db, self.chain.clone()); + .with_metrics_tx(metrics_tx), + ) + .set(AccountHashingStage::new( + 
stage_config.account_hashing.clean_threshold, + stage_config.account_hashing.commit_threshold, + )) + .set(StorageHashingStage::new( + stage_config.storage_hashing.clean_threshold, + stage_config.storage_hashing.commit_threshold, + )) + .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) + .set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold)) + .set(IndexAccountHistoryStage::new(stage_config.index_account_history.commit_threshold)) + .set(IndexStorageHistoryStage::new( + stage_config.index_storage_history.commit_threshold, + )), + ); + self.ext.add_custom_stage(&mut builder)?; + let pipeline = builder.build(db, self.chain.clone()); Ok(pipeline) } diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index 24b4d04a3b4c..8e6aecf036df 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -457,12 +457,13 @@ mod tests { // Setup pipeline let (tip_tx, _tip_rx) = watch::channel(H256::default()); - let mut pipeline = Pipeline::builder() + let mut pipeline = Pipeline::builder(); + pipeline .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) .with_tip_sender(tip_tx); if let Some(max_block) = self.max_block { - pipeline = pipeline.with_max_block(max_block); + pipeline.with_max_block(max_block); } pipeline.build(db, chain_spec) diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 78b3825e6b39..356ce1ca1f5b 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -419,8 +419,9 @@ where // Setup pipeline let (tip_tx, tip_rx) = watch::channel(H256::default()); - let mut pipeline = match self.base_config.pipeline_config { - TestPipelineConfig::Test(outputs) => Pipeline::builder() + let mut pipeline = Pipeline::builder(); + match self.base_config.pipeline_config { + 
TestPipelineConfig::Test(outputs) => pipeline .add_stages(TestStages::new(outputs, Default::default())) .with_tip_sender(tip_tx), TestPipelineConfig::Real => { @@ -432,7 +433,7 @@ where .build(client.clone(), consensus.clone(), db.clone()) .into_task(); - Pipeline::builder().add_stages(DefaultStages::new( + pipeline.add_stages(DefaultStages::new( HeaderSyncMode::Tip(tip_rx.clone()), Arc::clone(&consensus) as Arc, header_downloader, @@ -443,7 +444,7 @@ where }; if let Some(max_block) = self.base_config.max_block { - pipeline = pipeline.with_max_block(max_block); + pipeline.with_max_block(max_block); } let pipeline = pipeline.build(db.clone(), self.base_config.chain_spec.clone()); diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 7679361c839f..ea844b984ccf 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -25,7 +25,7 @@ where DB: Database, { /// Add a stage to the pipeline. - pub fn add_stage(mut self, stage: S) -> Self + pub fn add_stage(&mut self, stage: S) -> &mut Self where S: Stage + 'static, { @@ -40,7 +40,7 @@ where /// To customize the stages in the set (reorder, disable, insert a stage) call /// [`builder`][StageSet::builder] on the set which will convert it to a /// [`StageSetBuilder`][crate::StageSetBuilder]. - pub fn add_stages>(mut self, set: Set) -> Self { + pub fn add_stages>(&mut self, set: Set) -> &mut Self { for stage in set.builder().build() { self.stages.push(stage); } @@ -50,19 +50,19 @@ where /// Set the target block. /// /// Once this block is reached, the pipeline will stop. - pub fn with_max_block(mut self, block: BlockNumber) -> Self { + pub fn with_max_block(&mut self, block: BlockNumber) -> &mut Self { self.max_block = Some(block); self } /// Set the tip sender. 
- pub fn with_tip_sender(mut self, tip_tx: watch::Sender) -> Self { + pub fn with_tip_sender(&mut self, tip_tx: watch::Sender) -> &mut Self { self.tip_tx = Some(tip_tx); self } /// Set the metric events sender. - pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { + pub fn with_metrics_tx(&mut self, metrics_tx: MetricEventsSender) -> &mut Self { self.metrics_tx = Some(metrics_tx); self } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index d15fefacf679..ece01fcf3a34 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -550,7 +550,8 @@ mod tests { async fn run_pipeline() { let db = create_test_rw_db(); - let mut pipeline = Pipeline::builder() + let mut pipeline = Pipeline::builder(); + pipeline .add_stage( TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })), @@ -559,8 +560,8 @@ mod tests { TestStage::new(StageId::Other("B")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) - .with_max_block(10) - .build(db, MAINNET.clone()); + .with_max_block(10); + let mut pipeline = pipeline.build(db, MAINNET.clone()); let events = pipeline.events(); // Run pipeline @@ -605,7 +606,9 @@ mod tests { async fn unwind_pipeline() { let db = create_test_rw_db(); - let mut pipeline = Pipeline::builder() + let mut pipeline = Pipeline::builder(); + + pipeline .add_stage( TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) @@ -621,8 +624,8 @@ mod tests { .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) - .with_max_block(10) - .build(db, MAINNET.clone()); + .with_max_block(10); + let mut pipeline = pipeline.build(db, MAINNET.clone()); let events = pipeline.events(); // Run pipeline @@ -721,7 +724,8 @@ mod tests { async fn 
unwind_pipeline_with_intermediate_progress() { let db = create_test_rw_db(); - let mut pipeline = Pipeline::builder() + let mut pipeline = Pipeline::builder(); + pipeline .add_stage( TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) @@ -731,8 +735,8 @@ mod tests { TestStage::new(StageId::Other("B")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) - .with_max_block(10) - .build(db, MAINNET.clone()); + .with_max_block(10); + let mut pipeline = pipeline.build(db, MAINNET.clone()); let events = pipeline.events(); // Run pipeline @@ -808,7 +812,8 @@ mod tests { async fn run_pipeline_with_unwind() { let db = create_test_rw_db(); - let mut pipeline = Pipeline::builder() + let mut pipeline = Pipeline::builder(); + pipeline .add_stage( TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) @@ -824,8 +829,8 @@ mod tests { .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) })) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) - .with_max_block(10) - .build(db, MAINNET.clone()); + .with_max_block(10); + let mut pipeline = pipeline.build(db, MAINNET.clone()); let events = pipeline.events(); // Run pipeline @@ -901,24 +906,25 @@ mod tests { async fn pipeline_error_handling() { // Non-fatal let db = create_test_rw_db(); - let mut pipeline = Pipeline::builder() + let mut pipeline = Pipeline::builder(); + pipeline .add_stage( TestStage::new(StageId::Other("NonFatal")) .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) - .with_max_block(10) - .build(db, MAINNET.clone()); + .with_max_block(10); + let mut pipeline = pipeline.build(db, MAINNET.clone()); let result = pipeline.run().await; assert_matches!(result, Ok(())); // Fatal let db = create_test_rw_db(); - let mut pipeline = 
Pipeline::builder() - .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err( - StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)), - ))) - .build(db, MAINNET.clone()); + let mut pipeline = Pipeline::builder(); + pipeline.add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err( + StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)), + ))); + let mut pipeline = pipeline.build(db, MAINNET.clone()); let result = pipeline.run().await; assert_matches!( result, From 8be2fee296941134065c449fdc7ca02eb4dcf8b5 Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 30 Aug 2023 18:22:34 +1000 Subject: [PATCH 13/15] feat(examples): impl custom stage addition --- examples/additional-stage/src/main.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/examples/additional-stage/src/main.rs b/examples/additional-stage/src/main.rs index bbcbdeb39dbd..197d35061c1f 100644 --- a/examples/additional-stage/src/main.rs +++ b/examples/additional-stage/src/main.rs @@ -119,7 +119,10 @@ impl RethNodeCommandConfig for RethCliNamespaceExt { Ok(()) } - fn add_custom_stage(&self, pipeline_builder: &mut PipelineBuilder) -> eyre::Result<()> + fn add_custom_stage<'a, DB>( + &self, + pipeline_builder: &'a mut PipelineBuilder, + ) -> eyre::Result<&'a mut PipelineBuilder> where DB: Database, { @@ -127,7 +130,7 @@ impl RethNodeCommandConfig for RethCliNamespaceExt { let set = MyStageSet::new().set(MyStage::new()); // Add set to pipeline. pipeline_builder.add_stages(set); - Ok(()) + Ok(pipeline_builder) } } @@ -144,17 +147,17 @@ pub trait MyNamespaceApi { /// The type that implements the `mynamespace` rpc namespace trait /// /// Usage: Members of this struct hold data for constructing the response. -pub struct MyNamespace { +#[derive(Debug, Clone)] +pub struct MyNamespace<'a, DB: Database> { /// A reference to the reth database. Used by MyNamespace to construct responses. 
- database: Arc, + database: &'a DatabaseProviderRO<'a, &'a DB>, } -impl MyNamespaceApiServer for MyNamespace { +impl MyNamespaceApiServer for MyNamespace<'static, DB> { fn my_method(&self, my_param: Address) -> RpcResult { let response: Option = self .database - .tx() - .expect("Couldn't get DB Tx") + .tx_ref() .get::(my_param) .expect("Couldn't read MyTable"); Ok(response) From 3c72d1ee93a5760ca5523a91b66ae54660aec2f6 Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Wed, 30 Aug 2023 19:50:01 +1000 Subject: [PATCH 14/15] wip: use DatabaseReader in namespace example --- Cargo.lock | 1 + crates/storage/provider/src/traits/mod.rs | 3 ++ .../Cargo.toml | 1 + .../src/main.rs | 9 ++++-- examples/additional-stage/src/main.rs | 29 ++++++++++--------- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2655d25f427..dbbaf6f637d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,6 +20,7 @@ dependencies = [ "eyre", "jsonrpsee", "reth", + "reth-db", "reth-transaction-pool", ] diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 3c2e06d21142..bb48b0e478c2 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -15,6 +15,9 @@ pub use block_hash::BlockHashReader; mod block_id; pub use block_id::{BlockIdReader, BlockNumReader}; +mod database; +pub use database::DatabaseReader; + mod evm_env; pub use evm_env::EvmEnvProvider; diff --git a/examples/additional-rpc-namespace-in-cli/Cargo.toml b/examples/additional-rpc-namespace-in-cli/Cargo.toml index dadbe58587db..fbd41eba4ded 100644 --- a/examples/additional-rpc-namespace-in-cli/Cargo.toml +++ b/examples/additional-rpc-namespace-in-cli/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] reth.workspace = true +reth-db = { workspace = true, features = ["mdbx", "test-utils"] } reth-transaction-pool.workspace = true clap = { version = "4", 
features = ["derive"] } diff --git a/examples/additional-rpc-namespace-in-cli/src/main.rs b/examples/additional-rpc-namespace-in-cli/src/main.rs index b388f1db280d..97155e7186ab 100644 --- a/examples/additional-rpc-namespace-in-cli/src/main.rs +++ b/examples/additional-rpc-namespace-in-cli/src/main.rs @@ -22,11 +22,12 @@ use reth::{ network::{NetworkInfo, Peers}, providers::{ BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, - EvmEnvProvider, StateProviderFactory, + DatabaseReader, EvmEnvProvider, StateProviderFactory, }, rpc::builder::{RethModuleRegistry, TransportRpcModules}, tasks::TaskSpawner, }; +use reth_db::database::Database; use reth_transaction_pool::TransactionPool; fn main() { @@ -54,7 +55,7 @@ struct RethCliTxpoolExt { impl RethNodeCommandConfig for RethCliTxpoolExt { type TableExt = NoAdditionalTablesConfig; // This is the entrypoint for the CLI to extend the RPC server with custom rpc namespaces. - fn extend_rpc_modules( + fn extend_rpc_modules( &mut self, _config: &Conf, registry: &mut RethModuleRegistry, @@ -62,7 +63,9 @@ impl RethNodeCommandConfig for RethCliTxpoolExt { ) -> eyre::Result<()> where Conf: RethRpcConfig, - Provider: BlockReaderIdExt + DB: Database + 'static, + Provider: DatabaseReader + + BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + ChainSpecProvider diff --git a/examples/additional-stage/src/main.rs b/examples/additional-stage/src/main.rs index 197d35061c1f..8e29efa50d50 100644 --- a/examples/additional-stage/src/main.rs +++ b/examples/additional-stage/src/main.rs @@ -26,6 +26,7 @@ //! Specifically, the example adds a new stage that records the most recent block that a miner //! produced. A new table stores this data. The JSON-RPC method returns the latest block produced //! (if any) for any given address. 
+ use std::sync::Arc; use clap::Parser; @@ -33,19 +34,18 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use reth::{ cli::{ config::RethRpcConfig, - ext::{NoAdditionalTablesConfig, RethCliExt, RethNodeCommandConfig}, + ext::{RethCliExt, RethNodeCommandConfig}, Cli, }, - db, network::{NetworkInfo, Peers}, providers::{ BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, - EvmEnvProvider, StateProviderFactory, + DatabaseProvider, DatabaseProviderRO, DatabaseReader, EvmEnvProvider, StateProviderFactory, }, rpc::builder::{RethModuleRegistry, TransportRpcModules}, tasks::TaskSpawner, }; -use reth_db::{database::Database, transaction::DbTx, TableMetadata}; +use reth_db::{database::Database, transaction::DbTx}; use reth_primitives::{Address, BlockNumber}; use reth_stages::{PipelineBuilder, StageSet}; use reth_transaction_pool::TransactionPool; @@ -84,7 +84,7 @@ impl RethNodeCommandConfig for RethCliNamespaceExt { type TableExt = NonCoreTable; // This is the entrypoint for the CLI to extend the RPC server with custom rpc namespaces. 
- fn extend_rpc_modules( + fn extend_rpc_modules( &mut self, _config: &Conf, registry: &mut RethModuleRegistry, @@ -92,7 +92,9 @@ impl RethNodeCommandConfig for RethCliNamespaceExt { ) -> eyre::Result<()> where Conf: RethRpcConfig, - Provider: BlockReaderIdExt + DB: Database + 'static, + Provider: DatabaseReader + + BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + ChainSpecProvider @@ -109,8 +111,10 @@ impl RethNodeCommandConfig for RethCliNamespaceExt { return Ok(()) } - let database = todo!("Get reference to database"); - let ext = MyNamespace { database }; + let database = registry.provider().database()?; + let tx = database.into_tx(); + let db_provider = DatabaseProvider::new(tx, registry.provider().chain_spec()); + let ext: MyNamespace<'_, DB> = MyNamespace { database: Arc::new(db_provider) }; // now we merge our extension namespace into all configured transports modules.merge_configured(ext.into_rpc())?; @@ -150,16 +154,13 @@ pub trait MyNamespaceApi { #[derive(Debug, Clone)] pub struct MyNamespace<'a, DB: Database> { /// A reference to the reth database. Used by MyNamespace to construct responses. 
- database: &'a DatabaseProviderRO<'a, &'a DB>, + database: Arc>, } impl MyNamespaceApiServer for MyNamespace<'static, DB> { fn my_method(&self, my_param: Address) -> RpcResult { - let response: Option = self - .database - .tx_ref() - .get::(my_param) - .expect("Couldn't read MyTable"); + let response: Option = + self.database.tx_ref().get::(my_param).expect("Couldn't read MyTable"); Ok(response) } } From cefec0a20121ed43b3185271b61e07a09666967b Mon Sep 17 00:00:00 2001 From: perama-v <83961755+perama-v@users.noreply.github.com> Date: Fri, 1 Sep 2023 18:13:39 +1000 Subject: [PATCH 15/15] draft: ideas for custom tip stage impl --- crates/blockchain-tree/src/chain.rs | 16 ++++++++++- crates/blockchain-tree/src/externals.rs | 2 ++ crates/storage/provider/src/post_state/mod.rs | 28 ++++++++++++++++++- examples/additional-stage/src/main.rs | 14 ++++++++++ 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index 8a3b1b615556..3d39a9038461 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -204,7 +204,21 @@ impl AppendableChain { let provider = PostStateProvider::new(state_provider, post_state_data_provider); let mut executor = externals.executor_factory.with_sp(&provider); - let post_state = executor.execute_and_verify_receipt(&block, U256::MAX, Some(senders))?; + let mut post_state = executor.execute_and_verify_receipt(&block, U256::MAX, Some(senders))?; + + // Additional stages could have their own executor factory. 
+ let mut noncore_executor = externals.noncore_stage_factory.with_sp(&provider); + // Pass custom stages to the executor + let post_stage = noncore_executor.run_noncore_stages(block, noncore_stages); + // Push the changes alongside core post_state + post_state.non_core_changes = Some(non_core_post_state); + /* + Explanation: + // - in crates/storage/provider/src/post_state/mod.rs + // - member has type: `non_core_changes: Option` + // - `NonCorePostWriter` provides method `write_noncore_stage_data(tx)` + // - which is called by self.write_to_db(...) + */ // check state root if the block extends the canonical chain. if block_kind.extends_canonical_head() { diff --git a/crates/blockchain-tree/src/externals.rs b/crates/blockchain-tree/src/externals.rs index b73f0258ebc8..1baede011a76 100644 --- a/crates/blockchain-tree/src/externals.rs +++ b/crates/blockchain-tree/src/externals.rs @@ -24,6 +24,8 @@ pub struct TreeExternals { pub(crate) executor_factory: EF, /// The chain spec. pub(crate) chain_spec: Arc, + /// The factory to run non-core stages on blocks with. + pub(crate) noncore_stage_factory: EF, } impl TreeExternals { diff --git a/crates/storage/provider/src/post_state/mod.rs b/crates/storage/provider/src/post_state/mod.rs index 1aed0e666934..59d33184027b 100644 --- a/crates/storage/provider/src/post_state/mod.rs +++ b/crates/storage/provider/src/post_state/mod.rs @@ -56,7 +56,7 @@ pub use storage::{Storage, StorageChanges, StorageChangeset, StorageTransition, /// for receipts and changes, which [PostState::new] does, and thus it (or /// [PostState::with_tx_capacity]) should be preferred to using the [Default] implementation. #[derive(Debug, Clone, Default, Eq, PartialEq)] -pub struct PostState { +pub struct PostState { /// The state of all modified accounts after execution. /// /// If the value contained is `None`, then the account should be deleted. @@ -80,6 +80,10 @@ pub struct PostState { receipts: BTreeMap>, /// Pruning configuration. 
prune_modes: PruneModes, + /// Data produced from executing a non-core stages. + /// + /// The trait allows the data to be written to the DB in a custom function. + non_core_changes: Option, } impl PostState { @@ -672,10 +676,32 @@ impl PostState { } } + // Write the custom data for non-core stages to the DB. + /* + - `self.non_core_changes` is a type provided by the user. + - It implements a trait that provides this method. + - The method accepts &tx and then the user will impl the desired db write + via: `tx::cursor_write::()` similar to above. + - Thus they can keep any data and write to any table they create. + - `self.non_core_changes` is populated in `crates/blockchain-tree/src/chain.rs` + validate_and_execute() after `post_state` has been obtained. + */ + self.non_core_changes.write_noncore_stage_data(tx)?; Ok(()) } } + +/// Trait that allows data obtained from custom stages to be written +/// to the database. +/// +/// Defined and implemented on an associated type in CLI namespace/stage/table extension group. +pub trait NonCorePostWriter<'a, TX: DbTxMut<'a> + DbTx<'a>> { + /// Method that allows + /// Called alongside core + fn write_noncore_stage_data(self, tx: &TX,); +} + #[cfg(test)] mod tests { use super::*; diff --git a/examples/additional-stage/src/main.rs b/examples/additional-stage/src/main.rs index 8e29efa50d50..c8d309c2bd0a 100644 --- a/examples/additional-stage/src/main.rs +++ b/examples/additional-stage/src/main.rs @@ -69,6 +69,20 @@ impl RethCliExt for MyRethCliExt { type Node = RethCliNamespaceExt; /// This tells the reth CLI to use additional non-core tables. type TableExt = NonCoreTable; + /// Data obtained after executing custom stages. + type CustomStagePostData = MyPostData; +} + +/// Holds data obtained from extra stages, ready to be put in DB. 
+struct MyPostData { + // e.g., store block -> miner address mappings +} + +impl NonCorePostWriter for MyPostData { + fn write_noncore_stage_data(self, tx: &TX) { + // tx::cursor_write::() + // Store data.miner_addresses in MyTable + } } /// Our custom cli args extension that adds one flag to reth default CLI.