Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Snapshotter triggers segment snapshots #5287

Merged
merged 41 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
eb00e6d
add snapshots_path to ChainPath
joshieDo Oct 31, 2023
a8ef221
move HighestSnapshots to primitives crate
joshieDo Oct 31, 2023
f1f32d7
add with_highest_tracker to SnapshotProvider
joshieDo Oct 31, 2023
f5cadf8
add a shared snapshot provvider to providerfactory and dbprovider
joshieDo Oct 31, 2023
1acd28f
add snapshot provider to shared blockchain_db
joshieDo Oct 31, 2023
470a6b6
allow unused for now
joshieDo Oct 31, 2023
113ef23
add get_highest_snapshot to SnapshotProvider
joshieDo Oct 31, 2023
73fd00f
move hihgest snapshot channel inside snapshotter
joshieDo Nov 2, 2023
2fef51f
add default receiver to snapshotter
joshieDo Nov 2, 2023
33dc284
replace with with_snapshot_provider on db provider
joshieDo Nov 2, 2023
cda1ca7
use strum for SnapshotSegment, compression and filters
joshieDo Nov 3, 2023
ec89704
snapshotter takes a directory and reads from it
joshieDo Nov 3, 2023
e87c64d
default snapshot filename doesnt have configuration
joshieDo Nov 3, 2023
99c1fa5
create snapshots directory if it doesnt exist
joshieDo Nov 3, 2023
d4b5f61
add run_segment
joshieDo Nov 3, 2023
dd338e5
add type SegmentConfig
joshieDo Nov 3, 2023
00ec5f5
add docs for directory
joshieDo Nov 8, 2023
c1dd713
replace PathBuf usage
joshieDo Nov 8, 2023
3be09cd
Merge remote-tracking branch 'origin/main' into joshie/db-snap-provider
joshieDo Nov 8, 2023
96b255c
Merge branch 'joshie/db-snap-provider' into joshie/db-snap-provider2
joshieDo Nov 8, 2023
b281144
Merge branch 'joshie/db-snap-provider2' into joshie/db-snap-provider3
joshieDo Nov 8, 2023
d0699f7
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
892b260
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
756d00b
Update crates/storage/provider/src/providers/snapshot/manager.rs
joshieDo Nov 13, 2023
5891d0a
Update crates/primitives/src/snapshot/segment.rs
joshieDo Nov 13, 2023
d2c21e9
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
a11751f
Merge branch 'joshie/db-snap-provider' into joshie/db-snap-provider2
joshieDo Nov 13, 2023
4574c68
add fn block_range to snapshot cmd
joshieDo Nov 13, 2023
d47812a
add read_dir to primitives fs
joshieDo Nov 13, 2023
e1dca84
Merge branch 'joshie/db-snap-provider2' into joshie/db-snap-provider3
joshieDo Nov 13, 2023
95e965a
trait Segment requires Default
joshieDo Nov 13, 2023
7054744
add reth_primitives::fs::rename
joshieDo Nov 14, 2023
a42ebaa
Merge branch 'joshie/db-snap-provider2' into joshie/db-snap-provider3
joshieDo Nov 14, 2023
a865b59
use reth fs rename
joshieDo Nov 14, 2023
77094e5
clippy
joshieDo Nov 14, 2023
622159b
Update crates/primitives/src/fs.rs
joshieDo Nov 14, 2023
975b47d
fmt
joshieDo Nov 14, 2023
c9ee102
Merge remote-tracking branch 'origin/main' into joshie/db-snap-provider2
joshieDo Nov 14, 2023
2bd3205
Merge branch 'joshie/db-snap-provider2' into joshie/db-snap-provider3
joshieDo Nov 14, 2023
bceba97
Merge remote-tracking branch 'origin/main' into joshie/db-snap-provider3
joshieDo Nov 14, 2023
986abfd
add missing imports
joshieDo Nov 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions bin/reth/src/db/snapshots/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

impl Command {
pub(crate) fn generate_headers_snapshot<DB: Database>(
Expand All @@ -32,7 +35,7 @@ impl Command {

let segment = segments::Headers::new(compression, filters);

segment.snapshot::<DB>(provider, range.clone())?;
segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;

// Default name doesn't have any configuration
reth_primitives::fs::rename(
Expand Down
7 changes: 5 additions & 2 deletions bin/reth/src/db/snapshots/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use reth_provider::{
ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

impl Command {
pub(crate) fn generate_receipts_snapshot<DB: Database>(
Expand All @@ -33,7 +36,7 @@ impl Command {

let segment = segments::Receipts::new(compression, filters);

segment.snapshot::<DB>(provider, range.clone())?;
segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;

// Default name doesn't have any configuration
reth_primitives::fs::rename(
Expand Down
7 changes: 5 additions & 2 deletions bin/reth/src/db/snapshots/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use reth_provider::{
TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

impl Command {
pub(crate) fn generate_transactions_snapshot<DB: Database>(
Expand All @@ -33,7 +36,7 @@ impl Command {

let segment = segments::Transactions::new(compression, filters);

segment.snapshot::<DB>(provider, range.clone())?;
segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;

// Default name doesn't have any configuration
reth_primitives::fs::rename(
Expand Down
2 changes: 1 addition & 1 deletion crates/primitives/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod segment;
use alloy_primitives::BlockNumber;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::{SegmentHeader, SnapshotSegment};
pub use segment::{SegmentConfig, SegmentHeader, SnapshotSegment};

/// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;
Expand Down
22 changes: 17 additions & 5 deletions crates/primitives/src/snapshot/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ pub enum SnapshotSegment {

impl SnapshotSegment {
/// Returns the default configuration of the segment.
pub const fn config(&self) -> (Filters, Compression) {
let default_config = (
Filters::WithFilters(InclusionFilter::Cuckoo, super::PerfectHashingFunction::Fmph),
Compression::Lz4,
);
pub const fn config(&self) -> SegmentConfig {
let default_config = SegmentConfig {
filters: Filters::WithFilters(
InclusionFilter::Cuckoo,
super::PerfectHashingFunction::Fmph,
),
compression: Compression::Lz4,
};

match self {
SnapshotSegment::Headers => default_config,
Expand Down Expand Up @@ -133,3 +136,12 @@ impl SegmentHeader {
}
}
}

/// Configuration used on the segment.
#[derive(Debug, Clone, Copy)]
pub struct SegmentConfig {
/// Inclusion filters used on the segment
pub filters: Filters,
/// Compression used on the segment
pub compression: Compression,
}
28 changes: 19 additions & 9 deletions crates/snapshot/src/segments/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,48 @@ use reth_db::{
};
use reth_interfaces::RethResult;
use reth_primitives::{
snapshot::{Compression, Filters},
snapshot::{Compression, Filters, SegmentConfig},
BlockNumber, SnapshotSegment,
};
use reth_provider::DatabaseProviderRO;
use std::ops::RangeInclusive;
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
#[derive(Debug)]
pub struct Headers {
compression: Compression,
filters: Filters,
config: SegmentConfig,
}

impl Headers {
/// Creates new instance of [Headers] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
Self { compression, filters }
Self { config: SegmentConfig { compression, filters } }
}
}

impl Default for Headers {
fn default() -> Self {
Self { config: SnapshotSegment::Headers.config() }
}
}

impl Segment for Headers {
fn segment() -> SnapshotSegment {
SnapshotSegment::Headers
}

fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
directory: impl AsRef<Path>,
range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let range_len = range.clone().count();
let mut jar = prepare_jar::<DB, 3>(
provider,
SnapshotSegment::Headers,
self.filters,
self.compression,
directory,
Self::segment(),
self.config,
range.clone(),
range_len,
|| {
Expand All @@ -57,7 +67,7 @@ impl Segment for Headers {
// Generate list of hashes for filters & PHF
let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
let mut hashes = None;
if self.filters.has_filters() {
if self.config.filters.has_filters() {
hashes = Some(
cursor
.walk(Some(RawKey::from(*range.start())))?
Expand Down
26 changes: 17 additions & 9 deletions crates/snapshot/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use reth_db::{
use reth_interfaces::RethResult;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
snapshot::{
Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig, SegmentHeader,
},
BlockNumber, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
Expand All @@ -24,14 +26,19 @@ use std::{ops::RangeInclusive, path::Path};
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];

/// A segment represents a snapshotting of some portion of the data.
pub trait Segment {
/// Snapshot data using the provided range.
pub trait Segment: Default {
/// Snapshot data using the provided range. The `directory` parameter determines the snapshot
/// file's save location.
fn snapshot<DB: Database>(
mattsse marked this conversation as resolved.
Show resolved Hide resolved
&self,
provider: &DatabaseProviderRO<'_, DB>,
directory: impl AsRef<Path>,
range: RangeInclusive<BlockNumber>,
) -> RethResult<()>;

/// Returns this struct's [`SnapshotSegment`].
fn segment() -> SnapshotSegment;

/// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(
&self,
Expand All @@ -48,24 +55,25 @@ pub trait Segment {
}
}

/// Returns a [`NippyJar`] according to the desired configuration.
/// Returns a [`NippyJar`] according to the desired configuration. The `directory` parameter
/// determines the snapshot file's save location.
pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
provider: &DatabaseProviderRO<'_, DB>,
directory: impl AsRef<Path>,
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
segment_config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
total_rows: usize,
prepare_compression: impl Fn() -> RethResult<Rows<COLUMNS>>,
) -> RethResult<NippyJar<SegmentHeader>> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let mut nippy_jar = NippyJar::new(
COLUMNS,
Path::new(segment.filename(&block_range).as_str()),
&directory.as_ref().join(segment.filename(&block_range).as_str()),
SegmentHeader::new(block_range, tx_range, segment),
);

nippy_jar = match compression {
nippy_jar = match segment_config.compression {
Compression::Lz4 => nippy_jar.with_lz4(),
Compression::Zstd => nippy_jar.with_zstd(false, 0),
Compression::ZstdWithDictionary => {
Expand All @@ -78,7 +86,7 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
Compression::Uncompressed => nippy_jar,
};

if let Filters::WithFilters(inclusion_filter, phf) = filters {
if let Filters::WithFilters(inclusion_filter, phf) = segment_config.filters {
nippy_jar = match inclusion_filter {
InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
};
Expand Down
28 changes: 19 additions & 9 deletions crates/snapshot/src/segments/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,50 @@ use crate::segments::{prepare_jar, Segment};
use reth_db::{database::Database, snapshot::create_snapshot_T1, tables};
use reth_interfaces::RethResult;
use reth_primitives::{
snapshot::{Compression, Filters, SegmentHeader},
snapshot::{Compression, Filters, SegmentConfig, SegmentHeader},
BlockNumber, SnapshotSegment, TxNumber,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
use std::ops::RangeInclusive;
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data.
#[derive(Debug)]
pub struct Receipts {
compression: Compression,
filters: Filters,
config: SegmentConfig,
}

impl Receipts {
/// Creates new instance of [Receipts] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
Self { compression, filters }
Self { config: SegmentConfig { compression, filters } }
}
}

impl Default for Receipts {
fn default() -> Self {
Self { config: SnapshotSegment::Receipts.config() }
}
}

impl Segment for Receipts {
fn segment() -> SnapshotSegment {
SnapshotSegment::Receipts
}

fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
directory: impl AsRef<Path>,
block_range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();

let mut jar = prepare_jar::<DB, 1>(
provider,
SnapshotSegment::Receipts,
self.filters,
self.compression,
directory,
Self::segment(),
self.config,
block_range,
tx_range_len,
|| {
Expand All @@ -49,7 +59,7 @@ impl Segment for Receipts {

// Generate list of hashes for filters & PHF
let mut hashes = None;
if self.filters.has_filters() {
if self.config.filters.has_filters() {
hashes = Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
Expand Down
28 changes: 19 additions & 9 deletions crates/snapshot/src/segments/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,50 @@ use crate::segments::{prepare_jar, Segment};
use reth_db::{database::Database, snapshot::create_snapshot_T1, tables};
use reth_interfaces::RethResult;
use reth_primitives::{
snapshot::{Compression, Filters, SegmentHeader},
snapshot::{Compression, Filters, SegmentConfig, SegmentHeader},
BlockNumber, SnapshotSegment, TxNumber,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
use std::ops::RangeInclusive;
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data.
#[derive(Debug)]
pub struct Transactions {
compression: Compression,
filters: Filters,
config: SegmentConfig,
}

impl Transactions {
/// Creates new instance of [Transactions] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
Self { compression, filters }
Self { config: SegmentConfig { compression, filters } }
}
}

impl Default for Transactions {
fn default() -> Self {
Self { config: SnapshotSegment::Transactions.config() }
}
}

impl Segment for Transactions {
fn segment() -> SnapshotSegment {
SnapshotSegment::Transactions
}

fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
directory: impl AsRef<Path>,
block_range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();

let mut jar = prepare_jar::<DB, 1>(
provider,
SnapshotSegment::Transactions,
self.filters,
self.compression,
directory,
Self::segment(),
self.config,
block_range,
tx_range_len,
|| {
Expand All @@ -49,7 +59,7 @@ impl Segment for Transactions {

// Generate list of hashes for filters & PHF
let mut hashes = None;
if self.filters.has_filters() {
if self.config.filters.has_filters() {
hashes = Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
Expand Down
Loading
Loading