From 59a86a0f3f6103033cb7b276ff373cad62e69173 Mon Sep 17 00:00:00 2001 From: Azz Date: Thu, 19 Dec 2024 16:21:30 +0000 Subject: [PATCH 1/2] feat(storage) `L1BlockManager` --- Cargo.lock | 1 + crates/storage/Cargo.toml | 1 + crates/storage/src/managers/l1.rs | 127 +++++++++++++++++++++++++++++ crates/storage/src/managers/mod.rs | 1 + 4 files changed, 130 insertions(+) create mode 100644 crates/storage/src/managers/l1.rs diff --git a/Cargo.lock b/Cargo.lock index b7a3c13b0..575bcb530 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13726,6 +13726,7 @@ dependencies = [ "parking_lot 0.12.3", "paste", "strata-db", + "strata-mmr", "strata-primitives", "strata-state", "threadpool", diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index c5f5ee306..80ab7191b 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -17,3 +17,4 @@ paste.workspace = true threadpool.workspace = true tokio.workspace = true tracing.workspace = true +strata-mmr.workspace = true diff --git a/crates/storage/src/managers/l1.rs b/crates/storage/src/managers/l1.rs new file mode 100644 index 000000000..c4e745fc1 --- /dev/null +++ b/crates/storage/src/managers/l1.rs @@ -0,0 +1,127 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use strata_db::{ + traits::{BlockStatus, Database, L1Database}, + DbResult, +}; +use strata_mmr::CompactMmr; +use strata_primitives::{ + buf::Buf32, + l1::{L1BlockManifest, L1TxRef}, +}; +use strata_state::{block::L2BlockBundle, header::L2Header, id::L2BlockId, l1::L1Tx}; +use threadpool::ThreadPool; +use tokio::sync::oneshot::{self, error::RecvError}; +use tracing::warn; + +use crate::cache::{self, CacheTable}; + +/// Caching manager of L1 blocks in the block database. +pub struct L1BlockManager +where + DB: L1Database + Sync + Send + 'static, +{ + pool: ThreadPool, + db: Arc, + block_cache: CacheTable>, +} + +impl L1BlockManager +where + DB: L1Database + Sync + Send + 'static, +{ + pub fn new(pool: ThreadPool, db: Arc) -> Self { + Self { + pool, + db, + block_cache: CacheTable::new(64.try_into().unwrap()), + } + } + + pub async fn put_block_data( + &self, + idx: u64, + mf: L1BlockManifest, + txs: Vec, + ) -> DbResult<()> { + let db = self.db.clone(); + self.pool + .spawn(move || db.put_block_data(idx, mf, txs)) + .await + } + + pub async fn put_mmr_checkpoint(&self, idx: u64, mmr: CompactMmr) -> DbResult<()> { + let db = self.db.clone(); + self.pool + .spawn(move || db.put_mmr_checkpoint(idx, mmr)) + .await + } + + pub async fn revert_to_height(&self, idx: u64) -> DbResult<()> { + let db = self.db.clone(); + self.pool.spawn(move || db.revert_to_height(idx)).await + } + + pub async fn get_chain_tip(&self) -> DbResult> { + let db = self.db.clone(); + self.pool.spawn(move || db.get_chain_tip()).await + } + + pub async fn get_block_manifest(&self, idx: u64) -> DbResult> { + let db = self.db.clone(); + self.pool.spawn(move || db.get_block_manifest(idx)).await + } + + pub async fn get_blockid_range(&self, start_idx: u64, end_idx: u64) -> DbResult> { + let db = self.db.clone(); + self.pool + .spawn(move || db.get_blockid_range(start_idx, end_idx)) + .await + } + + pub async fn get_block_txs(&self, idx: u64) -> DbResult>> { + let db = self.db.clone(); + self.pool.spawn(move || db.get_block_txs(idx)).await + } + + pub async fn get_tx(&self, tx_ref: L1TxRef) -> DbResult> { + let db = self.db.clone(); + self.pool.spawn(move || db.get_tx(tx_ref)).await + } + + pub async fn get_last_mmr_to(&self, idx: u64) -> DbResult> { + let db = self.db.clone(); + self.pool.spawn(move || db.get_last_mmr_to(idx)).await + } + + pub async fn get_txs_from(&self, start_idx: u64) -> DbResult<(Vec, u64)> { + let db = self.db.clone(); + self.pool.spawn(move || db.get_txs_from(start_idx)).await + } +} + +#[async_trait] +trait ThreadPoolSpawn { + async fn spawn(&self, func: F) -> T + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static; +} + +#[async_trait] +impl ThreadPoolSpawn for ThreadPool { + async fn spawn(&self, func: F) -> T + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + self.execute(move || { + if tx.send(func()).is_err() { + warn!("failed to send response") + } + }); + rx.await.expect("Sender was dropped without sending") + } +} diff --git a/crates/storage/src/managers/mod.rs b/crates/storage/src/managers/mod.rs index 4a25f4f9a..9c015a9d7 100644 --- a/crates/storage/src/managers/mod.rs +++ b/crates/storage/src/managers/mod.rs @@ -1,2 +1,3 @@ pub mod checkpoint; +pub mod l1; pub mod l2; From c75a6f21fd15b24fe858876c11620fb21de1a232 Mon Sep 17 00:00:00 2001 From: Azz Date: Thu, 19 Dec 2024 17:12:03 +0000 Subject: [PATCH 2/2] feat(macros) `gen_async_version` --- Cargo.lock | 12 +++++++- Cargo.toml | 4 +++ crates/db/Cargo.toml | 1 + crates/db/src/traits.rs | 6 ++-- crates/macros/Cargo.toml | 12 ++++++++ crates/macros/src/lib.rs | 49 +++++++++++++++++++++++++++++++ crates/storage/src/managers/l1.rs | 35 ++++++++++------------ 7 files changed, 97 insertions(+), 22 deletions(-) create mode 100644 crates/macros/Cargo.toml create mode 100644 crates/macros/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 575bcb530..4cceb6c74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "Inflector" @@ -13080,6 +13080,7 @@ dependencies = [ "parking_lot 0.12.3", "serde", "serde_json", + "strata-macros", "strata-mmr", "strata-primitives", "strata-state", @@ -13153,6 +13154,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "strata-macros" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.86", +] + [[package]] name = "strata-mmr" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 78038a087..073ab17dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ members = [ "crates/state", "crates/status", "crates/storage", + "crates/macros", "crates/sync", "crates/tasks", "crates/test-utils", @@ -134,10 +135,13 @@ alloy-rpc-types-eth = { version = "0.4.2", default-features = false, features = ] } alloy-serde = { version = "0.4.2", default-features = false } alloy-sol-types = "0.8.0" +proc-macro2 = "1.0" +quote = "1.0" revm = { version = "14.0.3", features = ["std"], default-features = false } revm-primitives = { version = "10.0.0", features = [ "std", ], default-features = false } +syn = "2.0" # reth itself: reth = { git = "https://github.com/paradigmxyz/reth.git", rev = "v1.1.0" } diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml index ea93ed9ad..93114b937 100644 --- a/crates/db/Cargo.toml +++ b/crates/db/Cargo.toml @@ -20,6 +20,7 @@ parking_lot.workspace = true serde.workspace = true thiserror.workspace = true tracing.workspace = true +strata-macros = { version = "0.1.0", path = "../macros" } [features] default = [] diff --git a/crates/db/src/traits.rs b/crates/db/src/traits.rs index 01d7e72a3..9095b1835 100644 --- a/crates/db/src/traits.rs +++ b/crates/db/src/traits.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use borsh::{BorshDeserialize, BorshSerialize}; +use strata_macros::gen_async_version; use strata_mmr::CompactMmr; use strata_primitives::{ l1::*, @@ -43,6 +44,7 @@ pub trait Database { } /// Database interface to control our view of L1 data. +#[gen_async_version] pub trait L1Database { /// Atomically extends the chain with a new block, providing the manifest /// and a list of transactions we find relevant. Returns error if @@ -282,7 +284,7 @@ pub trait ProofDatabase { /// Deletes a proof by its key. /// /// Tries to delete a proof by its key, returning if it really - /// existed or not. + /// existed or not. fn del_proof(&self, proof_key: ProofKey) -> DbResult; /// Inserts dependencies for a given [`ProofContext`] into the database. @@ -298,7 +300,7 @@ pub trait ProofDatabase { /// Deletes dependencies for a given [`ProofContext`]. /// /// Tries to delete dependencies of by its context, returning if it really - /// existed or not. + /// existed or not. fn del_proof_deps(&self, proof_context: ProofContext) -> DbResult; } diff --git a/crates/macros/Cargo.toml b/crates/macros/Cargo.toml new file mode 100644 index 000000000..1f72350d2 --- /dev/null +++ b/crates/macros/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "strata-macros" +version = "0.1.0" +edition = "2024" + +[dependencies] +proc-macro2.workspace = true +quote.workspace = true +syn.workspace = true + +[lib] +proc-macro = true diff --git a/crates/macros/src/lib.rs b/crates/macros/src/lib.rs new file mode 100644 index 000000000..f1d041827 --- /dev/null +++ b/crates/macros/src/lib.rs @@ -0,0 +1,49 @@ +use proc_macro::TokenStream; +use quote::quote; +use syn::{Attribute, ItemTrait, parse_macro_input}; + +#[proc_macro_attribute] +pub fn gen_async_version(_attr: TokenStream, item: TokenStream) -> TokenStream { + // Parse the input tokens into a syntax tree + let input = parse_macro_input!(item as ItemTrait); + + // Extract the trait name + let trait_name = &input.ident; + let async_trait_name = syn::Ident::new(&format!("{}Async", trait_name), trait_name.span()); + + // Generate async methods + let async_methods = input.items.iter().filter_map(|item| { + if let syn::TraitItem::Fn(method) = item { + let sig = &method.sig; + let async_sig = syn::Signature { + asyncness: Some(Default::default()), // Add async keyword + ..sig.clone() + }; + + let docs: Vec<&Attribute> = method + .attrs + .iter() + .filter(|attr| attr.path().is_ident("doc")) + .collect(); + + Some(quote! { + #(#docs)* + #async_sig; + }) + } else { + None + } + }); + + // Generate the new async trait + let expanded = quote! { + #input + + pub trait #async_trait_name { + #(#async_methods)* + } + }; + + // Convert the expanded code into a TokenStream and return it + TokenStream::from(expanded) +} diff --git a/crates/storage/src/managers/l1.rs b/crates/storage/src/managers/l1.rs index c4e745fc1..623ff6b9b 100644 --- a/crates/storage/src/managers/l1.rs +++ b/crates/storage/src/managers/l1.rs @@ -1,8 +1,7 @@ use std::sync::Arc; -use async_trait::async_trait; use strata_db::{ - traits::{BlockStatus, Database, L1Database}, + traits::{BlockStatus, Database, L1Database, L1DatabaseAsync}, DbResult, }; use strata_mmr::CompactMmr; @@ -38,70 +37,69 @@ where block_cache: CacheTable::new(64.try_into().unwrap()), } } +} - pub async fn put_block_data( - &self, - idx: u64, - mf: L1BlockManifest, - txs: Vec, - ) -> DbResult<()> { +impl L1DatabaseAsync for L1BlockManager +where + DB: L1Database + Sync + Send + 'static, +{ + async fn put_block_data(&self, idx: u64, mf: L1BlockManifest, txs: Vec) -> DbResult<()> { let db = self.db.clone(); self.pool .spawn(move || db.put_block_data(idx, mf, txs)) .await } - pub async fn put_mmr_checkpoint(&self, idx: u64, mmr: CompactMmr) -> DbResult<()> { + async fn put_mmr_checkpoint(&self, idx: u64, mmr: CompactMmr) -> DbResult<()> { let db = self.db.clone(); self.pool .spawn(move || db.put_mmr_checkpoint(idx, mmr)) .await } - pub async fn revert_to_height(&self, idx: u64) -> DbResult<()> { + async fn revert_to_height(&self, idx: u64) -> DbResult<()> { let db = self.db.clone(); self.pool.spawn(move || db.revert_to_height(idx)).await } - pub async fn get_chain_tip(&self) -> DbResult> { + async fn get_chain_tip(&self) -> DbResult> { let db = self.db.clone(); self.pool.spawn(move || db.get_chain_tip()).await } - pub async fn get_block_manifest(&self, idx: u64) -> DbResult> { + async fn get_block_manifest(&self, idx: u64) -> DbResult> { let db = self.db.clone(); self.pool.spawn(move || db.get_block_manifest(idx)).await } - pub async fn get_blockid_range(&self, start_idx: u64, end_idx: u64) -> DbResult> { + async fn get_blockid_range(&self, start_idx: u64, end_idx: u64) -> DbResult> { let db = self.db.clone(); self.pool .spawn(move || db.get_blockid_range(start_idx, end_idx)) .await } - pub async fn get_block_txs(&self, idx: u64) -> DbResult>> { + async fn get_block_txs(&self, idx: u64) -> DbResult>> { let db = self.db.clone(); self.pool.spawn(move || db.get_block_txs(idx)).await } - pub async fn get_tx(&self, tx_ref: L1TxRef) -> DbResult> { + async fn get_tx(&self, tx_ref: L1TxRef) -> DbResult> { let db = self.db.clone(); self.pool.spawn(move || db.get_tx(tx_ref)).await } - pub async fn get_last_mmr_to(&self, idx: u64) -> DbResult> { + async fn get_last_mmr_to(&self, idx: u64) -> DbResult> { let db = self.db.clone(); self.pool.spawn(move || db.get_last_mmr_to(idx)).await } - pub async fn get_txs_from(&self, start_idx: u64) -> DbResult<(Vec, u64)> { + async fn get_txs_from(&self, start_idx: u64) -> DbResult<(Vec, u64)> { let db = self.db.clone(); self.pool.spawn(move || db.get_txs_from(start_idx)).await } } -#[async_trait] trait ThreadPoolSpawn { async fn spawn(&self, func: F) -> T where @@ -109,7 +107,6 @@ trait ThreadPoolSpawn { T: Send + 'static; } -#[async_trait] impl ThreadPoolSpawn for ThreadPool { async fn spawn(&self, func: F) -> T where