Skip to content

Commit

Permalink
Integrate with ObjInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Dec 20, 2024
1 parent df5acc7 commit 9196bde
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 9 deletions.
12 changes: 9 additions & 3 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ pub trait Handler: Processor<Value: FieldCount> {

/// Clean up data between checkpoints `_from` and `_to` (inclusive) in the database, returning
/// the number of rows affected. This function is optional, and defaults to not pruning at all.
async fn prune(_from: u64, _to: u64, _conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
async fn prune(
&self,
_from: u64,
_to: u64,
_conn: &mut db::Connection<'_>,
) -> anyhow::Result<usize> {
Ok(0)
}
}
Expand Down Expand Up @@ -201,9 +206,10 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
// the global cancel signal. We achieve this by creating a child cancel token that we call
// cancel on once the committer tasks have shutdown.
let pruner_cancel = cancel.child_token();
let handler = Arc::new(handler);

let processor = processor(
handler,
handler.clone(),
checkpoint_rx,
processor_tx,
metrics.clone(),
Expand Down Expand Up @@ -246,7 +252,7 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
pruner_cancel.clone(),
);

let pruner = pruner::<H>(pruner_config, db, metrics, pruner_cancel.clone());
let pruner = pruner(handler, pruner_config, db, metrics, pruner_cancel.clone());

tokio::spawn(async move {
let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use super::{Handler, PrunerConfig};
///
/// The task will shutdown if the `cancel` token is signalled. If the `config` is `None`, the task
/// will shutdown immediately.
pub(super) fn pruner<H: Handler + 'static>(
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
handler: Arc<H>,
config: Option<PrunerConfig>,
db: Db,
metrics: Arc<IndexerMetrics>,
Expand Down Expand Up @@ -134,7 +135,7 @@ pub(super) fn pruner<H: Handler + 'static>(
};

let (from, to) = watermark.next_chunk(config.max_chunk_size);
let affected = match H::prune(from, to, &mut conn).await {
let affected = match handler.prune(from, to, &mut conn).await {
Ok(affected) => {
guard.stop_and_record();
watermark.pruner_hi = to as i64;
Expand Down
3 changes: 1 addition & 2 deletions crates/sui-indexer-alt-framework/src/pipeline/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub trait Processor {
/// The task will shutdown if the `cancel` token is cancelled, or if any of the workers encounters
/// an error -- there is no retry logic at this level.
pub(super) fn processor<P: Processor + Send + Sync + 'static>(
processor: P,
processor: Arc<P>,
rx: mpsc::Receiver<Arc<CheckpointData>>,
tx: mpsc::Sender<IndexedCheckpoint<P>>,
metrics: Arc<IndexerMetrics>,
Expand All @@ -61,7 +61,6 @@ pub(super) fn processor<P: Processor + Send + Sync + 'static>(
&metrics.latest_processed_checkpoint_timestamp_lag_ms,
&metrics.latest_processed_checkpoint,
);
let processor = Arc::new(processor);

match ReceiverStream::new(rx)
.try_for_each_spawned(P::FANOUT, |checkpoint| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);

let processor = processor(
handler,
Arc::new(handler),
checkpoint_rx,
processor_tx,
metrics.clone(),
Expand Down
46 changes: 45 additions & 1 deletion crates/sui-indexer-alt/src/handlers/obj_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{anyhow, Result};
use diesel::sql_query;
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
Expand All @@ -18,7 +19,7 @@ use sui_types::{
object::{Object, Owner},
};

use crate::consistent_pruning::PruningLookupTable;
use crate::consistent_pruning::{PruningInfo, PruningLookupTable};

#[derive(Default)]
pub(crate) struct ObjInfo {
Expand Down Expand Up @@ -48,6 +49,7 @@ impl Processor for ObjInfo {
.map(|o| (o.id(), o))
.collect::<BTreeMap<_, _>>();
let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
let mut prune_info = PruningInfo::new();
for object_id in checkpoint_input_objects.keys() {
if !latest_live_output_objects.contains_key(object_id) {
// If an input object is not in the latest live output objects, it must have been deleted
Expand All @@ -61,6 +63,7 @@ impl Processor for ObjInfo {
update: ProcessedObjInfoUpdate::Delete(*object_id),
},
);
prune_info.add_deleted_object(*object_id);
}
}
for (object_id, object) in latest_live_output_objects.iter() {
Expand All @@ -78,8 +81,15 @@ impl Processor for ObjInfo {
update: ProcessedObjInfoUpdate::Insert((*object).clone()),
},
);
// We do not need to prune if the object was created in this checkpoint,
// because this object would not have been in the table prior to this checkpoint.
if checkpoint_input_objects.contains_key(object_id) {
prune_info.add_mutated_object(*object_id);
}
}
}
self.pruning_lookup_table
.insert(cp_sequence_number, prune_info);

Ok(values.into_values().collect())
}
Expand All @@ -98,6 +108,40 @@ impl Handler for ObjInfo {
.execute(conn)
.await?)
}

async fn prune(&self, from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
use sui_indexer_alt_schema::schema::obj_info::dsl;

let to_prune = self.pruning_lookup_table.take(from, to)?;

// For each (object_id, cp_sequence_number_exclusive), delete all entries in obj_info with
// cp_sequence_number less than cp_sequence_number_exclusive that match the object_id.

let values = to_prune
.iter()
.map(|(object_id, seq_number)| {
let object_id_hex = hex::encode(object_id);
format!("('\\x{}'::BYTEA, {}::BIGINT)", object_id_hex, seq_number)
})
.collect::<Vec<_>>()
.join(",");
let query = format!(
"
WITH to_prune_data (object_id, cp_sequence_number_exclusive) AS (
VALUES {}
)
DELETE FROM obj_info
USING to_prune_data
WHERE obj_info.{:?} = to_prune_data.object_id
AND obj_info.{:?} < to_prune_data.cp_sequence_number_exclusive
",
values,
dsl::object_id,
dsl::cp_sequence_number,
);
let rows_deleted = sql_query(query).execute(conn).await?;
Ok(rows_deleted)
}
}

impl FieldCount for ProcessedObjInfo {
Expand Down

0 comments on commit 9196bde

Please sign in to comment.