From 341d4b1fffc585070f080c45b7609e29ca4a968a Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Wed, 4 Dec 2024 14:02:45 -0500 Subject: [PATCH] restorer for indexer alt --- Cargo.lock | 22 +++ Cargo.toml | 3 + crates/sui-indexer-restorer/Cargo.toml | 30 ++++ crates/sui-indexer-restorer/src/lib.rs | 35 ++++ crates/sui-indexer-restorer/src/main.rs | 23 +++ crates/sui-indexer-restorer/src/snapshot.rs | 169 ++++++++++++++++++++ 6 files changed, 282 insertions(+) create mode 100644 crates/sui-indexer-restorer/Cargo.toml create mode 100644 crates/sui-indexer-restorer/src/lib.rs create mode 100644 crates/sui-indexer-restorer/src/main.rs create mode 100644 crates/sui-indexer-restorer/src/snapshot.rs diff --git a/Cargo.lock b/Cargo.lock index a9dac90a8351af..93884fb0d38e13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14149,6 +14149,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "sui-indexer-restorer" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "clap", + "futures", + "indicatif", + "object_store", + "serde", + "serde_json", + "sui-config", + "sui-core", + "sui-indexer-alt", + "sui-snapshot", + "sui-storage", + "sui-types", + "tokio", + "tracing", +] + [[package]] name = "sui-json" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index ffb5f2589ae8f2..1ad482e19ec688 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,6 +118,7 @@ members = [ "crates/sui-indexer", "crates/sui-indexer-alt", "crates/sui-indexer-builder", + "crates/sui-indexer-restorer", "crates/sui-json", "crates/sui-json-rpc", "crates/sui-json-rpc-api", @@ -646,7 +647,9 @@ sui-graphql-rpc-client = { path = "crates/sui-graphql-rpc-client" } sui-graphql-rpc-headers = { path = "crates/sui-graphql-rpc-headers" } sui-genesis-builder = { path = "crates/sui-genesis-builder" } sui-indexer = { path = "crates/sui-indexer" } +sui-indexer-alt = { path = "crates/sui-indexer-alt" } sui-indexer-builder = { path = "crates/sui-indexer-builder" } +sui-indexer-restorer = { path = 
"crates/sui-indexer-restorer" } sui-json = { path = "crates/sui-json" } sui-json-rpc = { path = "crates/sui-json-rpc" } sui-json-rpc-api = { path = "crates/sui-json-rpc-api" } diff --git a/crates/sui-indexer-restorer/Cargo.toml b/crates/sui-indexer-restorer/Cargo.toml new file mode 100644 index 00000000000000..03467c14ef4363 --- /dev/null +++ b/crates/sui-indexer-restorer/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "sui-indexer-restorer" +version = "0.1.0" +authors = ["Mysten Labs "] +license = "Apache-2.0" +publish = false +edition = "2021" + +[dependencies] +anyhow = { version = "1.0.71", features = ["backtrace"] } +async-trait = "0.1.68" +clap = { version = "4.3.0", features = ["derive", "env"] } +futures = "0.3.28" +serde = { version = "1.0.163", features = ["derive"] } +serde_json = "1.0.96" +tokio = { workspace = true, features = ["full"] } +tracing = "0.1.37" + +indicatif.workspace = true +sui-config.workspace = true +sui-snapshot.workspace = true +sui-indexer-alt.workspace = true +sui-types.workspace = true +sui-storage.workspace = true +sui-core.workspace = true +object_store.workspace = true + +[[bin]] +name = "sui-indexer-restorer" +path = "src/main.rs" diff --git a/crates/sui-indexer-restorer/src/lib.rs b/crates/sui-indexer-restorer/src/lib.rs new file mode 100644 index 00000000000000..ea754715f1809e --- /dev/null +++ b/crates/sui-indexer-restorer/src/lib.rs @@ -0,0 +1,35 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0
+
+mod snapshot;
+
+use clap::Parser;
+
+use crate::snapshot::SnapshotRestorer;
+
+/// Command-line arguments of the `sui-indexer-restorer` binary.
+#[derive(Parser, Debug, Clone)]
+#[clap(name = "sui-indexer-restorer")]
+pub struct Args {
+    /// Epoch of the state snapshot to restore from.
+    #[clap(long, env = "START_EPOCH")]
+    pub start_epoch: u64,
+    /// Endpoint of the remote S3 object store that is read during the restore.
+    #[clap(long, env = "ENDPOINT")]
+    pub endpoint: String,
+    /// Snapshot bucket (accepted but not yet read by the restorer).
+    #[clap(long, env = "SNAPSHOT_BUCKET")]
+    pub snapshot_bucket: String,
+    /// Archive bucket (accepted but not yet read by the restorer).
+    #[clap(long, env = "ARCHIVE_BUCKET")]
+    pub archive_bucket: String,
+    /// Local directory; its `snapshot` subdirectory is used as the local store.
+    #[clap(long, env = "SNAPSHOT_LOCAL_DIR")]
+    pub snapshot_local_dir: String,
+}
+
+/// Builds a `SnapshotRestorer` from `args` and runs its restore to completion.
+pub async fn restore(args: &Args) -> anyhow::Result<()> {
+    let mut snapshot_restorer = SnapshotRestorer::new(args).await?;
+    snapshot_restorer.restore().await
+}
diff --git a/crates/sui-indexer-restorer/src/main.rs b/crates/sui-indexer-restorer/src/main.rs
new file mode 100644
index 00000000000000..3446466a46fa2c
--- /dev/null
+++ b/crates/sui-indexer-restorer/src/main.rs
@@ -0,0 +1,23 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::info;
+
+use sui_indexer_restorer::{restore, Args};
+
+/// Entry point of the `sui-indexer-restorer` binary.
+///
+/// Parses the command-line arguments (see `Args`) and runs the restore to completion.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let args = Args::parse();
+    // NOTE(review): nothing in this file installs a `tracing` subscriber, so these
+    // `info!` events are presumably dropped unless one is set up elsewhere --
+    // TODO confirm and initialize logging here.
+    info!("Starting indexer restorer from epoch {}", args.start_epoch);
+    restore(&args).await?;
+    info!("Finished indexer restorer!");
+    Ok(())
+}
diff --git a/crates/sui-indexer-restorer/src/snapshot.rs b/crates/sui-indexer-restorer/src/snapshot.rs
new file mode 100644
index 00000000000000..99ad6c37f7abef
--- /dev/null
+++ b/crates/sui-indexer-restorer/src/snapshot.rs
@@ -0,0 +1,169 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::collections::BTreeMap;
+use std::num::NonZeroUsize;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use anyhow::{Context, Error};
+use futures::future::{AbortHandle, AbortRegistration, Abortable};
+use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
+use object_store::path::Path;
+use tokio::sync::{Mutex, Semaphore};
+use tokio::task;
+use tracing::info;
+
+use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
+use sui_core::authority::authority_store_tables::LiveObject;
+use sui_snapshot::reader::{download_bytes, LiveObjectIter, StateSnapshotReaderV1};
+use sui_snapshot::FileMetadata;
+use sui_storage::object_store::ObjectStoreGetExt;
+use sui_types::accumulator::Accumulator;
+
+use crate::Args;
+
+// NOTE(review): generic parameters reconstructed to mirror `sui_snapshot` -- TODO confirm.
+pub type DigestByBucketAndPartition = BTreeMap<u32, BTreeMap<u32, [u8; 32]>>;
+pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator);
+pub type Sha3DigestType = Arc<Mutex<BTreeMap<u32, BTreeMap<u32, [u8; 32]>>>>;
+
+/// Downloads the state snapshot of one epoch and walks the live objects it contains.
+pub struct SnapshotRestorer {
+    pub restore_args: Args,
+    pub snapshot_reader: StateSnapshotReaderV1,
+}
+
+impl SnapshotRestorer {
+    /// Builds an S3 remote-store config from `args.endpoint` and a local file-store config
+    /// rooted at `<snapshot_local_dir>/snapshot`, then opens a reader for `args.start_epoch`.
+    pub async fn new(args: &Args) -> Result<Self, Error> {
+        let remote_store_config = ObjectStoreConfig {
+            object_store: Some(ObjectStoreType::S3),
+            aws_endpoint: Some(args.endpoint.clone()),
+            aws_virtual_hosted_style_request: true,
+            object_store_connection_limit: 50, // TODO: Make this configurable with default
+            no_sign_request: true,
+            ..Default::default()
+        };
+
+        let snapshot_dir = PathBuf::from(&args.snapshot_local_dir).join("snapshot");
+        let local_store_config = ObjectStoreConfig {
+            object_store: Some(ObjectStoreType::File),
+            directory: Some(snapshot_dir),
+            ..Default::default()
+        };
+        let snapshot_reader = StateSnapshotReaderV1::new(
+            args.start_epoch,
+            &remote_store_config,
+            &local_store_config,
+            usize::MAX,
+            NonZeroUsize::new(50).unwrap(), // TODO: Make this configurable with default
+            MultiProgress::new(),
+            true, // skip_reset_local_store
+        )
+        .await?;
+        Ok(Self { restore_args: args.clone(), snapshot_reader })
+    }
+
+    /// Computes the snapshot checksums, exports the file metadata from the reader and then
+    /// processes every exported file through `restore_move_objects`.
+    pub async fn restore(&mut self) -> Result<(), Error> {
+        info!("Starting snapshot restore from epoch {}", self.restore_args.start_epoch);
+        let (sha3_digests, num_part_files) = self.snapshot_reader.compute_checksum().await?;
+        // The handle is never used to abort; only the registration is passed on.
+        let (_abort_handle, abort_registration) = AbortHandle::new_pair();
+        let (input_files, epoch_dir, remote_object_store, _concurrency) =
+            self.snapshot_reader.export_metadata().await?;
+        let owned_input_files: Vec<(u32, (u32, FileMetadata))> = input_files
+            .into_iter()
+            .map(|(bucket, (part_num, metadata))| (*bucket, (part_num, metadata.clone())))
+            .collect();
+        self.restore_move_objects(
+            abort_registration,
+            owned_input_files,
+            epoch_dir,
+            remote_object_store,
+            sha3_digests,
+            num_part_files,
+        )
+        .await?;
+        info!("Finished snapshot restore from epoch {}", self.restore_args.start_epoch);
+        Ok(())
+    }
+
+    /// Spawns one task per snapshot file (at most 50 hold a permit at a time); each task
+    /// downloads its file, decodes the live objects and advances the progress bar. Fails if
+    /// the future is aborted, a task fails to join, or a file cannot be decoded.
+    async fn restore_move_objects(
+        &self,
+        abort_registration: AbortRegistration,
+        input_files: Vec<(u32, (u32, FileMetadata))>,
+        epoch_dir: Path,
+        remote_object_store: Arc<dyn ObjectStoreGetExt>,
+        sha3_digests: Arc<Mutex<DigestByBucketAndPartition>>,
+        num_part_files: usize,
+    ) -> std::result::Result<(), anyhow::Error> {
+        let move_object_progress_bar = Arc::new(self.snapshot_reader.get_multi_progress().add(
+            ProgressBar::new(num_part_files as u64).with_style(
+                ProgressStyle::with_template(
+                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} move object files restored ({msg})",
+                )
+                .expect("progress bar template is a valid static string"),
+            ),
+        ));
+
+        Abortable::new(
+            async move {
+                // TODO: Make this configurable with default
+                let sema_limit = Arc::new(Semaphore::new(50));
+                let mut restore_tasks = vec![];
+                for (bucket, (part_num, file_metadata)) in input_files {
+                    let sema_limit_clone = sema_limit.clone();
+                    let epoch_dir_clone = epoch_dir.clone();
+                    let remote_object_store_clone = remote_object_store.clone();
+                    let sha3_digests_clone = sha3_digests.clone();
+                    let bar_clone = move_object_progress_bar.clone();
+                    let restore_task = task::spawn(async move {
+                        let _permit = sema_limit_clone.acquire().await?;
+                        let object_file_path = file_metadata.file_path(&epoch_dir_clone);
+                        let (bytes, _) = download_bytes(
+                            remote_object_store_clone,
+                            &file_metadata,
+                            epoch_dir_clone,
+                            sha3_digests_clone,
+                            &&bucket,
+                            &part_num,
+                            Some(512), // TODO: Make this configurable with default
+                        )
+                        .await;
+                        info!("Finished downloading move object file {:?}", object_file_path);
+                        // Surface decode failures instead of silently skipping the file.
+                        let obj_iter = LiveObjectIter::new(&file_metadata, bytes)
+                            .with_context(|| format!("failed to read {:?}", object_file_path))?;
+                        let mut move_objects = vec![];
+                        for object in obj_iter {
+                            match object {
+                                LiveObject::Normal(obj) => {
+                                    move_objects.push(obj);
+                                    // TODOggao: index object from sui-type object to sui-indexer-alt object
+                                }
+                                LiveObject::Wrapped(_) => {}
+                            }
+                        }
+                        // TODOggao: persist to various tables
+                        bar_clone.inc(1);
+                        Ok::<(), anyhow::Error>(())
+                    });
+                    restore_tasks.push(restore_task);
+                }
+                for restore_task_result in futures::future::join_all(restore_tasks).await {
+                    restore_task_result??;
+                }
+                Ok(())
+            },
+            abort_registration,
+        )
+        .await?
+    }
+}