Skip to content

Commit

Permalink
restorer for indexer alt
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Dec 4, 2024
1 parent e5cf6a6 commit 341d4b1
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 0 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ members = [
"crates/sui-indexer",
"crates/sui-indexer-alt",
"crates/sui-indexer-builder",
"crates/sui-indexer-restorer",
"crates/sui-json",
"crates/sui-json-rpc",
"crates/sui-json-rpc-api",
Expand Down Expand Up @@ -646,7 +647,9 @@ sui-graphql-rpc-client = { path = "crates/sui-graphql-rpc-client" }
sui-graphql-rpc-headers = { path = "crates/sui-graphql-rpc-headers" }
sui-genesis-builder = { path = "crates/sui-genesis-builder" }
sui-indexer = { path = "crates/sui-indexer" }
sui-indexer-alt = { path = "crates/sui-indexer-alt" }
sui-indexer-builder = { path = "crates/sui-indexer-builder" }
sui-indexer-restorer = { path = "crates/sui-indexer-restorer" }
sui-json = { path = "crates/sui-json" }
sui-json-rpc = { path = "crates/sui-json-rpc" }
sui-json-rpc-api = { path = "crates/sui-json-rpc-api" }
Expand Down
30 changes: 30 additions & 0 deletions crates/sui-indexer-restorer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "sui-indexer-restorer"
version = "0.1.0"
authors = ["Mysten Labs <[email protected]>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
anyhow = { version = "1.0.71", features = ["backtrace"] }
async-trait = "0.1.68"
clap = { version = "4.3.0", features = ["derive", "env"] }
futures = "0.3.28"
serde = { version = "1.0.163", features = ["derive"] }
serde_json = "1.0.96"
tokio = { workspace = true, features = ["full"] }
tracing = "0.1.37"

indicatif.workspace = true
sui-config.workspace = true
sui-snapshot.workspace = true
sui-indexer-alt.workspace = true
sui-types.workspace = true
sui-storage.workspace = true
sui-core.workspace = true
object_store.workspace = true

[[bin]]
name = "sui-indexer-restorer"
path = "src/main.rs"
35 changes: 35 additions & 0 deletions crates/sui-indexer-restorer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

mod snapshot;

use clap::Parser;

use crate::snapshot::SnapshotRestorer;

#[derive(Parser, Debug, Clone)]
#[clap(name = "sui-indexer-restorer")]
pub struct Args {
#[clap(long, env = "START_EPOCH", required = true)]
pub start_epoch: u64,

#[clap(long, env = "ENDPOINT", required = true)]
pub endpoint: String,

#[clap(long, env = "SNAPSHOT_BUCKET", required = true)]
pub snapshot_bucket: String,

#[clap(long, env = "ARCHIVE_BUCKET", required = true)]
pub archive_bucket: String,

#[clap(long, env = "SNAPSHOT_LOCAL_DIR", required = true)]
pub snapshot_local_dir: String,
}

pub async fn restore(args: &Args) -> anyhow::Result<()> {
let mut snapshot_restorer = SnapshotRestorer::new(args).await?;
snapshot_restorer.restore().await?;
Ok(())
}


23 changes: 23 additions & 0 deletions crates/sui-indexer-restorer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use clap::Parser;
use tracing::info;

use sui_indexer_restorer::Args;
use sui_indexer_restorer::restore;

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
info!(
"Starting indexer restorer from epoch {}",
args.start_epoch
);
restore(&args).await?;
info!("Finished indexer restorer!");
Ok(())
}


169 changes: 169 additions & 0 deletions crates/sui-indexer-restorer/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;


use tokio::sync::{Mutex, Semaphore};
use tokio::task;
use anyhow::Error;
use futures::future::{AbortHandle, AbortRegistration, Abortable};
use tracing::info;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use object_store::path::Path;

use sui_snapshot::{reader::{download_bytes, LiveObjectIter, StateSnapshotReaderV1}, FileMetadata};
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_storage::object_store::ObjectStoreGetExt;
use sui_core::authority::authority_store_tables::LiveObject;
use sui_types::accumulator::Accumulator;

use crate::Args;

pub type DigestByBucketAndPartition = BTreeMap<u32, BTreeMap<u32, [u8; 32]>>;
pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator);
pub type Sha3DigestType = Arc<Mutex<BTreeMap<u32, BTreeMap<u32, [u8; 32]>>>>;


pub struct SnapshotRestorer {
pub restore_args: Args,
pub snapshot_reader: StateSnapshotReaderV1,
}

impl SnapshotRestorer {
pub async fn new(args: &Args) -> Result<Self, Error> {
let remote_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::S3),
aws_endpoint: Some(args.endpoint.clone()),
aws_virtual_hosted_style_request: true,
object_store_connection_limit: 50, // TODO: Make this configurable with default
no_sign_request: true,
..Default::default()
};

let local_path = PathBuf::from(&args.snapshot_local_dir);
let snapshot_dir = local_path.join("snapshot");
let local_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::File),
directory: Some(snapshot_dir.clone().to_path_buf()),
..Default::default()
};

let m = MultiProgress::new();
let snapshot_reader = StateSnapshotReaderV1::new(
args.start_epoch,
&remote_store_config,
&local_store_config,
usize::MAX,
NonZeroUsize::new(50).unwrap(), // TODO: Make this configurable with default
m,
true, // skip_reset_local_store
)
.await?;

Ok(Self { restore_args: args.clone(), snapshot_reader })
}

pub async fn restore(&mut self) -> Result<(), Error> {
info!("Starting snapshot restore from epoch {}", self.restore_args.start_epoch);
let (sha3_digests, num_part_files) = self.snapshot_reader.compute_checksum().await?;
let (_abort_handle, abort_registration) = AbortHandle::new_pair();
let (input_files, epoch_dir, remote_object_store, _concurrency) =
self.snapshot_reader.export_metadata().await?;
let owned_input_files: Vec<(u32, (u32, FileMetadata))> = input_files
.into_iter()
.map(|(bucket, (part_num, metadata))| (*bucket, (part_num, metadata.clone())))
.collect();

self.restore_move_objects(abort_registration, owned_input_files, epoch_dir, remote_object_store, sha3_digests, num_part_files).await?;
info!("Finished snapshot restore from epoch {}", self.restore_args.start_epoch);
Ok(())
}

async fn restore_move_objects(
&self,
abort_registration: AbortRegistration,
input_files: Vec<(u32, (u32, FileMetadata))>,
epoch_dir: Path,
remote_object_store: Arc<dyn ObjectStoreGetExt>,
sha3_digests: Arc<Mutex<DigestByBucketAndPartition>>,
num_part_files: usize,
) -> std::result::Result<(), anyhow::Error> {
let move_object_progress_bar = Arc::new(self.snapshot_reader.get_multi_progress().add(
ProgressBar::new(num_part_files as u64).with_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {pos} out of {len} move object files restored ({msg})",
)
.unwrap(),
),
));

Abortable::new(
async move {
let sema_limit = Arc::new(Semaphore::new(
50, // TODO: Make this configurable with default
));
let mut restore_tasks = vec![];

for (bucket, (part_num, file_metadata)) in input_files.into_iter() {
let sema_limit_clone = sema_limit.clone();
let epoch_dir_clone = epoch_dir.clone();
let remote_object_store_clone = remote_object_store.clone();
let sha3_digests_clone = sha3_digests.clone();
let bar_clone = move_object_progress_bar.clone();
let args = self.restore_args.clone();

let restore_task = task::spawn(async move {
let _permit = sema_limit_clone.acquire().await.unwrap();
let object_file_path = file_metadata.file_path(&epoch_dir_clone);
let (bytes, _) = download_bytes(
remote_object_store_clone,
&file_metadata,
epoch_dir_clone,
sha3_digests_clone,
&&bucket,
&part_num,
Some(512), // TODO: Make this configurable with default
)
.await;
info!(
"Finished downloading move object file {:?}",
object_file_path
);
let mut move_objects = vec![];
let _result: Result<(), anyhow::Error> =
LiveObjectIter::new(&file_metadata, bytes.clone()).map(|obj_iter| {
for object in obj_iter {
match object {
LiveObject::Normal(obj) => {
move_objects.push(obj);
// TODOggao: index object from sui-type object to sui-indexer-alt object
}
LiveObject::Wrapped(_) => {}
}
}
});

let live_obj_cnt = move_objects.len();
// TODOggao: persist to various tables

bar_clone.inc(1);
Ok::<(), anyhow::Error>(())
});
restore_tasks.push(restore_task);
}

let restore_task_results = futures::future::join_all(restore_tasks).await;
for restore_task_result in restore_task_results {
restore_task_result??;
}
Ok(())
},
abort_registration,
)
.await?
}
}

0 comments on commit 341d4b1

Please sign in to comment.