From 368fc4b5748aea7a21202f0f3f3d82be477ec338 Mon Sep 17 00:00:00 2001
From: Ashok Menon
Date: Mon, 21 Oct 2024 23:04:21 +0100
Subject: [PATCH] refactor(indexer-alt): pull out `pipeline` (#19933)

## Description

Separate the `pipeline` abstraction from the `handler` abstraction, and put it
into its own module. This is in preparation for:

- Splitting up the batching and committing processes. This will make it
  possible to test committing multiple chunks in parallel, and it may be
  possible to re-use some of this logic in...
- an `in_order::pipeline` to use for pipelines that need to write out data
  strictly in checkpoint order and along checkpoint boundaries.

## Test plan

Existing tests and CI.

## Stack

- #19926
- #19932

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your
changes, release notes aren't required. For each box you select, include
information after the relevant heading that describes the impact of your
changes that a user might notice and any actions they must take to implement
updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
- [ ] REST API:
---
 .../src/handlers/ev_emit_mod.rs               |   2 +-
 .../src/handlers/ev_struct_inst.rs            |   2 +-
 .../src/handlers/kv_checkpoints.rs            |   2 +-
 .../src/handlers/kv_objects.rs                |   2 +-
 .../src/handlers/kv_transactions.rs           |   2 +-
 crates/sui-indexer-alt/src/handlers/mod.rs    | 609 +-----------------
 .../src/handlers/tx_affected_objects.rs       |   2 +-
 .../src/handlers/tx_balance_changes.rs        |   2 +-
 crates/sui-indexer-alt/src/lib.rs             |  18 +-
 crates/sui-indexer-alt/src/main.rs            |  14 +-
 .../src/pipeline/concurrent/committer.rs      | 434 +++++++++++++
 .../src/pipeline/concurrent/mod.rs            |  48 ++
 crates/sui-indexer-alt/src/pipeline/mod.rs    |  57 ++
 .../sui-indexer-alt/src/pipeline/processor.rs | 119 ++++
 14 files changed, 685 insertions(+), 628 deletions(-)
 create mode 100644 crates/sui-indexer-alt/src/pipeline/concurrent/committer.rs
 create mode 100644 crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
 create mode 100644 crates/sui-indexer-alt/src/pipeline/mod.rs
 create mode 100644 crates/sui-indexer-alt/src/pipeline/processor.rs
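For orientation, the shape this refactor gives each pipeline is roughly the following: a processor task and a committer task joined by a channel. This is a minimal sketch using stand-in types (`Checkpoint`, `Row`) rather than the crate's real `CheckpointData`, `Handler`, metrics, and cancellation plumbing:

```rust
use tokio::{sync::mpsc, task::JoinHandle};

// Stand-in types for illustration only.
struct Checkpoint;
struct Row;

/// Processor: turns checkpoints into rows and forwards them downstream.
fn processor(mut rx: mpsc::Receiver<Checkpoint>, tx: mpsc::Sender<Vec<Row>>) -> JoinHandle<()> {
    tokio::spawn(async move {
        while let Some(_checkpoint) = rx.recv().await {
            let rows: Vec<Row> = vec![]; // ...turn the checkpoint into rows here...
            if tx.send(rows).await.is_err() {
                break; // committer has shut down
            }
        }
    })
}

/// Committer: batches rows and writes them out.
fn committer(mut rx: mpsc::Receiver<Vec<Row>>) -> JoinHandle<()> {
    tokio::spawn(async move {
        while let Some(rows) = rx.recv().await {
            // ...batch `rows` and commit them to the database here...
            let _ = rows;
        }
    })
}

#[tokio::main]
async fn main() {
    let (cp_tx, cp_rx) = mpsc::channel(8);
    let (row_tx, row_rx) = mpsc::channel(8);
    let p = processor(cp_rx, row_tx);
    let c = committer(row_rx);
    cp_tx.send(Checkpoint).await.unwrap();
    drop(cp_tx); // close the pipeline; both tasks drain and exit
    let _ = tokio::join!(p, c);
}
```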
diff --git a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
index 0e157e6ea6c88..72da553d00de3 100644
--- a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
+++ b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
@@ -23,7 +23,7 @@ impl Handler for EvEmitMod {
 
     type Value = StoredEvEmitMod;
 
-    fn handle(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
+    fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
         let CheckpointData {
             transactions,
             checkpoint_summary,
diff --git a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
index 3b224f5b10236..67be64b49a292 100644
--- a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
+++ b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
@@ -23,7 +23,7 @@ impl Handler for EvStructInst {
 
     type Value = StoredEvStructInst;
 
-    fn handle(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
+    fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
         let CheckpointData {
             transactions,
             checkpoint_summary,
diff --git a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
index b310a088ebd07..ff58ed0f8d5f8 100644
--- a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
+++ b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
@@ -19,7 +19,7 @@ impl Handler for KvCheckpoints {
 
     type Value = StoredCheckpoint;
 
-    fn handle(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
+    fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
         let sequence_number = checkpoint.checkpoint_summary.sequence_number as i64;
         Ok(vec![StoredCheckpoint {
             sequence_number,
diff --git a/crates/sui-indexer-alt/src/handlers/kv_objects.rs b/crates/sui-indexer-alt/src/handlers/kv_objects.rs
index 6eeaecea8c9d2..a6a5079b23740 100644
--- a/crates/sui-indexer-alt/src/handlers/kv_objects.rs
+++ b/crates/sui-indexer-alt/src/handlers/kv_objects.rs
@@ -23,7 +23,7 @@ impl Handler for KvObjects {
 
     type Value = StoredObject;
 
-    fn handle(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
+    fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
         let deleted_objects = checkpoint
             .eventually_removed_object_refs_post_version()
             .into_iter()
diff --git a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs
index 7c8241724e5e9..1cd16b6219156 100644
--- a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs
+++ b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs
@@ -23,7 +23,7 @@ impl Handler for KvTransactions {
 
     type Value = StoredTransaction;
 
-    fn handle(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
+    fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
         let CheckpointData {
             transactions,
             checkpoint_summary,
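The diffs above are a mechanical `handle` to `process` rename across every handler. For reference, a minimal sketch of what a handler looks like after the rename, with a hypothetical `StoredExample` row type (not one of the crate's real tables), assuming the trait's tuning constants (`FANOUT`, `BATCH_SIZE`, `CHUNK_SIZE`, `MAX_PENDING_SIZE`) keep their defaults and the real database insert is elided:

```rust
use std::sync::Arc;

use anyhow::Result;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{db, handlers::Handler};

/// Hypothetical row type, for illustration only.
struct StoredExample {
    cp_sequence_number: i64,
}

struct Example;

impl Handler for Example {
    const NAME: &'static str = "example";

    type Value = StoredExample;

    /// Pure, CPU-bound step: break a checkpoint down into rows.
    fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        Ok(vec![StoredExample {
            cp_sequence_number: checkpoint.checkpoint_summary.sequence_number as i64,
        }])
    }

    /// I/O-bound step: write a chunk of rows, returning rows affected.
    async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
        // A real handler would issue an insert here (e.g. with ON CONFLICT DO
        // NOTHING); elided in this sketch.
        let _ = conn;
        Ok(values.len())
    }
}
```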
diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs
index bddf875a64c7a..69d11daae1298 100644
--- a/crates/sui-indexer-alt/src/handlers/mod.rs
+++ b/crates/sui-indexer-alt/src/handlers/mod.rs
@@ -1,29 +1,11 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{
-    collections::{BTreeMap, BTreeSet},
-    sync::Arc,
-    time::Duration,
-};
+use std::sync::Arc;
 
-use futures::TryStreamExt;
-use mysten_metrics::spawn_monitored_task;
 use sui_types::full_checkpoint_content::CheckpointData;
-use tokio::{
-    sync::mpsc,
-    task::JoinHandle,
-    time::{interval, MissedTickBehavior},
-};
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
-use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, info, warn};
 
-use crate::{
-    db::{self, Db},
-    metrics::IndexerMetrics,
-    models::watermarks::{CommitterWatermark, Ordering},
-};
+use crate::db;
 
 pub mod ev_emit_mod;
 pub mod ev_struct_inst;
@@ -33,20 +15,6 @@ pub mod kv_transactions;
 pub mod tx_affected_objects;
 pub mod tx_balance_changes;
 
-/// Extra buffer added to the channel between the handler and the committer. There does not need to
-/// be a huge capacity here because the committer is already buffering rows to insert internally.
-const COMMITTER_BUFFER: usize = 5;
-
-/// The committer will wait at least this long between commits for any given pipeline.
-const COOLDOWN_INTERVAL: Duration = Duration::from_millis(20);
-
-/// The committer will wait at least this long between attempts to commit a failed batch.
-const RETRY_INTERVAL: Duration = Duration::from_millis(100);
-
-/// Tracing message for the watermark update will be logged at info level at least this many
-/// checkpoints.
-const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10;
-
 /// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data into
 /// rows for their table, and how to write those rows to the database.
 ///
@@ -80,581 +48,10 @@ pub trait Handler {
     type Value: Send + Sync + 'static;
 
     /// The processing logic for turning a checkpoint into rows of the table.
-    fn handle(checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
+    fn process(checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
 
     /// Take a chunk of values and commit them to the database, returning the number of rows
     /// affected.
     async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> anyhow::Result<usize>;
 }
-
-#[derive(clap::Args, Debug, Clone)]
-pub struct CommitterConfig {
-    /// Committer will check for pending data at least this often
-    #[arg(
-        long,
-        default_value = "500",
-        value_name = "MILLISECONDS",
-        value_parser = |s: &str| s.parse().map(Duration::from_millis),
-    )]
-    commit_interval: Duration,
-
-    /// Avoid writing to the watermark table
-    #[arg(long)]
-    skip_watermark: bool,
-}
-
-/// A batch of processed values associated with a single checkpoint. This is an internal type used
-/// to communicate between the handler and the committer parts of the pipeline.
-struct Indexed<H: Handler> {
-    /// Epoch this data is from
-    epoch: u64,
-    /// Checkpoint this data is from
-    cp_sequence_number: u64,
-    /// Max (exclusive) transaction sequence number in this batch
-    tx_hi: u64,
-    /// Values to be inserted into the database from this checkpoint
-    values: Vec<H::Value>,
-}
-
-impl<H: Handler> Indexed<H> {
-    /// Split apart the information in this indexed checkpoint into its watermark and the values to
-    /// add to the database.
-    fn into_batch(self) -> (CommitterWatermark<'static>, Vec<H::Value>) {
-        let watermark = CommitterWatermark {
-            pipeline: H::NAME.into(),
-            epoch_hi_inclusive: self.epoch as i64,
-            checkpoint_hi_inclusive: self.cp_sequence_number as i64,
-            tx_hi: self.tx_hi as i64,
-        };
-
-        (watermark, self.values)
-    }
-}
-
-/// Start a new indexing pipeline served by the handler, `H`. Starting strictly after the
-/// `watermark` (or from the beginning if no watermark was provided).
-///
-/// Each pipeline consists of a handler task which takes checkpoint data and breaks it down into
-/// rows, ready for insertion, and a committer which writes those rows out to the database.
-///
-/// Checkpoint data is fed into the pipeline through the `handler_rx` channel, and an internal
-/// channel is created to communicate checkpoint-wise data to the committer. The pipeline can be
-/// shutdown using its `cancel` token.
-pub fn pipeline<H: Handler + 'static>(
-    watermark: Option<CommitterWatermark<'static>>,
-    config: CommitterConfig,
-    db: Db,
-    handler_rx: mpsc::Receiver<Arc<CheckpointData>>,
-    metrics: Arc<IndexerMetrics>,
-    cancel: CancellationToken,
-) -> (JoinHandle<()>, JoinHandle<()>) {
-    let (handler_tx, committer_rx) = mpsc::channel(H::FANOUT + COMMITTER_BUFFER);
-
-    let handler = handler::<H>(handler_rx, handler_tx, metrics.clone(), cancel.clone());
-    let committer = committer::<H>(watermark, config, committer_rx, db, metrics, cancel);
-
-    (handler, committer)
-}
-
-/// The handler task is responsible for taking checkpoint data and breaking it down into rows ready
-/// to commit. It spins up a supervisor that waits on the `rx` channel for checkpoints, and
-/// distributes them among `H::FANOUT` workers.
-///
-/// Each worker processes a checkpoint into rows and sends them on to the committer using the `tx`
-/// channel.
-///
-/// The task will shutdown if the `cancel` token is cancelled, or if any of the workers encounters
-/// an error -- there is no retry logic at this level.
-fn handler<H: Handler + 'static>(
-    rx: mpsc::Receiver<Arc<CheckpointData>>,
-    tx: mpsc::Sender<Indexed<H>>,
-    metrics: Arc<IndexerMetrics>,
-    cancel: CancellationToken,
-) -> JoinHandle<()> {
-    /// Internal type used by workers to propagate errors or shutdown signals up to their
-    /// supervisor.
-    #[derive(thiserror::Error, Debug)]
-    enum Break {
-        #[error("Shutdown received")]
-        Cancel,
-
-        #[error(transparent)]
-        Err(#[from] anyhow::Error),
-    }
-
-    spawn_monitored_task!(async move {
-        info!(pipeline = H::NAME, "Starting handler");
-        match ReceiverStream::new(rx)
-            .map(Ok)
-            .try_for_each_concurrent(H::FANOUT, |checkpoint| {
-                let tx = tx.clone();
-                let metrics = metrics.clone();
-                let cancel = cancel.clone();
-                async move {
-                    if cancel.is_cancelled() {
-                        return Err(Break::Cancel);
-                    }
-
-                    metrics
-                        .total_handler_checkpoints_received
-                        .with_label_values(&[H::NAME])
-                        .inc();
-
-                    let guard = metrics
-                        .handler_checkpoint_latency
-                        .with_label_values(&[H::NAME])
-                        .start_timer();
-
-                    let values = H::handle(&checkpoint)?;
-                    let elapsed = guard.stop_and_record();
-
-                    let epoch = checkpoint.checkpoint_summary.epoch;
-                    let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
-                    let tx_hi = checkpoint.checkpoint_summary.network_total_transactions;
-
-                    debug!(
-                        pipeline = H::NAME,
-                        checkpoint = cp_sequence_number,
-                        elapsed_ms = elapsed * 1000.0,
-                        "Processed checkpoint",
-                    );
-
-                    metrics
-                        .total_handler_checkpoints_processed
-                        .with_label_values(&[H::NAME])
-                        .inc();
-
-                    metrics
-                        .total_handler_rows_created
-                        .with_label_values(&[H::NAME])
-                        .inc_by(values.len() as u64);
-
-                    tx.send(Indexed {
-                        epoch,
-                        cp_sequence_number,
-                        tx_hi,
-                        values,
-                    })
-                    .await
-                    .map_err(|_| Break::Cancel)?;
-
-                    Ok(())
-                }
-            })
-            .await
-        {
-            Ok(()) => {
-                info!(pipeline = H::NAME, "Checkpoints done, stopping handler");
-            }
-
-            Err(Break::Cancel) => {
-                info!(pipeline = H::NAME, "Shutdown received, stopping handler");
-            }
-
-            Err(Break::Err(e)) => {
-                error!(pipeline = H::NAME, "Error from handler: {e}");
-                cancel.cancel();
-            }
-        };
-    })
-}
-
-/// The committer task is responsible for gathering rows to write to the database. It is a single
-/// work loop that gathers checkpoint-wise row information, and periodically writes them out to the
-/// database.
-///
-/// The period between writes is controlled by the following factors:
-///
-/// - Time since the last write (controlled by `config.commit_interval`). If there are rows pending
-///   and this interval has elapsed since the last attempted commit, the committer will attempt
-///   another write.
-///
-/// - Time since last attempted write (controlled by `COOLDOWN_INTERVAL` and `RETRY_INTERVAL`). If
-///   there was a recent successful write, the next write will wait at least `COOLDOWN_INTERVAL`,
-///   and if there was a recent unsuccessful write, the next write will wait at least
-///   `RETRY_INTERVAL`. This is to prevent one committer from hogging the database.
-///
-/// - Number of pending rows. If this exceeds `H::BATCH_SIZE`, the committer will attempt to write
-///   out at most `H::CHUNK_SIZE` worth of rows to the DB.
-///
-/// If a write fails, the committer will save the batch it meant to write and try to write them
-/// again at the next opportunity, potentially adding more rows to the batch if more have arrived
-/// in the interim.
-///
-/// This task will shutdown if canceled via the `cancel` token, or if the channel it receives data
-/// on has been closed by the handler for some reason.
-fn committer<H: Handler + 'static>(
-    watermark: Option<CommitterWatermark<'static>>,
-    config: CommitterConfig,
-    mut rx: mpsc::Receiver<Indexed<H>>,
-    db: Db,
-    metrics: Arc<IndexerMetrics>,
-    cancel: CancellationToken,
-) -> JoinHandle<()> {
-    spawn_monitored_task!(async move {
-        // The `poll` interval controls the maximum time to wait between commits, regardless of the
-        // amount of data available.
-        let mut poll = interval(config.commit_interval);
-        let mut cool = interval(COOLDOWN_INTERVAL);
-
-        // We don't care about keeping a regular cadence -- these intervals are used to guarantee
-        // things are spaced at out relative to each other.
-        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
-        cool.set_missed_tick_behavior(MissedTickBehavior::Delay);
-
-        // Buffer to gather the next batch to write. This may be non-empty at the top of a tick of
-        // the committer's loop if the previous attempt at a write failed. Attempt is incremented
-        // every time a batch write fails, and is reset when it succeeds.
-        let mut attempt = 0;
-        let mut batch_values = vec![];
-        let mut batch_watermarks = vec![];
-
-        // Data for checkpoints that haven't been written yet. Note that `pending_rows` includes
-        // rows in `batch`.
-        let mut pending: BTreeMap<u64, Indexed<H>> = BTreeMap::new();
-        let mut pending_rows = 0;
-
-        // Track the high watermark for the pipeline. The pipeline confirms that it has written all
-        // checkpoint data up from the watermark it is initialised with up to and including this
-        // watermark.
-        //
-        // To correctly update the watermark, the committer tracks the watermark it last tried to
-        // write and the watermarks for any checkpoints that have been written since then
-        // ("pre-committed"). After each batch is written, the committer will try to progress the
-        // watermark as much as possible without going over any holes in the sequence of
-        // checkpoints.
-        //
-        // NOTE: When no watermark is provided, it is assumed that the pipeline is starting from
-        // scratch, but we still initialize it as if it is at (after) the genesis checkpoint. This
-        // means we never write a watermark for the genesis checkpoint, but would wait for another
-        // checkpoint to be written out before updating the watermark, which is fine in practice
-        // and simplifies the logic of tracking watermarks.
-        let mut precommitted: BTreeSet<CommitterWatermark<'static>> = BTreeSet::new();
-        let mut watermark =
-            watermark.unwrap_or_else(|| CommitterWatermark::initial(H::NAME.into()));
-
-        // The committer will periodically output a log message at a higher log level to
-        // demonstrate that the pipeline is making progress.
-        let mut next_loud_watermark_update =
-            watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL;
-
-        info!(pipeline = H::NAME, ?watermark, "Starting committer");
-
-        loop {
-            tokio::select! {
-                // Break ties in favour of operations that reduce the size of the buffer.
-                //
-                // TODO (experiment): Do we need this? It adds some complexity/subtlety to this
-                // work loop, so if we don't notice a big difference, we should get rid of it.
-                biased;
-
-                _ = cancel.cancelled() => {
-                    info!(pipeline = H::NAME, "Shutdown received, stopping committer");
-                    break;
-                }
-
-                // Time to write out another batch of rows, and update the watermark, if we can.
-                _ = poll.tick() => {
-                    let Ok(mut conn) = db.connect().await else {
-                        warn!(pipeline = H::NAME, "Failed to get connection for DB");
-                        cool.reset();
-                        continue;
-                    };
-
-                    let guard = metrics
-                        .committer_gather_latency
-                        .with_label_values(&[H::NAME])
-                        .start_timer();
-
-                    while batch_values.len() < H::CHUNK_SIZE {
-                        let Some(mut entry) = pending.first_entry() else {
-                            break;
-                        };
-
-                        let indexed = entry.get_mut();
-                        let values = &mut indexed.values;
-                        if batch_values.len() + values.len() > H::CHUNK_SIZE {
-                            let mut for_batch = values.split_off(H::CHUNK_SIZE - batch_values.len());
-                            std::mem::swap(values, &mut for_batch);
-                            batch_values.extend(for_batch);
-                            break;
-                        } else {
-                            let (watermark, values) = entry.remove().into_batch();
-                            batch_values.extend(values);
-                            batch_watermarks.push(watermark);
-                        }
-                    }
-
-                    let elapsed = guard.stop_and_record();
-                    debug!(
-                        pipeline = H::NAME,
-                        elapsed_ms = elapsed * 1000.0,
-                        rows = batch_values.len(),
-                        pending = pending_rows,
-                        "Gathered batch",
-                    );
-
-                    // TODO (experiment): Switch to COPY FROM, which should offer faster inserts?
-                    //
-                    // Note that COPY FROM cannot handle conflicts -- we need a way to gracefully
-                    // fail over to `INSERT INTO ... ON CONFLICT DO NOTHING` if we encounter a
-                    // conflict, and in that case, we are also subject to the same constraints on
-                    // number of bind parameters as we had before.
-                    //
-                    // Postgres 17 supports an ON_ERROR option for COPY FROM which can ignore bad
-                    // rows, but CloudSQL does not support Postgres 17 yet, and this directive only
-                    // works if the FORMAT is set to TEXT or CSV, which are less efficient over the
-                    // wire.
-                    //
-                    // The hope is that in the steady state, there will not be conflicts (they
-                    // should only show up during backfills, or when we are handing off between
-                    // indexers), so we can use a fallback mechanism for those cases but rely on
-                    // COPY FROM most of the time.
-                    //
-                    // Note that the introduction of watermarks also complicates hand-over between
-                    // two indexers writing to the same table: They cannot both update the
-                    // watermark. One needs to subordinate to the other (or we need to set the
-                    // watermark to the max of what is currently set and what was about to be
-                    // written).
-
-                    metrics
-                        .total_committer_batches_attempted
-                        .with_label_values(&[H::NAME])
-                        .inc();
-
-                    metrics
-                        .committer_batch_size
-                        .with_label_values(&[H::NAME])
-                        .observe(batch_values.len() as f64);
-
-                    let guard = metrics
-                        .committer_commit_latency
-                        .with_label_values(&[H::NAME])
-                        .start_timer();
-
-                    // TODO (experiment): Parallelize batch writes?
-                    //
-                    // Previous findings suggest that having about 5 parallel committers per table
-                    // yields the best performance. Is that still true for this new architecture?
-                    // If we go down this route, we should consider factoring that work out into a
-                    // separate task that also handles the watermark.
-
-                    let affected = if batch_values.is_empty() {
-                        0
-                    } else {
-                        match H::commit(&batch_values, &mut conn).await {
-                            Ok(affected) => affected,
-
-                            Err(e) => {
-                                let elapsed = guard.stop_and_record();
-
-                                error!(
-                                    pipeline = H::NAME,
-                                    elapsed_ms = elapsed * 1000.0,
-                                    attempt,
-                                    committed = batch_values.len(),
-                                    pending = pending_rows,
-                                    "Error writing batch: {e}",
-                                );
-
-                                cool.reset_after(RETRY_INTERVAL);
-                                attempt += 1;
-                                continue;
-                            }
-                        }
-                    };
-
-                    let elapsed = guard.stop_and_record();
-
-                    metrics
-                        .total_committer_rows_committed
-                        .with_label_values(&[H::NAME])
-                        .inc_by(batch_values.len() as u64);
-
-                    metrics
-                        .total_committer_rows_affected
-                        .with_label_values(&[H::NAME])
-                        .inc_by(affected as u64);
-
-                    debug!(
-                        pipeline = H::NAME,
-                        elapsed_ms = elapsed * 1000.0,
-                        attempt,
-                        affected,
-                        committed = batch_values.len(),
-                        pending = pending_rows,
-                        "Wrote batch",
-                    );
-
-                    pending_rows -= batch_values.len();
-                    attempt = 0;
-
-                    precommitted.extend(batch_watermarks.drain(..));
-                    batch_values.clear();
-
-                    // Check if the pipeline's watermark needs to be updated
-                    let guard = metrics
-                        .watermark_gather_latency
-                        .with_label_values(&[H::NAME])
-                        .start_timer();
-
-                    let mut watermark_needs_update = false;
-                    while let Some(pending) = precommitted.first() {
-                        match watermark.next_cmp(pending) {
-                            Ordering::Future => break,
-                            Ordering::Past => {
-                                // Out of order watermarks can be encountered when a pipeline is
-                                // starting up, because ingestion must start at the lowest
-                                // checkpoint across all pipelines, or because of a backfill, where
-                                // the initial checkpoint has been overridden.
-                                //
-                                // Track how many we see to make sure it doesn't grow without
-                                // bound.
-                                metrics
-                                    .total_watermarks_out_of_order
-                                    .with_label_values(&[H::NAME])
-                                    .inc();
-
-                                precommitted.pop_first().unwrap();
-                            }
-
-                            Ordering::Next => {
-                                // SAFETY: `precommitted` is known to be non-empty because of the
-                                // loop condition.
-                                watermark = precommitted.pop_first().unwrap();
-                                watermark_needs_update = true;
-                            }
-                        }
-                    }
-
-                    let elapsed = guard.stop_and_record();
-
-                    metrics
-                        .watermark_epoch
-                        .with_label_values(&[H::NAME])
-                        .set(watermark.epoch_hi_inclusive);
-
-                    metrics
-                        .watermark_checkpoint
-                        .with_label_values(&[H::NAME])
-                        .set(watermark.checkpoint_hi_inclusive);
-
-                    metrics
-                        .watermark_transaction
-                        .with_label_values(&[H::NAME])
-                        .set(watermark.tx_hi);
-
-                    debug!(
-                        pipeline = H::NAME,
-                        elapsed_ms = elapsed * 1000.0,
-                        watermark = watermark.checkpoint_hi_inclusive,
-                        pending = precommitted.len(),
-                        "Gathered watermarks",
-                    );
-
-                    if !config.skip_watermark && watermark_needs_update {
-                        let guard = metrics
-                            .watermark_commit_latency
-                            .with_label_values(&[H::NAME])
-                            .start_timer();
-
-                        match watermark.update(&mut conn).await {
-                            // If there's an issue updating the watermark, log it but keep going,
-                            // it's OK for the watermark to lag from a correctness perspective.
-                            Err(e) => {
-                                let elapsed = guard.stop_and_record();
-                                error!(
-                                    pipeline = H::NAME,
-                                    elapsed_ms = elapsed * 1000.0,
-                                    ?watermark,
-                                    "Error updating watermark: {e}",
-                                );
-                            }
-
-                            Ok(updated) => {
-                                let elapsed = guard.stop_and_record();
-
-                                if updated {
-                                    metrics
-                                        .watermark_epoch_in_db
-                                        .with_label_values(&[H::NAME])
-                                        .set(watermark.epoch_hi_inclusive);
-
-                                    metrics
-                                        .watermark_checkpoint_in_db
-                                        .with_label_values(&[H::NAME])
-                                        .set(watermark.checkpoint_hi_inclusive);
-
-                                    metrics
-                                        .watermark_transaction_in_db
-                                        .with_label_values(&[H::NAME])
-                                        .set(watermark.tx_hi);
-                                }
-
-                                if watermark.checkpoint_hi_inclusive > next_loud_watermark_update {
-                                    next_loud_watermark_update += LOUD_WATERMARK_UPDATE_INTERVAL;
-                                    info!(
-                                        pipeline = H::NAME,
-                                        elapsed_ms = elapsed * 1000.0,
-                                        updated,
-                                        epoch = watermark.epoch_hi_inclusive,
-                                        checkpoint = watermark.checkpoint_hi_inclusive,
-                                        transaction = watermark.tx_hi,
-                                        "Watermark",
-                                    );
-                                } else {
-                                    debug!(
-                                        pipeline = H::NAME,
-                                        elapsed_ms = elapsed * 1000.0,
-                                        updated,
-                                        epoch = watermark.epoch_hi_inclusive,
-                                        checkpoint = watermark.checkpoint_hi_inclusive,
-                                        transaction = watermark.tx_hi,
-                                        "Watermark",
-                                    );
-                                }
-                            }
-                        }
-                    }
-
-                    // TODO (amnn): Test this behaviour (requires tempdb and migrations).
-                    if pending_rows == 0 && rx.is_closed() {
-                        info!(pipeline = H::NAME, "Handler closed channel, pending rows empty, stopping committer");
-                        break;
-                    }
-
-                    cool.reset();
-                }
-
-                // If there are enough pending rows, and we've expended the cooldown, reset the
-                // commit polling interval so that on the next iteration of the loop, we will write
-                // out another batch.
-                //
-                // TODO (experiment): Do we need this cooldown to deal with contention on the
-                // connection pool? It's possible that this is just going to eat into our
-                // throughput.
-                _ = cool.tick(), if pending_rows > H::BATCH_SIZE => {
-                    poll.reset_immediately();
-                }
-
-                Some(indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_SIZE => {
-                    metrics
-                        .total_committer_rows_received
-                        .with_label_values(&[H::NAME])
-                        .inc_by(indexed.values.len() as u64);
-
-                    if indexed.values.is_empty() {
-                        // If the handler sends an empty batch, short-circuit the commit, and add
-                        // it directly to the pre-commit list.
-                        let (watermark, _) = indexed.into_batch();
-                        precommitted.insert(watermark);
-                    } else {
-                        pending_rows += indexed.values.len();
-                        pending.insert(indexed.cp_sequence_number, indexed);
-                    }
-                }
-            }
-        }
-    })
-}
diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
index b8faa18a52ae4..b001c6c4819c5 100644
--- a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
+++ b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
@@ -23,7 +23,7 @@ impl Handler for TxAffectedObjects {
 
     type Value = StoredTxAffectedObject;
 
-    fn handle(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
+    fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
         let CheckpointData {
             transactions,
             checkpoint_summary,
diff --git a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
index f23e25dae71ec..5c64b6c9564b3 100644
--- a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
+++ b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
@@ -32,7 +32,7 @@ impl Handler for TxBalanceChanges {
 
     type Value = StoredTxBalanceChange;
 
-    fn handle(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
+    fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
         let CheckpointData {
             transactions,
             checkpoint_summary,
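The watermark bookkeeping removed above (and re-added below in `committer.rs`) is easy to lose in the diff. The idea reduces to advancing a high watermark over a set of pre-committed checkpoints without skipping holes; a standalone sketch using plain `u64` checkpoint numbers in place of `CommitterWatermark` and its `next_cmp`/`Ordering` helpers:

```rust
use std::collections::BTreeSet;

/// Advance `watermark` over any contiguously pre-committed checkpoints.
/// Returns true if the watermark moved and should be written back out.
fn advance(watermark: &mut u64, precommitted: &mut BTreeSet<u64>) -> bool {
    let mut moved = false;
    while let Some(&next) = precommitted.first() {
        if next <= *watermark {
            // Already covered (seen during start-up or a backfill) -- drop it.
            precommitted.pop_first();
        } else if next == *watermark + 1 {
            // Contiguous -- the watermark can move up.
            *watermark = next;
            precommitted.pop_first();
            moved = true;
        } else {
            // There is a hole before `next`; stop here and wait for it.
            break;
        }
    }
    moved
}

fn main() {
    let mut watermark = 10;
    let mut pre: BTreeSet<u64> = [11, 12, 14].into_iter().collect();
    assert!(advance(&mut watermark, &mut pre));
    assert_eq!(watermark, 12); // 13 is missing, so 14 stays pre-committed
    assert_eq!(pre.len(), 1);
}
```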
diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs
index be41495611756..652ca646b98a7 100644
--- a/crates/sui-indexer-alt/src/lib.rs
+++ b/crates/sui-indexer-alt/src/lib.rs
@@ -5,10 +5,11 @@ use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
 
 use anyhow::{Context, Result};
 use db::{Db, DbConfig};
-use handlers::{pipeline, CommitterConfig, Handler};
+use handlers::Handler;
 use ingestion::{IngestionConfig, IngestionService};
 use metrics::{IndexerMetrics, MetricsService};
 use models::watermarks::CommitterWatermark;
+use pipeline::{concurrent, PipelineConfig};
 use task::graceful_shutdown;
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
@@ -20,6 +21,7 @@ pub mod handlers;
 pub mod ingestion;
 pub mod metrics;
 pub mod models;
+pub mod pipeline;
 pub mod schema;
 pub mod task;
 
@@ -37,7 +39,7 @@ pub struct Indexer {
     ingestion_service: IngestionService,
 
     /// Parameters for the committers of each pipeline.
-    committer_config: CommitterConfig,
+    pipeline_config: PipelineConfig,
 
     /// Optional override of the checkpoint lowerbound.
     first_checkpoint: Option<u64>,
@@ -69,7 +71,7 @@ pub struct IndexerConfig {
     pub db_config: DbConfig,
 
     #[command(flatten)]
-    pub committer_config: CommitterConfig,
+    pub pipeline_config: PipelineConfig,
 
     /// Override for the checkpoint to start ingestion from -- useful for backfills. By default,
     /// ingestion will start just after the lowest checkpoint watermark across all active
@@ -97,7 +99,7 @@ impl Indexer {
         let IndexerConfig {
             ingestion_config,
             db_config,
-            committer_config,
+            pipeline_config,
             first_checkpoint,
             last_checkpoint,
             pipeline,
@@ -118,7 +120,7 @@ impl Indexer {
             metrics,
             metrics_service,
             ingestion_service,
-            committer_config,
+            pipeline_config,
             first_checkpoint,
             last_checkpoint,
             enabled_pipelines: pipeline.into_iter().collect(),
@@ -130,7 +132,7 @@ impl Indexer {
 
     /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
     /// they will be idle until the ingestion service starts, and serves it checkpoint data.
-    pub async fn pipeline<H: Handler + 'static>(&mut self) -> Result<()> {
+    pub async fn concurrent_pipeline<H: Handler + 'static>(&mut self) -> Result<()> {
         if !self.enabled_pipelines.is_empty() && !self.enabled_pipelines.contains(H::NAME) {
             info!("Skipping pipeline {}", H::NAME);
             return Ok(());
@@ -148,9 +150,9 @@ impl Indexer {
             .map_or(0, |w| w.checkpoint_hi_inclusive as u64 + 1)
             .min(self.first_checkpoint_from_watermark);
 
-        let (handler, committer) = pipeline::<H>(
+        let (handler, committer) = concurrent::pipeline::<H>(
             watermark,
-            self.committer_config.clone(),
+            self.pipeline_config.clone(),
             self.db.clone(),
             self.ingestion_service.subscribe().0,
             self.metrics.clone(),
diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs
index dfcdd86d8d4ee..77d90d3fe0cb3 100644
--- a/crates/sui-indexer-alt/src/main.rs
+++ b/crates/sui-indexer-alt/src/main.rs
@@ -27,13 +27,13 @@ async fn main() -> Result<()> {
 
     let mut indexer = Indexer::new(args.indexer_config, cancel.clone()).await?;
 
-    indexer.pipeline::<EvEmitMod>().await?;
-    indexer.pipeline::<EvStructInst>().await?;
-    indexer.pipeline::<KvCheckpoints>().await?;
-    indexer.pipeline::<KvObjects>().await?;
-    indexer.pipeline::<KvTransactions>().await?;
-    indexer.pipeline::<TxAffectedObjects>().await?;
-    indexer.pipeline::<TxBalanceChanges>().await?;
+    indexer.concurrent_pipeline::<EvEmitMod>().await?;
+    indexer.concurrent_pipeline::<EvStructInst>().await?;
+    indexer.concurrent_pipeline::<KvCheckpoints>().await?;
+    indexer.concurrent_pipeline::<KvObjects>().await?;
+    indexer.concurrent_pipeline::<KvTransactions>().await?;
+    indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
+    indexer.concurrent_pipeline::<TxBalanceChanges>().await?;
 
     let h_indexer = indexer.run().await.context("Failed to start indexer")?;
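The `first_checkpoint_from_watermark` expression in `lib.rs` above is easy to misread. Isolated from the surrounding struct, the rule it implements is: each pipeline proposes starting strictly after its own committed watermark (or at checkpoint 0 if it has none), and the indexer keeps the minimum across all registered pipelines so ingestion starts low enough for everyone. A sketch with the watermark reduced to a plain checkpoint number:

```rust
/// Fold one pipeline's committed watermark into the indexer-wide resume point.
fn fold_resume_point(running_min: u64, watermark: Option<u64>) -> u64 {
    let wants = watermark.map_or(0, |checkpoint_hi_inclusive| checkpoint_hi_inclusive + 1);
    running_min.min(wants)
}

fn main() {
    // A fresh pipeline (no watermark) forces ingestion back to genesis.
    assert_eq!(fold_resume_point(1000, None), 0);
    // Otherwise the laggiest pipeline decides where ingestion starts.
    assert_eq!(fold_resume_point(1000, Some(499)), 500);
    assert_eq!(fold_resume_point(500, Some(2000)), 500);
}
```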
diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/committer.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/committer.rs
new file mode 100644
index 0000000000000..d1b56edc7a5f3
--- /dev/null
+++ b/crates/sui-indexer-alt/src/pipeline/concurrent/committer.rs
@@ -0,0 +1,434 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    sync::Arc,
+    time::Duration,
+};
+
+use mysten_metrics::spawn_monitored_task;
+use tokio::{
+    sync::mpsc,
+    task::JoinHandle,
+    time::{interval, MissedTickBehavior},
+};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, info, warn};
+
+use crate::{
+    db::Db,
+    handlers::Handler,
+    metrics::IndexerMetrics,
+    models::watermarks::{CommitterWatermark, Ordering},
+    pipeline::{Indexed, PipelineConfig},
+};
+
+/// The committer will wait at least this long between commits for any given pipeline.
+const COOLDOWN_INTERVAL: Duration = Duration::from_millis(20);
+
+/// The committer will wait at least this long between attempts to commit a failed batch.
+const RETRY_INTERVAL: Duration = Duration::from_millis(100);
+
+/// The tracing message for the watermark update will be logged at info level at least once every
+/// this many checkpoints.
+const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10;
+
+/// The committer task is responsible for gathering rows to write to the database. It is a single
+/// work loop that gathers checkpoint-wise row information, and periodically writes them out to the
+/// database.
+///
+/// The period between writes is controlled by the following factors:
+///
+/// - Time since the last write (controlled by `config.commit_interval`). If there are rows pending
+///   and this interval has elapsed since the last attempted commit, the committer will attempt
+///   another write.
+///
+/// - Time since last attempted write (controlled by `COOLDOWN_INTERVAL` and `RETRY_INTERVAL`). If
+///   there was a recent successful write, the next write will wait at least `COOLDOWN_INTERVAL`,
+///   and if there was a recent unsuccessful write, the next write will wait at least
+///   `RETRY_INTERVAL`. This is to prevent one committer from hogging the database.
+///
+/// - Number of pending rows. If this exceeds `H::BATCH_SIZE`, the committer will attempt to write
+///   out at most `H::CHUNK_SIZE` worth of rows to the DB.
+///
+/// If a write fails, the committer will save the batch it meant to write and try to write it
+/// again at the next opportunity, potentially adding more rows to the batch if more have arrived
+/// in the interim.
+///
+/// This task will shut down if cancelled via the `cancel` token, or if the channel it receives
+/// data on has been closed by the processor for some reason.
+pub(super) fn committer<H: Handler + 'static>(
+    watermark: Option<CommitterWatermark<'static>>,
+    config: PipelineConfig,
+    mut rx: mpsc::Receiver<Indexed<H>>,
+    db: Db,
+    metrics: Arc<IndexerMetrics>,
+    cancel: CancellationToken,
+) -> JoinHandle<()> {
+    spawn_monitored_task!(async move {
+        // The `poll` interval controls the maximum time to wait between commits, regardless of the
+        // amount of data available.
+        let mut poll = interval(config.commit_interval);
+        let mut cool = interval(COOLDOWN_INTERVAL);
+
+        // We don't care about keeping a regular cadence -- these intervals are used to guarantee
+        // things are spaced out relative to each other.
+        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
+        cool.set_missed_tick_behavior(MissedTickBehavior::Delay);
+
+        // Buffer to gather the next batch to write. This may be non-empty at the top of a tick of
+        // the committer's loop if the previous attempt at a write failed. Attempt is incremented
+        // every time a batch write fails, and is reset when it succeeds.
+        let mut attempt = 0;
+        let mut batch_values = vec![];
+        let mut batch_watermarks = vec![];
+
+        // Data for checkpoints that haven't been written yet. Note that `pending_rows` includes
+        // rows in `batch`.
+        let mut pending: BTreeMap<u64, Indexed<H>> = BTreeMap::new();
+        let mut pending_rows = 0;
+
+        // Track the high watermark for the pipeline. The pipeline confirms that it has written all
+        // checkpoint data from the watermark it is initialised with, up to and including this
+        // watermark.
+        //
+        // To correctly update the watermark, the committer tracks the watermark it last tried to
+        // write and the watermarks for any checkpoints that have been written since then
+        // ("pre-committed"). After each batch is written, the committer will try to progress the
+        // watermark as much as possible without going over any holes in the sequence of
+        // checkpoints.
+        //
+        // NOTE: When no watermark is provided, it is assumed that the pipeline is starting from
+        // scratch, but we still initialize it as if it is at (after) the genesis checkpoint. This
+        // means we never write a watermark for the genesis checkpoint, but would wait for another
+        // checkpoint to be written out before updating the watermark, which is fine in practice
+        // and simplifies the logic of tracking watermarks.
+        let mut precommitted: BTreeSet<CommitterWatermark<'static>> = BTreeSet::new();
+        let mut watermark =
+            watermark.unwrap_or_else(|| CommitterWatermark::initial(H::NAME.into()));
+
+        // The committer will periodically output a log message at a higher log level to
+        // demonstrate that the pipeline is making progress.
+        let mut next_loud_watermark_update =
+            watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL;
+
+        info!(pipeline = H::NAME, ?watermark, "Starting committer");
+
+        loop {
+            tokio::select! {
+                // Break ties in favour of operations that reduce the size of the buffer.
+                //
+                // TODO (experiment): Do we need this? It adds some complexity/subtlety to this
+                // work loop, so if we don't notice a big difference, we should get rid of it.
+                biased;
+
+                _ = cancel.cancelled() => {
+                    info!(pipeline = H::NAME, "Shutdown received, stopping committer");
+                    break;
+                }
+
+                // Time to write out another batch of rows, and update the watermark, if we can.
+                _ = poll.tick() => {
+                    let Ok(mut conn) = db.connect().await else {
+                        warn!(pipeline = H::NAME, "Failed to get connection for DB");
+                        cool.reset();
+                        continue;
+                    };
+
+                    let guard = metrics
+                        .committer_gather_latency
+                        .with_label_values(&[H::NAME])
+                        .start_timer();
+
+                    while batch_values.len() < H::CHUNK_SIZE {
+                        let Some(mut entry) = pending.first_entry() else {
+                            break;
+                        };
+
+                        let indexed = entry.get_mut();
+                        let values = &mut indexed.values;
+                        if batch_values.len() + values.len() > H::CHUNK_SIZE {
+                            let mut for_batch = values.split_off(H::CHUNK_SIZE - batch_values.len());
+                            std::mem::swap(values, &mut for_batch);
+                            batch_values.extend(for_batch);
+                            break;
+                        } else {
+                            let (watermark, values) = entry.remove().into_batch();
+                            batch_values.extend(values);
+                            batch_watermarks.push(watermark);
+                        }
+                    }
+
+                    let elapsed = guard.stop_and_record();
+                    debug!(
+                        pipeline = H::NAME,
+                        elapsed_ms = elapsed * 1000.0,
+                        rows = batch_values.len(),
+                        pending = pending_rows,
+                        "Gathered batch",
+                    );
+
+                    // TODO (experiment): Switch to COPY FROM, which should offer faster inserts?
+                    //
+                    // Note that COPY FROM cannot handle conflicts -- we need a way to gracefully
+                    // fail over to `INSERT INTO ... ON CONFLICT DO NOTHING` if we encounter a
+                    // conflict, and in that case, we are also subject to the same constraints on
+                    // number of bind parameters as we had before.
+                    //
+                    // Postgres 17 supports an ON_ERROR option for COPY FROM which can ignore bad
+                    // rows, but CloudSQL does not support Postgres 17 yet, and this directive only
+                    // works if the FORMAT is set to TEXT or CSV, which are less efficient over the
+                    // wire.
+                    //
+                    // The hope is that in the steady state, there will not be conflicts (they
+                    // should only show up during backfills, or when we are handing off between
+                    // indexers), so we can use a fallback mechanism for those cases but rely on
+                    // COPY FROM most of the time.
+                    //
+                    // Note that the introduction of watermarks also complicates hand-over between
+                    // two indexers writing to the same table: They cannot both update the
+                    // watermark. One needs to subordinate to the other (or we need to set the
+                    // watermark to the max of what is currently set and what was about to be
+                    // written).
+
+                    metrics
+                        .total_committer_batches_attempted
+                        .with_label_values(&[H::NAME])
+                        .inc();
+
+                    metrics
+                        .committer_batch_size
+                        .with_label_values(&[H::NAME])
+                        .observe(batch_values.len() as f64);
+
+                    let guard = metrics
+                        .committer_commit_latency
+                        .with_label_values(&[H::NAME])
+                        .start_timer();
+
+                    // TODO (experiment): Parallelize batch writes?
+                    //
+                    // Previous findings suggest that having about 5 parallel committers per table
+                    // yields the best performance. Is that still true for this new architecture?
+                    // If we go down this route, we should consider factoring that work out into a
+                    // separate task that also handles the watermark.
+
+                    let affected = if batch_values.is_empty() {
+                        0
+                    } else {
+                        match H::commit(&batch_values, &mut conn).await {
+                            Ok(affected) => affected,
+
+                            Err(e) => {
+                                let elapsed = guard.stop_and_record();
+
+                                error!(
+                                    pipeline = H::NAME,
+                                    elapsed_ms = elapsed * 1000.0,
+                                    attempt,
+                                    committed = batch_values.len(),
+                                    pending = pending_rows,
+                                    "Error writing batch: {e}",
+                                );
+
+                                cool.reset_after(RETRY_INTERVAL);
+                                attempt += 1;
+                                continue;
+                            }
+                        }
+                    };
+
+                    let elapsed = guard.stop_and_record();
+
+                    metrics
+                        .total_committer_rows_committed
+                        .with_label_values(&[H::NAME])
+                        .inc_by(batch_values.len() as u64);
+
+                    metrics
+                        .total_committer_rows_affected
+                        .with_label_values(&[H::NAME])
+                        .inc_by(affected as u64);
+
+                    debug!(
+                        pipeline = H::NAME,
+                        elapsed_ms = elapsed * 1000.0,
+                        attempt,
+                        affected,
+                        committed = batch_values.len(),
+                        pending = pending_rows,
+                        "Wrote batch",
+                    );
+
+                    pending_rows -= batch_values.len();
+                    attempt = 0;
+
+                    precommitted.extend(batch_watermarks.drain(..));
+                    batch_values.clear();
+
+                    // Check if the pipeline's watermark needs to be updated
+                    let guard = metrics
+                        .watermark_gather_latency
+                        .with_label_values(&[H::NAME])
+                        .start_timer();
+
+                    let mut watermark_needs_update = false;
+                    while let Some(pending) = precommitted.first() {
+                        match watermark.next_cmp(pending) {
+                            Ordering::Future => break,
+                            Ordering::Past => {
+                                // Out of order watermarks can be encountered when a pipeline is
+                                // starting up, because ingestion must start at the lowest
+                                // checkpoint across all pipelines, or because of a backfill, where
+                                // the initial checkpoint has been overridden.
+                                //
+                                // Track how many we see to make sure it doesn't grow without
+                                // bound.
+                                metrics
+                                    .total_watermarks_out_of_order
+                                    .with_label_values(&[H::NAME])
+                                    .inc();
+
+                                precommitted.pop_first().unwrap();
+                            }
+
+                            Ordering::Next => {
+                                // SAFETY: `precommitted` is known to be non-empty because of the
+                                // loop condition.
+                                watermark = precommitted.pop_first().unwrap();
+                                watermark_needs_update = true;
+                            }
+                        }
+                    }
+
+                    let elapsed = guard.stop_and_record();
+
+                    metrics
+                        .watermark_epoch
+                        .with_label_values(&[H::NAME])
+                        .set(watermark.epoch_hi_inclusive);
+
+                    metrics
+                        .watermark_checkpoint
+                        .with_label_values(&[H::NAME])
+                        .set(watermark.checkpoint_hi_inclusive);
+
+                    metrics
+                        .watermark_transaction
+                        .with_label_values(&[H::NAME])
+                        .set(watermark.tx_hi);
+
+                    debug!(
+                        pipeline = H::NAME,
+                        elapsed_ms = elapsed * 1000.0,
+                        watermark = watermark.checkpoint_hi_inclusive,
+                        pending = precommitted.len(),
+                        "Gathered watermarks",
+                    );
+
+                    if !config.skip_watermark && watermark_needs_update {
+                        let guard = metrics
+                            .watermark_commit_latency
+                            .with_label_values(&[H::NAME])
+                            .start_timer();
+
+                        match watermark.update(&mut conn).await {
+                            // If there's an issue updating the watermark, log it but keep going,
+                            // it's OK for the watermark to lag from a correctness perspective.
+                            Err(e) => {
+                                let elapsed = guard.stop_and_record();
+                                error!(
+                                    pipeline = H::NAME,
+                                    elapsed_ms = elapsed * 1000.0,
+                                    ?watermark,
+                                    "Error updating watermark: {e}",
+                                );
+                            }
+
+                            Ok(updated) => {
+                                let elapsed = guard.stop_and_record();
+
+                                if updated {
+                                    metrics
+                                        .watermark_epoch_in_db
+                                        .with_label_values(&[H::NAME])
+                                        .set(watermark.epoch_hi_inclusive);
+
+                                    metrics
+                                        .watermark_checkpoint_in_db
+                                        .with_label_values(&[H::NAME])
+                                        .set(watermark.checkpoint_hi_inclusive);
+
+                                    metrics
+                                        .watermark_transaction_in_db
+                                        .with_label_values(&[H::NAME])
+                                        .set(watermark.tx_hi);
+                                }
+
+                                if watermark.checkpoint_hi_inclusive > next_loud_watermark_update {
+                                    next_loud_watermark_update += LOUD_WATERMARK_UPDATE_INTERVAL;
+                                    info!(
+                                        pipeline = H::NAME,
+                                        elapsed_ms = elapsed * 1000.0,
+                                        updated,
+                                        epoch = watermark.epoch_hi_inclusive,
+                                        checkpoint = watermark.checkpoint_hi_inclusive,
+                                        transaction = watermark.tx_hi,
+                                        "Watermark",
+                                    );
+                                } else {
+                                    debug!(
+                                        pipeline = H::NAME,
+                                        elapsed_ms = elapsed * 1000.0,
+                                        updated,
+                                        epoch = watermark.epoch_hi_inclusive,
+                                        checkpoint = watermark.checkpoint_hi_inclusive,
+                                        transaction = watermark.tx_hi,
+                                        "Watermark",
+                                    );
+                                }
+                            }
+                        }
+                    }
+
+                    // TODO (amnn): Test this behaviour (requires tempdb and migrations).
+                    if pending_rows == 0 && rx.is_closed() {
+                        info!(pipeline = H::NAME, "Processor closed channel, pending rows empty, stopping committer");
+                        break;
+                    }
+
+                    cool.reset();
+                }
+
+                // If there are enough pending rows, and we've expended the cooldown, reset the
+                // commit polling interval so that on the next iteration of the loop, we will write
+                // out another batch.
+                //
+                // TODO (experiment): Do we need this cooldown to deal with contention on the
+                // connection pool? It's possible that this is just going to eat into our
+                // throughput.
+                _ = cool.tick(), if pending_rows > H::BATCH_SIZE => {
+                    poll.reset_immediately();
+                }
+
+                Some(indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_SIZE => {
+                    metrics
+                        .total_committer_rows_received
+                        .with_label_values(&[H::NAME])
+                        .inc_by(indexed.values.len() as u64);
+
+                    if indexed.values.is_empty() {
+                        // If the processor sends an empty batch, short-circuit the commit, and add
+                        // it directly to the pre-commit list.
+                        let (watermark, _) = indexed.into_batch();
+                        precommitted.insert(watermark);
+                    } else {
+                        pending_rows += indexed.values.len();
+                        pending.insert(indexed.cp_sequence_number, indexed);
+                    }
+                }
+            }
+        }
+    })
}
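The gathering loop at the top of the `poll` arm packs whole checkpoints into a chunk until one would overflow it, then splits that checkpoint and leaves the remainder pending. A standalone sketch of the same logic over plain vectors, with `CHUNK_SIZE` standing in for `H::CHUNK_SIZE`:

```rust
use std::collections::BTreeMap;

const CHUNK_SIZE: usize = 1000;

/// Move up to CHUNK_SIZE values out of `pending` (keyed by checkpoint) into
/// `batch`, draining whole checkpoints where possible and splitting the first
/// checkpoint that does not fit.
fn gather(pending: &mut BTreeMap<u64, Vec<u32>>, batch: &mut Vec<u32>) {
    while batch.len() < CHUNK_SIZE {
        let Some(mut entry) = pending.first_entry() else {
            break;
        };

        let values = entry.get_mut();
        if batch.len() + values.len() > CHUNK_SIZE {
            // `split_off` leaves the front in `values` and returns the tail;
            // swapping afterwards means the *front* fills the batch and the
            // tail stays pending for the next write.
            let mut for_batch = values.split_off(CHUNK_SIZE - batch.len());
            std::mem::swap(values, &mut for_batch);
            batch.extend(for_batch);
            break;
        } else {
            batch.extend(entry.remove());
        }
    }
}

fn main() {
    let mut pending = BTreeMap::from([(1, vec![0; 800]), (2, vec![1; 800])]);
    let mut batch = Vec::new();
    gather(&mut pending, &mut batch);
    assert_eq!(batch.len(), CHUNK_SIZE); // 800 rows from cp 1, 200 from cp 2
    assert_eq!(pending[&2].len(), 600); // the rest of cp 2 stays pending
}
```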
diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
new file mode 100644
index 0000000000000..160ff1ec32515
--- /dev/null
+++ b/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
@@ -0,0 +1,48 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::sync::Arc;
+
+use committer::committer;
+use sui_types::full_checkpoint_content::CheckpointData;
+use tokio::{sync::mpsc, task::JoinHandle};
+use tokio_util::sync::CancellationToken;
+
+use crate::{
+    db::Db, handlers::Handler, metrics::IndexerMetrics, models::watermarks::CommitterWatermark,
+};
+
+use super::{processor::processor, PipelineConfig, COMMITTER_BUFFER};
+
+mod committer;
+
+/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`, starting
+/// strictly after the `watermark` (or from the beginning if no watermark was provided).
+///
+/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
+/// rows, ready for insertion, and a committer which writes those rows out to the database.
+/// Committing is performed out-of-order: the pipeline may write checkpoints in a different order
+/// than it received them, either because the checkpoints themselves arrived out of order or
+/// because of variance in processing time.
+///
+/// The committer also maintains a row in the `watermarks` table for the pipeline which tracks the
+/// watermark below which all data has been committed (modulo pruning).
+///
+/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and an internal
+/// channel is created to communicate checkpoint-wise data to the committer. The pipeline can be
+/// shut down using its `cancel` token.
+pub fn pipeline<H: Handler + 'static>(
+    watermark: Option<CommitterWatermark<'static>>,
+    config: PipelineConfig,
+    db: Db,
+    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
+    metrics: Arc<IndexerMetrics>,
+    cancel: CancellationToken,
+) -> (JoinHandle<()>, JoinHandle<()>) {
+    let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + COMMITTER_BUFFER);
+
+    let processor = processor::<H>(checkpoint_rx, processor_tx, metrics.clone(), cancel.clone());
+    let committer = committer::<H>(watermark, config, committer_rx, db, metrics, cancel);
+
+    (processor, committer)
+}
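Callers receive the pair of task handles and are responsible for holding on to them, as `Indexer::concurrent_pipeline` does above. A minimal sketch of the shutdown side of that contract (illustrative only; the crate's real shutdown goes through `task::graceful_shutdown`):

```rust
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// Shut down one pipeline's pair of task handles, as returned by
/// `concurrent::pipeline`: signal cancellation, then await both tasks so no
/// work is left dangling.
async fn shutdown(processor: JoinHandle<()>, committer: JoinHandle<()>, cancel: CancellationToken) {
    cancel.cancel();
    let _ = processor.await;
    let _ = committer.await;
}
```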
diff --git a/crates/sui-indexer-alt/src/pipeline/mod.rs b/crates/sui-indexer-alt/src/pipeline/mod.rs
new file mode 100644
index 0000000000000..4040331c727af
--- /dev/null
+++ b/crates/sui-indexer-alt/src/pipeline/mod.rs
@@ -0,0 +1,57 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::time::Duration;
+
+use crate::{handlers::Handler, models::watermarks::CommitterWatermark};
+
+pub mod concurrent;
+mod processor;
+
+/// Extra buffer added to the channel between the processor and the committer. There does not need
+/// to be a huge capacity here because the committer is already buffering rows to insert
+/// internally.
+const COMMITTER_BUFFER: usize = 5;
+
+#[derive(clap::Args, Debug, Clone)]
+pub struct PipelineConfig {
+    /// Committer will check for pending data at least this often
+    #[arg(
+        long,
+        default_value = "500",
+        value_name = "MILLISECONDS",
+        value_parser = |s: &str| s.parse().map(Duration::from_millis),
+    )]
+    commit_interval: Duration,
+
+    /// Avoid writing to the watermark table
+    #[arg(long)]
+    skip_watermark: bool,
+}
+
+/// A batch of processed values associated with a single checkpoint. This is an internal type used
+/// to communicate between the processor and the committer parts of the pipeline.
+struct Indexed<H: Handler> {
+    /// Epoch this data is from
+    epoch: u64,
+    /// Checkpoint this data is from
+    cp_sequence_number: u64,
+    /// Max (exclusive) transaction sequence number in this batch
+    tx_hi: u64,
+    /// Values to be inserted into the database from this checkpoint
+    values: Vec<H::Value>,
+}
+
+impl<H: Handler> Indexed<H> {
+    /// Split apart the information in this indexed checkpoint into its watermark and the values to
+    /// add to the database.
+    fn into_batch(self) -> (CommitterWatermark<'static>, Vec<H::Value>) {
+        let watermark = CommitterWatermark {
+            pipeline: H::NAME.into(),
+            epoch_hi_inclusive: self.epoch as i64,
+            checkpoint_hi_inclusive: self.cp_sequence_number as i64,
+            tx_hi: self.tx_hi as i64,
+        };
+
+        (watermark, self.values)
+    }
+}
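The `value_parser` on `commit_interval` above turns a bare millisecond count on the command line into a `Duration`. A self-contained sketch of the same clap pattern, in a hypothetical binary rather than the indexer's real CLI surface:

```rust
use std::time::Duration;

use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    /// Committer will check for pending data at least this often.
    #[arg(
        long,
        default_value = "500",
        value_name = "MILLISECONDS",
        // Parse the raw string as a u64 millisecond count, then wrap it.
        value_parser = |s: &str| s.parse().map(Duration::from_millis),
    )]
    commit_interval: Duration,

    /// Avoid writing to the watermark table.
    #[arg(long)]
    skip_watermark: bool,
}

fn main() {
    // e.g. `app --commit-interval 250 --skip-watermark`
    let args = Args::parse();
    println!("{args:?}"); // commit_interval is already a Duration here
}
```

Keeping the flag as a plain number (rather than a human-readable duration string) means the default can be written as `"500"` and the parser stays a one-liner.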
diff --git a/crates/sui-indexer-alt/src/pipeline/processor.rs b/crates/sui-indexer-alt/src/pipeline/processor.rs
new file mode 100644
index 0000000000000..02735051f5156
--- /dev/null
+++ b/crates/sui-indexer-alt/src/pipeline/processor.rs
@@ -0,0 +1,119 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::sync::Arc;
+
+use futures::TryStreamExt;
+use mysten_metrics::spawn_monitored_task;
+use sui_types::full_checkpoint_content::CheckpointData;
+use tokio::{sync::mpsc, task::JoinHandle};
+use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, info};
+
+use crate::{handlers::Handler, metrics::IndexerMetrics};
+
+use super::Indexed;
+
+/// The processor task is responsible for taking checkpoint data and breaking it down into rows
+/// ready to commit. It spins up a supervisor that waits on the `rx` channel for checkpoints, and
+/// distributes them among `H::FANOUT` workers.
+///
+/// Each worker processes a checkpoint into rows and sends them on to the committer using the `tx`
+/// channel.
+///
+/// The task will shut down if the `cancel` token is cancelled, or if any of the workers
+/// encounters an error -- there is no retry logic at this level.
+pub(super) fn processor<H: Handler + 'static>(
+    rx: mpsc::Receiver<Arc<CheckpointData>>,
+    tx: mpsc::Sender<Indexed<H>>,
+    metrics: Arc<IndexerMetrics>,
+    cancel: CancellationToken,
+) -> JoinHandle<()> {
+    /// Internal type used by workers to propagate errors or shutdown signals up to their
+    /// supervisor.
+    #[derive(thiserror::Error, Debug)]
+    enum Break {
+        #[error("Shutdown received")]
+        Cancel,
+
+        #[error(transparent)]
+        Err(#[from] anyhow::Error),
+    }
+
+    spawn_monitored_task!(async move {
+        info!(pipeline = H::NAME, "Starting processor");
+        match ReceiverStream::new(rx)
+            .map(Ok)
+            .try_for_each_concurrent(H::FANOUT, |checkpoint| {
+                let tx = tx.clone();
+                let metrics = metrics.clone();
+                let cancel = cancel.clone();
+                async move {
+                    if cancel.is_cancelled() {
+                        return Err(Break::Cancel);
+                    }
+
+                    metrics
+                        .total_handler_checkpoints_received
+                        .with_label_values(&[H::NAME])
+                        .inc();
+
+                    let guard = metrics
+                        .handler_checkpoint_latency
+                        .with_label_values(&[H::NAME])
+                        .start_timer();
+
+                    let values = H::process(&checkpoint)?;
+                    let elapsed = guard.stop_and_record();
+
+                    let epoch = checkpoint.checkpoint_summary.epoch;
+                    let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
+                    let tx_hi = checkpoint.checkpoint_summary.network_total_transactions;
+
+                    debug!(
+                        pipeline = H::NAME,
+                        checkpoint = cp_sequence_number,
+                        elapsed_ms = elapsed * 1000.0,
+                        "Processed checkpoint",
+                    );
+
+                    metrics
+                        .total_handler_checkpoints_processed
+                        .with_label_values(&[H::NAME])
+                        .inc();
+
+                    metrics
+                        .total_handler_rows_created
+                        .with_label_values(&[H::NAME])
+                        .inc_by(values.len() as u64);
+
+                    tx.send(Indexed {
+                        epoch,
+                        cp_sequence_number,
+                        tx_hi,
+                        values,
+                    })
+                    .await
+                    .map_err(|_| Break::Cancel)?;
+
+                    Ok(())
+                }
+            })
+            .await
+        {
+            Ok(()) => {
+                info!(pipeline = H::NAME, "Checkpoints done, stopping processor");
+            }
+
+            Err(Break::Cancel) => {
+                info!(pipeline = H::NAME, "Shutdown received, stopping processor");
+            }
+
+            Err(Break::Err(e)) => {
+                error!(pipeline = H::NAME, "Error from processor: {e}");
+                cancel.cancel();
+            }
+        };
+    })
}
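The supervisor pattern in `processor` -- a bounded fan-out over a channel, with errors or cancellation short-circuiting the whole stream the way `Break` does -- reduces to this self-contained sketch, with stand-in work in place of `H::process`:

```rust
use futures::TryStreamExt;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_util::sync::CancellationToken;

const FANOUT: usize = 4;

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let (tx, rx) = mpsc::channel(16);
    let cancel = CancellationToken::new();

    // A stand-in producer: the real pipeline feeds checkpoints in here.
    tokio::spawn(async move {
        for i in 0..32u64 {
            if tx.send(i).await.is_err() {
                break;
            }
        }
    });

    // Up to FANOUT items are processed concurrently; the first error (or a
    // cancellation, checked cooperatively) stops the whole stream.
    ReceiverStream::new(rx)
        .map(Ok)
        .try_for_each_concurrent(FANOUT, |i| {
            let cancel = cancel.clone();
            async move {
                if cancel.is_cancelled() {
                    return Err("shutdown");
                }
                let _rows = i * 2; // stand-in for H::process(&checkpoint)
                Ok(())
            }
        })
        .await
}
```

Note that `try_for_each_concurrent` bounds *in-flight* work, not throughput: back-pressure still comes from the bounded channels on either side of the task.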