From 9196bde374c2605458de5a8566c142acca232efe Mon Sep 17 00:00:00 2001
From: Xun Li <lxfind@gmail.com>
Date: Thu, 19 Dec 2024 17:25:07 -0800
Subject: [PATCH] Integrate with ObjInfo

---
 .../src/pipeline/concurrent/mod.rs              | 12 +++--
 .../src/pipeline/concurrent/pruner.rs           |  5 +-
 .../src/pipeline/processor.rs                   |  3 +-
 .../src/pipeline/sequential/mod.rs              |  2 +-
 crates/sui-indexer-alt/src/handlers/obj_info.rs | 46 ++++++++++++++++++-
 5 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs
index d1aa0d0d756e28..23c36ea2db911d 100644
--- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs
+++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs
@@ -64,7 +64,12 @@ pub trait Handler: Processor {
 
     /// Clean up data between checkpoints `_from` and `_to` (inclusive) in the database, returning
     /// the number of rows affected. This function is optional, and defaults to not pruning at all.
-    async fn prune(_from: u64, _to: u64, _conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
+    async fn prune(
+        &self,
+        _from: u64,
+        _to: u64,
+        _conn: &mut db::Connection<'_>,
+    ) -> anyhow::Result<usize> {
         Ok(0)
     }
 }
@@ -201,9 +206,10 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
     // the global cancel signal. We achieve this by creating a child cancel token that we call
     // cancel on once the committer tasks have shutdown.
     let pruner_cancel = cancel.child_token();
+    let handler = Arc::new(handler);
 
     let processor = processor(
-        handler,
+        handler.clone(),
         checkpoint_rx,
         processor_tx,
         metrics.clone(),
@@ -246,7 +252,7 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
         pruner_cancel.clone(),
     );
 
-    let pruner = pruner::<H>(pruner_config, db, metrics, pruner_cancel.clone());
+    let pruner = pruner(handler, pruner_config, db, metrics, pruner_cancel.clone());
 
     tokio::spawn(async move {
         let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
index 721035c77713d0..1e8b1a88322d9d 100644
--- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
+++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
@@ -33,7 +33,8 @@ use super::{Handler, PrunerConfig};
 ///
 /// The task will shutdown if the `cancel` token is signalled. If the `config` is `None`, the task
 /// will shutdown immediately.
-pub(super) fn pruner<H: Handler + 'static>(
+pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
+    handler: Arc<H>,
     config: Option<PrunerConfig>,
     db: Db,
     metrics: Arc<IndexerMetrics>,
@@ -134,7 +135,7 @@ pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
             };
 
             let (from, to) = watermark.next_chunk(config.max_chunk_size);
-            let affected = match H::prune(from, to, &mut conn).await {
+            let affected = match handler.prune(from, to, &mut conn).await {
                 Ok(affected) => {
                     guard.stop_and_record();
                     watermark.pruner_hi = to as i64;

diff --git a/crates/sui-indexer-alt-framework/src/pipeline/processor.rs b/crates/sui-indexer-alt-framework/src/pipeline/processor.rs
index 1bed30eab43067..3e4cb1558ffed1 100644
--- a/crates/sui-indexer-alt-framework/src/pipeline/processor.rs
+++ b/crates/sui-indexer-alt-framework/src/pipeline/processor.rs
@@ -48,7 +48,7 @@ pub trait Processor {
 /// The task will shutdown if the `cancel` token is cancelled, or if any of the workers encounters
 /// an error -- there is no retry logic at this level.
 pub(super) fn processor<P: Processor + Send + Sync + 'static>(
-    processor: P,
+    processor: Arc<P>,
     rx: mpsc::Receiver<Arc<CheckpointData>>,
     tx: mpsc::Sender<Indexed<P>>,
     metrics: Arc<IndexerMetrics>,
@@ -61,7 +61,6 @@ pub(super) fn processor<P: Processor + Send + Sync + 'static>(
         &metrics.latest_processed_checkpoint_timestamp_lag_ms,
         &metrics.latest_processed_checkpoint,
     );
-    let processor = Arc::new(processor);
 
     match ReceiverStream::new(rx)
         .try_for_each_spawned(P::FANOUT, |checkpoint| {

diff --git a/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs
index b5f3c33899e224..9a64b842be6341 100644
--- a/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs
+++ b/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs
@@ -106,7 +106,7 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
     let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
 
     let processor = processor(
-        handler,
+        Arc::new(handler),
         checkpoint_rx,
         processor_tx,
         metrics.clone(),

diff --git a/crates/sui-indexer-alt/src/handlers/obj_info.rs b/crates/sui-indexer-alt/src/handlers/obj_info.rs
index efd485b0027963..96d40569672aa7 100644
--- a/crates/sui-indexer-alt/src/handlers/obj_info.rs
+++ b/crates/sui-indexer-alt/src/handlers/obj_info.rs
@@ -4,6 +4,7 @@
 use std::{collections::BTreeMap, sync::Arc};
 
 use anyhow::{anyhow, Result};
+use diesel::sql_query;
 use diesel_async::RunQueryDsl;
 use sui_field_count::FieldCount;
 use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
@@ -18,7 +19,7 @@ use sui_types::{
     object::{Object, Owner},
 };
 
-use crate::consistent_pruning::PruningLookupTable;
+use crate::consistent_pruning::{PruningInfo, PruningLookupTable};
 
 #[derive(Default)]
 pub(crate) struct ObjInfo {
@@ -48,6 +49,7 @@ impl Processor for ObjInfo {
             .map(|o| (o.id(), o))
             .collect::<BTreeMap<_, _>>();
         let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
+        let mut prune_info = PruningInfo::new();
         for object_id in checkpoint_input_objects.keys() {
             if !latest_live_output_objects.contains_key(object_id) {
                 // If an input object is not in the latest live output objects, it must have been deleted
@@ -61,6 +63,7 @@ impl Processor for ObjInfo {
                         update: ProcessedObjInfoUpdate::Delete(*object_id),
                     },
                 );
+                prune_info.add_deleted_object(*object_id);
             }
         }
         for (object_id, object) in latest_live_output_objects.iter() {
@@ -78,8 +81,15 @@ impl Processor for ObjInfo {
                         update: ProcessedObjInfoUpdate::Insert((*object).clone()),
                     },
                 );
+                // We do not need to prune if the object was created in this checkpoint,
+                // because this object would not have been in the table prior to this checkpoint.
+                if checkpoint_input_objects.contains_key(object_id) {
+                    prune_info.add_mutated_object(*object_id);
+                }
             }
         }
+        self.pruning_lookup_table
+            .insert(cp_sequence_number, prune_info);
         Ok(values.into_values().collect())
     }
 
@@ -98,6 +108,40 @@ impl Handler for ObjInfo {
             .execute(conn)
             .await?)
     }
+
+    async fn prune(&self, from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
+        use sui_indexer_alt_schema::schema::obj_info::dsl;
+
+        let to_prune = self.pruning_lookup_table.take(from, to)?;
+
+        // For each (object_id, cp_sequence_number_exclusive), delete all entries in obj_info with
+        // cp_sequence_number less than cp_sequence_number_exclusive that match the object_id.
+        let values = to_prune
+            .iter()
+            .map(|(object_id, seq_number)| {
+                let object_id_hex = hex::encode(object_id);
+                format!("('\\x{}'::BYTEA, {}::BIGINT)", object_id_hex, seq_number)
+            })
+            .collect::<Vec<_>>()
+            .join(",");
+        let query = format!(
+            "
+            WITH to_prune_data (object_id, cp_sequence_number_exclusive) AS (
+                VALUES {}
+            )
+            DELETE FROM obj_info
+            USING to_prune_data
+            WHERE obj_info.{:?} = to_prune_data.object_id
+            AND obj_info.{:?} < to_prune_data.cp_sequence_number_exclusive
+            ",
+            values,
+            dsl::object_id,
+            dsl::cp_sequence_number,
+        );
+        let rows_deleted = sql_query(query).execute(conn).await?;
+        Ok(rows_deleted)
+    }
 }
 
 impl FieldCount for ProcessedObjInfo {