Skip to content

Commit

Permalink
restore object infos
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Dec 4, 2024
1 parent 341d4b1 commit fdbecad
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 99 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions crates/sui-indexer-alt/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ impl DbArgs {
pub fn connection_timeout(&self) -> Duration {
Duration::from_millis(self.connection_timeout_ms)
}

pub fn with_url(url: Url) -> Self {
Self {
database_url: url,
..Default::default()
}
}
}

impl Db {
Expand All @@ -63,7 +70,7 @@ impl Db {

/// Retrieves a connection from the pool. Can fail with a timeout if a connection cannot be
/// established before the [DbConfig::connection_timeout] has elapsed.
pub(crate) async fn connect(&self) -> Result<Connection<'_>, RunError> {
pub async fn connect(&self) -> Result<Connection<'_>, RunError> {
self.pool.get().await
}

Expand Down Expand Up @@ -183,10 +190,7 @@ mod tests {
let url = db.database().url();

info!(%url);
let db_args = DbArgs {
database_url: url.clone(),
..Default::default()
};
let db_args = DbArgs::with_url(url.clone());

let db = Db::new(db_args).await.unwrap();
let mut conn = db.connect().await.unwrap();
Expand All @@ -211,11 +215,7 @@ mod tests {
let temp_db = TempDb::new().unwrap();
let url = temp_db.database().url();

let db_args = DbArgs {
database_url: url.clone(),
..Default::default()
};

let db_args = DbArgs::with_url(url.clone());
let db = Db::new(db_args.clone()).await.unwrap();
let mut conn = db.connect().await.unwrap();
diesel::sql_query("CREATE TABLE test_table (id INTEGER PRIMARY KEY)")
Expand Down
39 changes: 4 additions & 35 deletions crates/sui-indexer-alt/src/handlers/obj_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

use std::{collections::BTreeMap, sync::Arc};

use anyhow::{anyhow, Result};
use anyhow::Result;
use diesel_async::RunQueryDsl;
use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData, object::Owner};
use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData};

use crate::{
db,
models::objects::{StoredObjInfo, StoredOwnerKind},
models::objects::{create_stored_obj_info, StoredObjInfo},
pipeline::{concurrent::Handler, Processor},
schema::obj_info,
};
Expand Down Expand Up @@ -58,40 +58,9 @@ impl Processor for ObjInfo {
None => true,
};
if should_insert {
let type_ = object.type_();
values.insert(
*object_id,
StoredObjInfo {
object_id: object_id.to_vec(),
cp_sequence_number,
owner_kind: Some(match object.owner() {
Owner::AddressOwner(_) => StoredOwnerKind::Address,
Owner::ObjectOwner(_) => StoredOwnerKind::Object,
Owner::Shared { .. } => StoredOwnerKind::Shared,
Owner::Immutable => StoredOwnerKind::Immutable,
Owner::ConsensusV2 { .. } => todo!(),
}),

owner_id: match object.owner() {
Owner::AddressOwner(a) => Some(a.to_vec()),
Owner::ObjectOwner(o) => Some(o.to_vec()),
Owner::Shared { .. } | Owner::Immutable { .. } => None,
Owner::ConsensusV2 { .. } => todo!(),
},

package: type_.map(|t| t.address().to_vec()),
module: type_.map(|t| t.module().to_string()),
name: type_.map(|t| t.name().to_string()),
instantiation: type_
.map(|t| bcs::to_bytes(&t.type_params()))
.transpose()
.map_err(|e| {
anyhow!(
"Failed to serialize type parameters for {}: {e}",
object.id().to_canonical_display(/* with_prefix */ true),
)
})?,
},
create_stored_obj_info(*object_id, cp_sequence_number, object)?,
);
}
}
Expand Down
44 changes: 43 additions & 1 deletion crates/sui-indexer-alt/src/models/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use diesel::{
sql_types::SmallInt, FromSqlRow,
};
use sui_field_count::FieldCount;
use sui_types::base_types::ObjectID;
use sui_types::{
base_types::ObjectID,
object::{Object, Owner},
};

use crate::schema::{
kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances,
Expand Down Expand Up @@ -145,3 +148,42 @@ pub struct StoredObjInfo {
pub name: Option<String>,
pub instantiation: Option<Vec<u8>>,
}

pub fn create_stored_obj_info(
object_id: ObjectID,
cp_sequence_number: i64,
object: &Object,
) -> Result<StoredObjInfo, anyhow::Error> {
let type_ = object.type_();
Ok(StoredObjInfo {
object_id: object_id.to_vec(),
cp_sequence_number,
owner_kind: Some(match object.owner() {
Owner::AddressOwner(_) => StoredOwnerKind::Address,
Owner::ObjectOwner(_) => StoredOwnerKind::Object,
Owner::Shared { .. } => StoredOwnerKind::Shared,
Owner::Immutable => StoredOwnerKind::Immutable,
Owner::ConsensusV2 { .. } => todo!(),
}),

owner_id: match object.owner() {
Owner::AddressOwner(a) => Some(a.to_vec()),
Owner::ObjectOwner(o) => Some(o.to_vec()),
Owner::Shared { .. } | Owner::Immutable { .. } => None,
Owner::ConsensusV2 { .. } => todo!(),
},

package: type_.map(|t| t.address().to_vec()),
module: type_.map(|t| t.module().to_string()),
name: type_.map(|t| t.name().to_string()),
instantiation: type_
.map(|t| bcs::to_bytes(&t.type_params()))
.transpose()
.map_err(|e| {
anyhow::anyhow!(
"Failed to serialize type parameters for {}: {e}",
object.id().to_canonical_display(/* with_prefix */ true),
)
})?,
})
}
18 changes: 11 additions & 7 deletions crates/sui-indexer-restorer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@ anyhow = { version = "1.0.71", features = ["backtrace"] }
async-trait = "0.1.68"
clap = { version = "4.3.0", features = ["derive", "env"] }
futures = "0.3.28"
serde = { version = "1.0.163", features = ["derive"] }
serde_json = "1.0.96"
tokio = { workspace = true, features = ["full"] }
tracing = "0.1.37"

diesel = { workspace = true, features = ["chrono"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
indicatif.workspace = true
object_store.workspace = true
prometheus.workspace = true
sui-archival.workspace = true
sui-config.workspace = true
sui-snapshot.workspace = true
sui-core.workspace = true
sui-field-count.workspace = true
sui-indexer-alt.workspace = true
sui-types.workspace = true
sui-snapshot.workspace = true
sui-storage.workspace = true
sui-core.workspace = true
object_store.workspace = true
sui-types.workspace = true
tokio = { workspace = true, features = ["full"] }
url.workspace = true

[[bin]]
name = "sui-indexer-restorer"
Expand Down
57 changes: 57 additions & 0 deletions crates/sui-indexer-restorer/src/archives.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::num::NonZeroUsize;

use prometheus::Registry;
use sui_types::digests::CheckpointDigest;
use tracing::info;

use sui_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
use sui_config::node::ArchiveReaderConfig;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};

#[derive(Clone, Debug)]
pub(crate) struct ArchivalCheckpointInfo {
pub next_checkpoint_after_epoch: u64,
#[allow(unused)]
pub chain_identifier: CheckpointDigest,
}

pub(crate) async fn read_archival_checkpoint_info(
archive_bucket: Option<String>,
epoch: u64,
) -> Result<ArchivalCheckpointInfo, anyhow::Error> {
let archive_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: archive_bucket,
object_store_connection_limit: 50,
no_sign_request: false,
..Default::default()
};
let archive_reader_config = ArchiveReaderConfig {
remote_store_config: archive_store_config,
download_concurrency: NonZeroUsize::new(50).unwrap(),
use_for_pruning_watermark: false,
};
let metrics = ArchiveReaderMetrics::new(&Registry::default());
let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;
archive_reader.sync_manifest_once().await?;
let manifest = archive_reader.get_manifest().await?;
let next_checkpoint_after_epoch = manifest.next_checkpoint_after_epoch(epoch);
info!(
"Read from archives: next checkpoint sequence after epoch {} is: {}",
epoch, next_checkpoint_after_epoch
);
let cp_summaries = archive_reader
.get_summaries_for_list_no_verify(vec![0])
.await?;
let first_cp = cp_summaries
.first()
.ok_or_else(|| anyhow::anyhow!("No checkpoint found"))?;
let chain_identifier = *first_cp.digest();
Ok(ArchivalCheckpointInfo {
next_checkpoint_after_epoch,
chain_identifier,
})
}
16 changes: 13 additions & 3 deletions crates/sui-indexer-restorer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

mod archives;
mod snapshot;

use archives::read_archival_checkpoint_info;
use clap::Parser;

use crate::snapshot::SnapshotRestorer;
Expand All @@ -24,12 +26,20 @@ pub struct Args {

#[clap(long, env = "SNAPSHOT_LOCAL_DIR", required = true)]
pub snapshot_local_dir: String,

#[clap(long, env = "DATABASE_URL", required = true)]
pub database_url: String,

#[clap(long, env = "CONCURRENCY", default_value_t = 50)]
pub concurrency: usize,
}

pub async fn restore(args: &Args) -> anyhow::Result<()> {
let mut snapshot_restorer = SnapshotRestorer::new(args).await?;
let archival_checkpoint_info =
read_archival_checkpoint_info(Some(args.archive_bucket.clone()), args.start_epoch).await?;

let mut snapshot_restorer =
SnapshotRestorer::new(args, archival_checkpoint_info.next_checkpoint_after_epoch).await?;
snapshot_restorer.restore().await?;
Ok(())
}


9 changes: 2 additions & 7 deletions crates/sui-indexer-restorer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,14 @@ use anyhow::Result;
use clap::Parser;
use tracing::info;

use sui_indexer_restorer::Args;
use sui_indexer_restorer::restore;
use sui_indexer_restorer::Args;

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
info!(
"Starting indexer restorer from epoch {}",
args.start_epoch
);
info!("Starting indexer restorer from epoch {}", args.start_epoch);
restore(&args).await?;
info!("Finished indexer restorer!");
Ok(())
}


Loading

0 comments on commit fdbecad

Please sign in to comment.