Skip to content

Commit

Permalink
as discussed, define the CpMapping handler in the framework, but leav…
Browse files Browse the repository at this point in the history
…e it up to the indexer implementation to leverage it. Since the framework code doesn't depend on it for pruning yet, I've left the doc comments for later
  • Loading branch information
wlmyng committed Dec 19, 2024
1 parent 15f49bc commit 37c3f25
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ CREATE TABLE IF NOT EXISTS cp_mapping
-- The network total transactions at the end of this checkpoint subtracted by the number of
-- transactions in the checkpoint.
tx_lo BIGINT NOT NULL,
-- Exclusive upper transaction sequence number bound for this checkpoint, corresponds to the
-- checkpoint's network total transactions. If this number is equal to `tx_lo`, then this
-- checkpoint contains no transactions.
tx_hi BIGINT NOT NULL,
-- The epoch this checkpoint belongs to.
epoch BIGINT NOT NULL
);
62 changes: 3 additions & 59 deletions crates/sui-indexer-alt-framework/src/handlers/cp_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,72 +9,18 @@ use anyhow::Result;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;
use sui_pg_db::{self as db, Connection};
use sui_pg_db::{self as db};
use sui_types::full_checkpoint_content::CheckpointData;

#[derive(Insertable, Selectable, Queryable, Debug, Clone, FieldCount)]
#[diesel(table_name = cp_mapping)]
pub(crate) struct StoredCpMapping {
pub struct StoredCpMapping {
pub cp_sequence_number: i64,
pub tx_lo: i64,
pub tx_hi: i64,
pub epoch: i64,
}

pub struct PrunableRange {
from: StoredCpMapping,
to: StoredCpMapping,
}

pub(crate) struct CpMapping;

impl PrunableRange {
/// Gets the tx and epoch mappings for both the start and end checkpoints.
///
/// The values are expected to exist since the cp_mapping table must have enough information to
/// encompass the retention of other tables.
pub async fn get_range(
conn: &mut Connection<'_>,
from_cp: u64,
to_cp: u64,
) -> QueryResult<PrunableRange> {
let results = cp_mapping::table
.select(StoredCpMapping::as_select())
.filter(cp_mapping::cp_sequence_number.eq_any([from_cp as i64, to_cp as i64]))
.order(cp_mapping::cp_sequence_number.asc())
.load::<StoredCpMapping>(conn)
.await?;

match results.as_slice() {
[first, .., last] => Ok(PrunableRange {
from: first.clone(),
to: last.clone(),
}),
_ => Err(diesel::result::Error::NotFound),
}
}

/// Inclusive start and exclusive end range of prunable checkpoints.
pub fn checkpoint_interval(&self) -> (u64, u64) {
(
self.from.cp_sequence_number as u64,
self.to.cp_sequence_number as u64,
)
}

/// Inclusive start and exclusive end range of prunable txs.
pub fn tx_interval(&self) -> (u64, u64) {
(self.from.tx_lo as u64, self.to.tx_hi as u64)
}

/// Returns the epochs that contain the checkpoints in this range.
///
/// While the checkpoint and tx ranges use exclusive end bounds, the epoch is different in that
/// it represents which epoch the `from` and `to` checkpoints come from.
pub fn containing_epochs(&self) -> (u64, u64) {
(self.from.epoch as u64, self.to.epoch as u64)
}
}
pub struct CpMapping;

impl Processor for CpMapping {
const NAME: &'static str = "cp_mapping";
Expand All @@ -86,12 +32,10 @@ impl Processor for CpMapping {
let network_total_transactions =
checkpoint.checkpoint_summary.network_total_transactions as i64;
let tx_lo = network_total_transactions - checkpoint.transactions.len() as i64;
let tx_hi = network_total_transactions;
let epoch = checkpoint.checkpoint_summary.epoch as i64;
Ok(vec![StoredCpMapping {
cp_sequence_number,
tx_lo,
tx_hi,
epoch,
}])
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-framework/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod cp_mapping;
pub mod cp_mapping;
3 changes: 1 addition & 2 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use diesel::{
pg::Pg,
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use handlers::cp_mapping;
use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use pipeline::{
Expand All @@ -24,7 +23,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use watermarks::CommitterWatermark;

mod handlers;
pub mod handlers;
pub mod ingestion;
pub(crate) mod metrics;
pub mod pipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
handlers::cp_mapping::PrunableRange,
metrics::IndexerMetrics,
pipeline::logging::{LoggerWatermark, WatermarkLogger},
watermarks::PrunerWatermark,
Expand Down
6 changes: 1 addition & 5 deletions crates/sui-indexer-alt-framework/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ diesel::table! {
cp_mapping (cp_sequence_number) {
cp_sequence_number -> Int8,
tx_lo -> Int8,
tx_hi -> Int8,
epoch -> Int8,
}
}
Expand All @@ -24,7 +23,4 @@ diesel::table! {
}
}

diesel::allow_tables_to_appear_in_same_query!(
cp_mapping,
watermarks,
);
diesel::allow_tables_to_appear_in_same_query!(cp_mapping, watermarks,);
3 changes: 3 additions & 0 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub struct PipelineLayer {
pub coin_balance_buckets_pruner: Option<ConcurrentLayer>,

// All concurrent pipelines
pub cp_mapping: Option<ConcurrentLayer>,
pub ev_emit_mod: Option<ConcurrentLayer>,
pub ev_struct_inst: Option<ConcurrentLayer>,
pub kv_checkpoints: Option<ConcurrentLayer>,
Expand Down Expand Up @@ -273,6 +274,7 @@ impl PipelineLayer {
obj_info_pruner: Some(Default::default()),
coin_balance_buckets: Some(Default::default()),
coin_balance_buckets_pruner: Some(Default::default()),
cp_mapping: Some(Default::default()),
ev_emit_mod: Some(Default::default()),
ev_struct_inst: Some(Default::default()),
kv_checkpoints: Some(Default::default()),
Expand Down Expand Up @@ -409,6 +411,7 @@ impl Merge for PipelineLayer {
coin_balance_buckets_pruner: self
.coin_balance_buckets_pruner
.merge(other.coin_balance_buckets_pruner),
cp_mapping: self.cp_mapping.merge(other.cp_mapping),
ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod),
ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst),
kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints),
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use handlers::{
tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds,
};
use sui_indexer_alt_framework::handlers::cp_mapping::CpMapping;
use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig};
use sui_indexer_alt_framework::pipeline::{
concurrent::{ConcurrentConfig, PrunerConfig},
Expand Down Expand Up @@ -62,6 +63,7 @@ pub async fn start_indexer(
obj_info_pruner,
coin_balance_buckets,
coin_balance_buckets_pruner,
cp_mapping,
ev_emit_mod,
ev_struct_inst,
kv_checkpoints,
Expand Down Expand Up @@ -201,6 +203,7 @@ pub async fn start_indexer(
);

// Unpruned concurrent pipelines
add_concurrent!(CpMapping, cp_mapping);
add_concurrent!(EvEmitMod, ev_emit_mod);
add_concurrent!(EvStructInst, ev_struct_inst);
add_concurrent!(KvCheckpoints, kv_checkpoints);
Expand Down

0 comments on commit 37c3f25

Please sign in to comment.