From 0d8698057b5ef4991e33bff1a5066901e3197d90 Mon Sep 17 00:00:00 2001 From: Sadhan Sood <106645797+sadhansood@users.noreply.github.com> Date: Fri, 2 Feb 2024 14:00:28 -0800 Subject: [PATCH] Start indexing dynamic fields in analytics (#15987) ## Description This PR adds a new indexing pipeline for indexing dynamic fields. The primary clustering is to be done by `parent_id` to lookup dynamic fields and objects ## Test Plan Running locally and verifying --- Cargo.lock | 1 + crates/sui-analytics-indexer/Cargo.toml | 1 + .../src/handlers/df_handler.rs | 193 ++++++++++++++++++ .../sui-analytics-indexer/src/handlers/mod.rs | 1 + crates/sui-analytics-indexer/src/lib.rs | 50 ++++- crates/sui-analytics-indexer/src/tables.rs | 20 +- 6 files changed, 263 insertions(+), 3 deletions(-) create mode 100644 crates/sui-analytics-indexer/src/handlers/df_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 57c444387bdb0..769be4b8312ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11730,6 +11730,7 @@ dependencies = [ "sui-rest-api", "sui-storage", "sui-types", + "tap", "telemetry-subscribers", "tempfile", "thiserror", diff --git a/crates/sui-analytics-indexer/Cargo.toml b/crates/sui-analytics-indexer/Cargo.toml index 81743f99104ae..6d7bfb341bb10 100644 --- a/crates/sui-analytics-indexer/Cargo.toml +++ b/crates/sui-analytics-indexer/Cargo.toml @@ -55,6 +55,7 @@ simulacrum.workspace = true arrow = { version = "50.0.0"} gcp-bigquery-client = "0.18.0" snowflake-api = { version = "0.6.0" } +tap = { version = "1.0.1", features = [] } [dev-dependencies] diff --git a/crates/sui-analytics-indexer/src/handlers/df_handler.rs b/crates/sui-analytics-indexer/src/handlers/df_handler.rs new file mode 100644 index 0000000000000..539a29c690394 --- /dev/null +++ b/crates/sui-analytics-indexer/src/handlers/df_handler.rs @@ -0,0 +1,193 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use fastcrypto::encoding::{Base64, Encoding}; +use std::collections::HashMap; +use std::path::Path; +use sui_indexer::errors::IndexerError; +use tap::tap::TapFallible; +use tracing::warn; + +use sui_indexer::framework::Handler; +use sui_indexer::types_v2::owner_to_owner_info; +use sui_json_rpc_types::SuiMoveValue; +use sui_package_resolver::Resolver; +use sui_rest_api::{CheckpointData, CheckpointTransaction}; +use sui_types::base_types::ObjectID; +use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName, DynamicFieldType}; +use sui_types::object::Object; + +use crate::handlers::{get_move_struct, AnalyticsHandler}; +use crate::package_store::{LocalDBPackageStore, PackageCache}; +use crate::tables::DynamicFieldEntry; +use crate::FileType; + +pub struct DynamicFieldHandler { + dynamic_fields: Vec, + package_store: LocalDBPackageStore, + resolver: Resolver, +} + +#[async_trait::async_trait] +impl Handler for DynamicFieldHandler { + fn name(&self) -> &str { + "dynamic_field" + } + async fn process_checkpoint(&mut self, checkpoint_data: &CheckpointData) -> Result<()> { + let CheckpointData { + checkpoint_summary, + transactions: checkpoint_transactions, + .. + } = checkpoint_data; + for checkpoint_transaction in checkpoint_transactions { + for object in checkpoint_transaction.output_objects.iter() { + self.package_store.update(object)?; + } + self.process_transaction( + checkpoint_summary.epoch, + checkpoint_summary.sequence_number, + checkpoint_summary.timestamp_ms, + checkpoint_transaction, + ) + .await?; + } + Ok(()) + } +} + +#[async_trait::async_trait] +impl AnalyticsHandler for DynamicFieldHandler { + fn read(&mut self) -> Result> { + let cloned = self.dynamic_fields.clone(); + self.dynamic_fields.clear(); + Ok(cloned) + } + + fn file_type(&self) -> Result { + Ok(FileType::DynamicField) + } +} + +impl DynamicFieldHandler { + pub fn new(store_path: &Path, rest_uri: &str) -> Self { + let package_store = LocalDBPackageStore::new(&store_path.join("dynamic_field"), rest_uri); + DynamicFieldHandler { + dynamic_fields: vec![], + package_store: package_store.clone(), + resolver: Resolver::new(PackageCache::new(package_store)), + } + } + async fn process_dynamic_field( + &mut self, + epoch: u64, + checkpoint: u64, + timestamp_ms: u64, + object: &Object, + all_written_objects: &HashMap, + ) -> Result<()> { + let move_obj_opt = object.data.try_as_move(); + // Skip if not a move object + let Some(move_object) = move_obj_opt else { + return Ok(()); + }; + if !move_object.type_().is_dynamic_field() { + return Ok(()); + } + let move_struct = if let Some((tag, contents)) = object + .struct_tag() + .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents()))) + { + let move_struct = get_move_struct(&tag, contents, &self.resolver).await?; + Some(move_struct) + } else { + None + }; + let Some(move_struct) = move_struct else { + return Ok(()); + }; + let (name_value, type_, object_id) = + DynamicFieldInfo::parse_move_object(&move_struct).tap_err(|e| warn!("{e}"))?; + let name_type = move_object.type_().try_extract_field_name(&type_)?; + + let bcs_name = bcs::to_bytes(&name_value.clone().undecorate()).map_err(|e| { + IndexerError::SerdeError(format!( + "Failed to serialize dynamic field name {:?}: {e}", + name_value + )) + })?; + let name = DynamicFieldName { + type_: name_type, + value: SuiMoveValue::from(name_value).to_json_value(), + }; + let name_json = serde_json::to_string(&name)?; + let (_owner_type, owner_id) = owner_to_owner_info(&object.owner); + let Some(parent_id) = owner_id else { + return Ok(()); + }; + let entry = match type_ { + DynamicFieldType::DynamicField => DynamicFieldEntry { + parent_object_id: parent_id.to_string(), + transaction_digest: object.previous_transaction.base58_encode(), + checkpoint, + epoch, + timestamp_ms, + name: name_json, + bcs_name: Base64::encode(bcs_name), + type_, + object_id: object.id().to_string(), + version: object.version().value(), + digest: object.digest().to_string(), + object_type: move_object.clone().into_type().into_type_params()[1] + .to_canonical_string(/* with_prefix */ true), + }, + DynamicFieldType::DynamicObject => { + let object = + all_written_objects + .get(&object_id) + .ok_or(IndexerError::UncategorizedError(anyhow::anyhow!( + "Failed to find object_id {:?} when trying to create dynamic field info", + object_id + )))?; + let version = object.version().value(); + let digest = object.digest().to_string(); + let object_type = object.data.type_().unwrap().clone(); + DynamicFieldEntry { + parent_object_id: parent_id.to_string(), + transaction_digest: object.previous_transaction.base58_encode(), + checkpoint, + epoch, + timestamp_ms, + name: name_json, + bcs_name: Base64::encode(bcs_name), + type_, + object_id: object.id().to_string(), + digest, + version, + object_type: object_type.to_canonical_string(true), + } + } + }; + self.dynamic_fields.push(entry); + Ok(()) + } + + async fn process_transaction( + &mut self, + epoch: u64, + checkpoint: u64, + timestamp_ms: u64, + checkpoint_transaction: &CheckpointTransaction, + ) -> Result<()> { + let all_objects: HashMap<_, _> = checkpoint_transaction + .output_objects + .iter() + .map(|x| (x.id(), x.clone())) + .collect(); + for object in checkpoint_transaction.output_objects.iter() { + self.process_dynamic_field(epoch, checkpoint, timestamp_ms, object, &all_objects) + .await?; + } + Ok(()) + } +} diff --git a/crates/sui-analytics-indexer/src/handlers/mod.rs b/crates/sui-analytics-indexer/src/handlers/mod.rs index 89d940a314ddc..12bdfdfe105e3 100644 --- a/crates/sui-analytics-indexer/src/handlers/mod.rs +++ b/crates/sui-analytics-indexer/src/handlers/mod.rs @@ -20,6 +20,7 @@ use crate::tables::{InputObjectKind, ObjectStatus, OwnerType}; use crate::FileType; pub mod checkpoint_handler; +pub mod df_handler; pub mod event_handler; pub mod move_call_handler; pub mod object_handler; diff --git a/crates/sui-analytics-indexer/src/lib.rs b/crates/sui-analytics-indexer/src/lib.rs index b5b161f21eb26..1458ec12173cd 100644 --- a/crates/sui-analytics-indexer/src/lib.rs +++ b/crates/sui-analytics-indexer/src/lib.rs @@ -24,11 +24,13 @@ use sui_storage::object_store::util::{ find_all_dirs_with_epoch_prefix, find_all_files_with_epoch_prefix, }; use sui_types::base_types::EpochId; +use sui_types::dynamic_field::DynamicFieldType; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use crate::analytics_metrics::AnalyticsMetrics; use crate::analytics_processor::AnalyticsProcessor; use crate::handlers::checkpoint_handler::CheckpointHandler; +use crate::handlers::df_handler::DynamicFieldHandler; use crate::handlers::event_handler::EventHandler; use crate::handlers::move_call_handler::MoveCallHandler; use crate::handlers::object_handler::ObjectHandler; @@ -37,8 +39,9 @@ use crate::handlers::transaction_handler::TransactionHandler; use crate::handlers::transaction_objects_handler::TransactionObjectsHandler; use crate::handlers::AnalyticsHandler; use crate::tables::{ - CheckpointEntry, EventEntry, InputObjectKind, MoveCallEntry, MovePackageEntry, ObjectEntry, - ObjectStatus, OwnerType, TransactionEntry, TransactionObjectEntry, + CheckpointEntry, DynamicFieldEntry, EventEntry, InputObjectKind, MoveCallEntry, + MovePackageEntry, ObjectEntry, ObjectStatus, OwnerType, TransactionEntry, + TransactionObjectEntry, }; use crate::writers::csv_writer::CSVWriter; use crate::writers::parquet_writer::ParquetWriter; @@ -60,6 +63,7 @@ const EVENT_DIR_PREFIX: &str = "events"; const TRANSACTION_OBJECT_DIR_PREFIX: &str = "transaction_objects"; const MOVE_CALL_PREFIX: &str = "move_call"; const MOVE_PACKAGE_PREFIX: &str = "move_package"; +const DYNAMIC_FIELD_PREFIX: &str = "dynamic_field"; #[derive(Parser, Clone, Debug)] #[clap( @@ -312,6 +316,7 @@ pub enum FileType { Event, MoveCall, MovePackage, + DynamicField, } impl FileType { @@ -324,6 +329,7 @@ impl FileType { FileType::Event => Path::from(EVENT_DIR_PREFIX), FileType::MoveCall => Path::from(MOVE_CALL_PREFIX), FileType::MovePackage => Path::from(MOVE_PACKAGE_PREFIX), + FileType::DynamicField => Path::from(DYNAMIC_FIELD_PREFIX), } } @@ -413,6 +419,18 @@ impl From> for ParquetValue { } } +impl From for ParquetValue { + fn from(value: DynamicFieldType) -> Self { + Self::Str(value.to_string()) + } +} + +impl From> for ParquetValue { + fn from(value: Option) -> Self { + Self::OptionStr(value.map(|v| v.to_string())) + } +} + pub trait ParquetSchema { fn schema() -> Vec; @@ -772,6 +790,33 @@ pub async fn make_move_call_processor( .await } +pub async fn make_dynamic_field_processor( + config: AnalyticsIndexerConfig, + metrics: AnalyticsMetrics, +) -> Result { + let starting_checkpoint_seq_num = + get_starting_checkpoint_seq_num(config.clone(), FileType::DynamicField).await?; + let handler: Box> = Box::new(DynamicFieldHandler::new( + &config.package_cache_path, + &config.rest_url, + )); + let writer = make_writer::( + config.clone(), + FileType::DynamicField, + starting_checkpoint_seq_num, + )?; + let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?; + Processor::new::( + handler, + writer, + max_checkpoint_reader, + starting_checkpoint_seq_num, + metrics, + config, + ) + .await +} + pub fn make_writer( config: AnalyticsIndexerConfig, file_type: FileType, @@ -815,5 +860,6 @@ pub async fn make_analytics_processor( FileType::TransactionObjects => make_transaction_objects_processor(config, metrics).await, FileType::MoveCall => make_move_call_processor(config, metrics).await, FileType::MovePackage => make_move_package_processor(config, metrics).await, + FileType::DynamicField => make_dynamic_field_processor(config, metrics).await, } } diff --git a/crates/sui-analytics-indexer/src/tables.rs b/crates/sui-analytics-indexer/src/tables.rs index e445fdaa35d72..942ec104500ea 100644 --- a/crates/sui-analytics-indexer/src/tables.rs +++ b/crates/sui-analytics-indexer/src/tables.rs @@ -6,7 +6,7 @@ use crate::{ParquetSchema, ParquetValue}; use serde::Serialize; use strum_macros::Display; use sui_analytics_indexer_derive::SerializeParquet; -// use std::collections::BTreeSet; +use sui_types::dynamic_field::DynamicFieldType; // // Table entries for the analytics database. @@ -235,3 +235,21 @@ pub(crate) struct MovePackageEntry { pub(crate) package_version: Option, pub(crate) original_package_id: Option, } + +#[derive(Serialize, Clone, SerializeParquet)] +pub(crate) struct DynamicFieldEntry { + // indexes + pub(crate) parent_object_id: String, + pub(crate) transaction_digest: String, + pub(crate) checkpoint: u64, + pub(crate) epoch: u64, + pub(crate) timestamp_ms: u64, + // df information + pub(crate) name: String, + pub(crate) bcs_name: String, + pub(crate) type_: DynamicFieldType, + pub(crate) object_id: String, + pub(crate) version: u64, + pub(crate) digest: String, + pub(crate) object_type: String, +}