Skip to content

Commit

Permalink
Start indexing dynamic fields in analytics (MystenLabs#15987)
Browse files Browse the repository at this point in the history
## Description 

This PR adds a new indexing pipeline for dynamic fields. The output is
primarily clustered by `parent_id`, so that dynamic fields and their child
objects can be looked up by parent object.
## Test Plan 

Ran the indexer locally and verified the dynamic-field output files.
  • Loading branch information
sadhansood authored Feb 2, 2024
1 parent 44a765b commit 0d86980
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-analytics-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ simulacrum.workspace = true
arrow = { version = "50.0.0"}
gcp-bigquery-client = "0.18.0"
snowflake-api = { version = "0.6.0" }
tap = { version = "1.0.1", features = [] }

[dev-dependencies]

Expand Down
193 changes: 193 additions & 0 deletions crates/sui-analytics-indexer/src/handlers/df_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use fastcrypto::encoding::{Base64, Encoding};
use std::collections::HashMap;
use std::path::Path;
use sui_indexer::errors::IndexerError;
use tap::tap::TapFallible;
use tracing::warn;

use sui_indexer::framework::Handler;
use sui_indexer::types_v2::owner_to_owner_info;
use sui_json_rpc_types::SuiMoveValue;
use sui_package_resolver::Resolver;
use sui_rest_api::{CheckpointData, CheckpointTransaction};
use sui_types::base_types::ObjectID;
use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName, DynamicFieldType};
use sui_types::object::Object;

use crate::handlers::{get_move_struct, AnalyticsHandler};
use crate::package_store::{LocalDBPackageStore, PackageCache};
use crate::tables::DynamicFieldEntry;
use crate::FileType;

/// Analytics handler that extracts one `DynamicFieldEntry` row per dynamic
/// field observed in a checkpoint's output objects.
pub struct DynamicFieldHandler {
    // Rows buffered since the last `read()` call.
    dynamic_fields: Vec<DynamicFieldEntry>,
    // Local cache of packages, updated from every output object before type
    // resolution (see `process_checkpoint`).
    package_store: LocalDBPackageStore,
    // Resolves Move struct layouts so field contents can be deserialized.
    resolver: Resolver<PackageCache>,
}

#[async_trait::async_trait]
impl Handler for DynamicFieldHandler {
    fn name(&self) -> &str {
        "dynamic_field"
    }

    /// Walks every transaction in the checkpoint, refreshing the package
    /// store from its output objects before extracting dynamic-field rows.
    async fn process_checkpoint(&mut self, checkpoint_data: &CheckpointData) -> Result<()> {
        let summary = &checkpoint_data.checkpoint_summary;
        for transaction in &checkpoint_data.transactions {
            // Keep the package store current so type resolution below can
            // see packages published in this very transaction.
            for output_object in &transaction.output_objects {
                self.package_store.update(output_object)?;
            }
            self.process_transaction(
                summary.epoch,
                summary.sequence_number,
                summary.timestamp_ms,
                transaction,
            )
            .await?;
        }
        Ok(())
    }
}

#[async_trait::async_trait]
impl AnalyticsHandler<DynamicFieldEntry> for DynamicFieldHandler {
    /// Drains and returns all rows buffered since the previous call.
    fn read(&mut self) -> Result<Vec<DynamicFieldEntry>> {
        // `mem::take` hands back the buffer and leaves a fresh empty Vec in
        // place, avoiding the clone-then-clear round trip (which copied
        // every entry) of the previous implementation.
        Ok(std::mem::take(&mut self.dynamic_fields))
    }

    fn file_type(&self) -> Result<FileType> {
        Ok(FileType::DynamicField)
    }
}

impl DynamicFieldHandler {
pub fn new(store_path: &Path, rest_uri: &str) -> Self {
let package_store = LocalDBPackageStore::new(&store_path.join("dynamic_field"), rest_uri);
DynamicFieldHandler {
dynamic_fields: vec![],
package_store: package_store.clone(),
resolver: Resolver::new(PackageCache::new(package_store)),
}
}
async fn process_dynamic_field(
&mut self,
epoch: u64,
checkpoint: u64,
timestamp_ms: u64,
object: &Object,
all_written_objects: &HashMap<ObjectID, Object>,
) -> Result<()> {
let move_obj_opt = object.data.try_as_move();
// Skip if not a move object
let Some(move_object) = move_obj_opt else {
return Ok(());
};
if !move_object.type_().is_dynamic_field() {
return Ok(());
}
let move_struct = if let Some((tag, contents)) = object
.struct_tag()
.and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
{
let move_struct = get_move_struct(&tag, contents, &self.resolver).await?;
Some(move_struct)
} else {
None
};
let Some(move_struct) = move_struct else {
return Ok(());
};
let (name_value, type_, object_id) =
DynamicFieldInfo::parse_move_object(&move_struct).tap_err(|e| warn!("{e}"))?;
let name_type = move_object.type_().try_extract_field_name(&type_)?;

let bcs_name = bcs::to_bytes(&name_value.clone().undecorate()).map_err(|e| {
IndexerError::SerdeError(format!(
"Failed to serialize dynamic field name {:?}: {e}",
name_value
))
})?;
let name = DynamicFieldName {
type_: name_type,
value: SuiMoveValue::from(name_value).to_json_value(),
};
let name_json = serde_json::to_string(&name)?;
let (_owner_type, owner_id) = owner_to_owner_info(&object.owner);
let Some(parent_id) = owner_id else {
return Ok(());
};
let entry = match type_ {
DynamicFieldType::DynamicField => DynamicFieldEntry {
parent_object_id: parent_id.to_string(),
transaction_digest: object.previous_transaction.base58_encode(),
checkpoint,
epoch,
timestamp_ms,
name: name_json,
bcs_name: Base64::encode(bcs_name),
type_,
object_id: object.id().to_string(),
version: object.version().value(),
digest: object.digest().to_string(),
object_type: move_object.clone().into_type().into_type_params()[1]
.to_canonical_string(/* with_prefix */ true),
},
DynamicFieldType::DynamicObject => {
let object =
all_written_objects
.get(&object_id)
.ok_or(IndexerError::UncategorizedError(anyhow::anyhow!(
"Failed to find object_id {:?} when trying to create dynamic field info",
object_id
)))?;
let version = object.version().value();
let digest = object.digest().to_string();
let object_type = object.data.type_().unwrap().clone();
DynamicFieldEntry {
parent_object_id: parent_id.to_string(),
transaction_digest: object.previous_transaction.base58_encode(),
checkpoint,
epoch,
timestamp_ms,
name: name_json,
bcs_name: Base64::encode(bcs_name),
type_,
object_id: object.id().to_string(),
digest,
version,
object_type: object_type.to_canonical_string(true),
}
}
};
self.dynamic_fields.push(entry);
Ok(())
}

async fn process_transaction(
&mut self,
epoch: u64,
checkpoint: u64,
timestamp_ms: u64,
checkpoint_transaction: &CheckpointTransaction,
) -> Result<()> {
let all_objects: HashMap<_, _> = checkpoint_transaction
.output_objects
.iter()
.map(|x| (x.id(), x.clone()))
.collect();
for object in checkpoint_transaction.output_objects.iter() {
self.process_dynamic_field(epoch, checkpoint, timestamp_ms, object, &all_objects)
.await?;
}
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/sui-analytics-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::tables::{InputObjectKind, ObjectStatus, OwnerType};
use crate::FileType;

pub mod checkpoint_handler;
pub mod df_handler;
pub mod event_handler;
pub mod move_call_handler;
pub mod object_handler;
Expand Down
50 changes: 48 additions & 2 deletions crates/sui-analytics-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ use sui_storage::object_store::util::{
find_all_dirs_with_epoch_prefix, find_all_files_with_epoch_prefix,
};
use sui_types::base_types::EpochId;
use sui_types::dynamic_field::DynamicFieldType;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::analytics_metrics::AnalyticsMetrics;
use crate::analytics_processor::AnalyticsProcessor;
use crate::handlers::checkpoint_handler::CheckpointHandler;
use crate::handlers::df_handler::DynamicFieldHandler;
use crate::handlers::event_handler::EventHandler;
use crate::handlers::move_call_handler::MoveCallHandler;
use crate::handlers::object_handler::ObjectHandler;
Expand All @@ -37,8 +39,9 @@ use crate::handlers::transaction_handler::TransactionHandler;
use crate::handlers::transaction_objects_handler::TransactionObjectsHandler;
use crate::handlers::AnalyticsHandler;
use crate::tables::{
CheckpointEntry, EventEntry, InputObjectKind, MoveCallEntry, MovePackageEntry, ObjectEntry,
ObjectStatus, OwnerType, TransactionEntry, TransactionObjectEntry,
CheckpointEntry, DynamicFieldEntry, EventEntry, InputObjectKind, MoveCallEntry,
MovePackageEntry, ObjectEntry, ObjectStatus, OwnerType, TransactionEntry,
TransactionObjectEntry,
};
use crate::writers::csv_writer::CSVWriter;
use crate::writers::parquet_writer::ParquetWriter;
Expand All @@ -60,6 +63,7 @@ const EVENT_DIR_PREFIX: &str = "events";
const TRANSACTION_OBJECT_DIR_PREFIX: &str = "transaction_objects";
const MOVE_CALL_PREFIX: &str = "move_call";
const MOVE_PACKAGE_PREFIX: &str = "move_package";
const DYNAMIC_FIELD_PREFIX: &str = "dynamic_field";

#[derive(Parser, Clone, Debug)]
#[clap(
Expand Down Expand Up @@ -312,6 +316,7 @@ pub enum FileType {
Event,
MoveCall,
MovePackage,
DynamicField,
}

impl FileType {
Expand All @@ -324,6 +329,7 @@ impl FileType {
FileType::Event => Path::from(EVENT_DIR_PREFIX),
FileType::MoveCall => Path::from(MOVE_CALL_PREFIX),
FileType::MovePackage => Path::from(MOVE_PACKAGE_PREFIX),
FileType::DynamicField => Path::from(DYNAMIC_FIELD_PREFIX),
}
}

Expand Down Expand Up @@ -413,6 +419,18 @@ impl From<Option<InputObjectKind>> for ParquetValue {
}
}

impl From<DynamicFieldType> for ParquetValue {
fn from(value: DynamicFieldType) -> Self {
Self::Str(value.to_string())
}
}

impl From<Option<DynamicFieldType>> for ParquetValue {
fn from(value: Option<DynamicFieldType>) -> Self {
Self::OptionStr(value.map(|v| v.to_string()))
}
}

pub trait ParquetSchema {
fn schema() -> Vec<String>;

Expand Down Expand Up @@ -772,6 +790,33 @@ pub async fn make_move_call_processor(
.await
}

/// Builds the `Processor` for the dynamic-field pipeline: resumes from the
/// last written checkpoint, wires up the handler and writer for
/// `FileType::DynamicField`, and hands everything to `Processor::new`.
pub async fn make_dynamic_field_processor(
    config: AnalyticsIndexerConfig,
    metrics: AnalyticsMetrics,
) -> Result<Processor> {
    let start_seq =
        get_starting_checkpoint_seq_num(config.clone(), FileType::DynamicField).await?;
    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
    let handler: Box<dyn AnalyticsHandler<DynamicFieldEntry>> = Box::new(DynamicFieldHandler::new(
        &config.package_cache_path,
        &config.rest_url,
    ));
    let writer =
        make_writer::<DynamicFieldEntry>(config.clone(), FileType::DynamicField, start_seq)?;
    Processor::new::<DynamicFieldEntry>(
        handler,
        writer,
        max_checkpoint_reader,
        start_seq,
        metrics,
        config,
    )
    .await
}

pub fn make_writer<S: Serialize + ParquetSchema>(
config: AnalyticsIndexerConfig,
file_type: FileType,
Expand Down Expand Up @@ -815,5 +860,6 @@ pub async fn make_analytics_processor(
FileType::TransactionObjects => make_transaction_objects_processor(config, metrics).await,
FileType::MoveCall => make_move_call_processor(config, metrics).await,
FileType::MovePackage => make_move_package_processor(config, metrics).await,
FileType::DynamicField => make_dynamic_field_processor(config, metrics).await,
}
}
20 changes: 19 additions & 1 deletion crates/sui-analytics-indexer/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{ParquetSchema, ParquetValue};
use serde::Serialize;
use strum_macros::Display;
use sui_analytics_indexer_derive::SerializeParquet;
// use std::collections::BTreeSet;
use sui_types::dynamic_field::DynamicFieldType;

//
// Table entries for the analytics database.
Expand Down Expand Up @@ -235,3 +235,21 @@ pub(crate) struct MovePackageEntry {
pub(crate) package_version: Option<u64>,
pub(crate) original_package_id: Option<String>,
}

/// One row of the dynamic-field analytics table: a single dynamic field
/// observed on an output object of a checkpoint transaction.
#[derive(Serialize, Clone, SerializeParquet)]
pub(crate) struct DynamicFieldEntry {
    // indexes
    // Owning parent object — the primary clustering key for lookups.
    pub(crate) parent_object_id: String,
    // Base58 digest of the transaction that last wrote this field object.
    pub(crate) transaction_digest: String,
    pub(crate) checkpoint: u64,
    pub(crate) epoch: u64,
    pub(crate) timestamp_ms: u64,
    // df information
    // Field name serialized as JSON (type + decoded value).
    pub(crate) name: String,
    // Base64-encoded BCS bytes of the undecorated field name value.
    pub(crate) bcs_name: String,
    // Whether this is a plain DynamicField or a DynamicObject field.
    pub(crate) type_: DynamicFieldType,
    pub(crate) object_id: String,
    pub(crate) version: u64,
    pub(crate) digest: String,
    // Canonical string of the value's Move type (with address prefix).
    pub(crate) object_type: String,
}

0 comments on commit 0d86980

Please sign in to comment.