Skip to content

Commit

Permalink
snapshot: parallel checksum for speed
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Dec 23, 2024
1 parent 75a20fa commit f82876a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-snapshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ prometheus.workspace = true
sui-types.workspace = true
sui-config.workspace = true
sui-core.workspace = true
sui-indexer-alt-framework.workspace = true
sui-storage.workspace = true
sui-protocol-config.workspace = true
fastcrypto = { workspace = true, features = ["copy_key"] }
Expand Down
73 changes: 48 additions & 25 deletions crates/sui-snapshot/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use sui_config::object_storage_config::ObjectStoreConfig;
use sui_core::authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject};
use sui_core::authority::AuthorityStore;
use sui_indexer_alt_framework::task::TrySpawnStreamExt;
use sui_storage::blob::{Blob, BlobEncoding};
use sui_storage::object_store::http::HttpDownloaderBuilder;
use sui_storage::object_store::util::{copy_file, copy_files, path_to_filesystem};
Expand All @@ -41,6 +42,7 @@ use tracing::{error, info};
pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator);
pub type DigestByBucketAndPartition = BTreeMap<u32, BTreeMap<u32, [u8; 32]>>;
pub type Sha3DigestType = Arc<Mutex<BTreeMap<u32, BTreeMap<u32, [u8; 32]>>>>;
#[derive(Clone)]
pub struct StateSnapshotReaderV1 {
epoch: u64,
local_staging_dir_root: PathBuf,
Expand Down Expand Up @@ -235,33 +237,54 @@ impl StateSnapshotReaderV1 {
),
);

for (bucket, part_files) in self.ref_files.clone().iter() {
for (part, _part_file) in part_files.iter() {
let mut sha3_digests = sha3_digests.lock().await;
let ref_iter = self.ref_iter(*bucket, *part)?;
let mut hasher = Sha3_256::default();
let mut empty = true;
self.object_files
.get(bucket)
.context(format!("No bucket exists for: {bucket}"))?
.get(part)
.context(format!("No part exists for bucket: {bucket}, part: {part}"))?;
for object_ref in ref_iter {
hasher.update(object_ref.2.inner());
empty = false;
}
if !empty {
sha3_digests
.entry(*bucket)
.or_insert(BTreeMap::new())
.entry(*part)
.or_insert(hasher.finalize().digest);
let ref_files_iter = self.ref_files.clone().into_iter();
futures::stream::iter(ref_files_iter)
.flat_map(|(bucket, part_files)| {
futures::stream::iter(
part_files
.into_iter()
.map(move |(part, part_file)| (bucket, part, part_file)),
)
})
.try_for_each_spawned(self.concurrency, |(bucket, part, _part_file)| {
let sha3_digests = sha3_digests.clone();
let object_files = self.object_files.clone();
let bar = checksum_progress_bar.clone();
let this = self.clone();

async move {
let ref_iter = this.ref_iter(bucket, part)?;
let mut hasher = Sha3_256::default();
let mut empty = true;

object_files
.get(&bucket)
.context(format!("No bucket exists for: {bucket}"))?
.get(&part)
.context(format!("No part exists for bucket: {bucket}, part: {part}"))?;

for object_ref in ref_iter {
hasher.update(object_ref.2.inner());
empty = false;
}

if !empty {
let mut digests = sha3_digests.lock().await;
digests
.entry(bucket)
.or_insert(BTreeMap::new())
.entry(part)
.or_insert(hasher.finalize().digest);
}

bar.inc(1);
bar.set_message(format!("Bucket: {}, Part: {}", bucket, part));
Ok::<(), anyhow::Error>(())
}
checksum_progress_bar.inc(1);
checksum_progress_bar.set_message(format!("Bucket: {}, Part: {}", bucket, part));
}
}
})
.await?;
checksum_progress_bar.finish_with_message("Checksumming complete");
info!("Checksumming complete");
Ok((sha3_digests, num_part_files))
}

Expand Down

0 comments on commit f82876a

Please sign in to comment.