From dd951f88d060cb039ef83fa596e6cfd2f5d6eda2 Mon Sep 17 00:00:00 2001 From: Lu Zhang <8418040+longbowlu@users.noreply.github.com> Date: Sat, 7 Sep 2024 17:39:26 -0700 Subject: [PATCH] [bridge-indexer] revamp task (#19245) ## Description This PR reworks `Tasks`: 1. replace the `Tasks` trait with a `Tasks` struct (a usage sketch of the new struct appears later in this patch). 2. add an `is_live_task` field to `Task`. 3. pass the whole `Task` to several functions instead of passing its fields individually. 4. for the ingestion framework, use a larger, configurable batch read size for backfill tasks (this significantly improves data download speed). ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../src/eth_bridge_indexer.rs | 17 +++---- crates/sui-bridge-indexer/src/models.rs | 5 +- crates/sui-bridge-indexer/src/storage.rs | 31 ++++++++++-- .../sui-bridge-indexer/src/sui_datasource.rs | 37 ++++++++++---- .../src/indexer_builder.rs | 46 ++++++++--------- crates/sui-indexer-builder/src/lib.rs | 50 +++++++++++++------ .../tests/indexer_test_utils.rs | 46 ++++++++++++----- .../tests/indexer_tests.rs | 35 ++++++++----- 8 files changed, 175 insertions(+), 92 deletions(-) diff --git a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs index 409b787aafafe..a2ecbcbebb8a8 100644 --- a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs @@ -16,6 +16,7 @@ use sui_bridge::error::BridgeError; use sui_bridge::eth_client::EthClient; use sui_bridge::metered_eth_provider::MeteredEthHttpProvier; use sui_bridge::retry_with_max_elapsed_time; +use sui_indexer_builder::Task; use tokio::task::JoinHandle; use tracing::info; @@ -63,14 +64,13 @@ impl EthSubscriptionDatasource { impl Datasource for EthSubscriptionDatasource { async fn start_data_retrieval( &self, - starting_checkpoint: u64, - target_checkpoint: u64, + task: Task, data_sender: DataSender, ) -> Result>, Error> { let filter = Filter::new() .address(self.bridge_address) - .from_block(starting_checkpoint) - .to_block(target_checkpoint); + .from_block(task.start_checkpoint) + .to_block(task.target_checkpoint); let eth_ws_url = self.eth_ws_url.clone(); let indexer_metrics: BridgeIndexerMetrics = self.indexer_metrics.clone(); @@ -194,8 +194,7 @@ impl EthSyncDatasource { impl Datasource for EthSyncDatasource { async fn start_data_retrieval( &self, - starting_checkpoint: u64, - target_checkpoint: u64, + task: Task, data_sender: DataSender, ) -> Result>, Error> { let provider = Arc::new( @@ -214,8 +213,8 @@ impl Datasource for EthSyncDatasource { let Ok(Ok(logs)) = retry_with_max_elapsed_time!( client.get_raw_events_in_range( bridge_address, - starting_checkpoint, - target_checkpoint + task.start_checkpoint, + task.target_checkpoint ), Duration::from_secs(30000) ) else { @@ -254,7 +253,7 @@ impl Datasource for EthSyncDatasource { data.push((log, block, transaction)); } - data_sender.send((target_checkpoint, data)).await?; + data_sender.send((task.target_checkpoint, data)).await?; indexer_metrics .last_synced_eth_block diff --git
a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index 90435cd60c154..0d86b6f172986 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -4,7 +4,7 @@ use diesel::data_types::PgTimestamp; use diesel::{Identifiable, Insertable, Queryable, Selectable}; -use sui_indexer_builder::Task; +use sui_indexer_builder::{Task, LIVE_TASK_TARGET_CHECKPOINT}; use crate::schema::{ progress_store, sui_error_transactions, sui_progress_store, token_transfer, token_transfer_data, @@ -23,10 +23,11 @@ impl From for Task { fn from(value: ProgressStore) -> Self { Self { task_name: value.task_name, - checkpoint: value.checkpoint as u64, + start_checkpoint: value.checkpoint as u64, target_checkpoint: value.target_checkpoint as u64, // Ok to unwrap, timestamp is defaulted to now() in database timestamp: value.timestamp.expect("Timestamp not set").0 as u64, + is_live_task: value.target_checkpoint == LIVE_TASK_TARGET_CHECKPOINT, } } } diff --git a/crates/sui-bridge-indexer/src/storage.rs b/crates/sui-bridge-indexer/src/storage.rs index 2ab775d2c5fa2..4f853921944a3 100644 --- a/crates/sui-bridge-indexer/src/storage.rs +++ b/crates/sui-bridge-indexer/src/storage.rs @@ -21,7 +21,7 @@ use crate::schema::progress_store::{columns, dsl}; use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data}; use crate::{models, schema, ProcessedTxnData}; use sui_indexer_builder::indexer_builder::{IndexerProgressStore, Persistent}; -use sui_indexer_builder::Task; +use sui_indexer_builder::{Task, Tasks, LIVE_TASK_TARGET_CHECKPOINT}; /// Persistent layer impl #[derive(Clone)] @@ -147,7 +147,7 @@ impl IndexerProgressStore for PgBridgePersistent { Ok(None) } - async fn get_ongoing_tasks(&self, prefix: &str) -> Result, anyhow::Error> { + async fn get_ongoing_tasks(&self, prefix: &str) -> Result { let mut conn = self.pool.get().await?; // get all unfinished tasks let cp: Vec = dsl::progress_store .order_by(columns::target_checkpoint.desc()) .load(&mut conn) .await?; - Ok(cp.into_iter().map(|d| d.into()).collect()) + let tasks = cp.into_iter().map(|d| d.into()).collect(); + Ok(Tasks::new(tasks)?) } async fn get_largest_backfill_task_target_checkpoint( @@ -177,6 +178,8 @@ impl IndexerProgressStore for PgBridgePersistent { Ok(cp.map(|c| c as u64)) } + /// Register a new task to the progress store with a start checkpoint and a target checkpoint. + /// Usually used for backfill tasks. async fn register_task( &mut self, task_name: String, @@ -197,11 +200,31 @@ impl IndexerProgressStore for PgBridgePersistent { Ok(()) } + /// Register a live task to the progress store with a start checkpoint.
+ async fn register_live_task( + &mut self, + task_name: String, + start_checkpoint: u64, + ) -> Result<(), anyhow::Error> { + let mut conn = self.pool.get().await?; + diesel::insert_into(schema::progress_store::table) + .values(models::ProgressStore { + task_name, + checkpoint: start_checkpoint as i64, + target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT, + // Timestamp is defaulted to current time in DB if None + timestamp: None, + }) + .execute(&mut conn) + .await?; + Ok(()) + } + async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> { let mut conn = self.pool.get().await?; diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name))) .set(( - columns::checkpoint.eq(task.checkpoint as i64), + columns::checkpoint.eq(task.start_checkpoint as i64), columns::target_checkpoint.eq(task.target_checkpoint as i64), columns::timestamp.eq(now), )) diff --git a/crates/sui-bridge-indexer/src/sui_datasource.rs b/crates/sui-bridge-indexer/src/sui_datasource.rs index fe8891897f36f..78928dafff8a1 100644 --- a/crates/sui-bridge-indexer/src/sui_datasource.rs +++ b/crates/sui-bridge-indexer/src/sui_datasource.rs @@ -12,8 +12,8 @@ use sui_data_ingestion_core::{ DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool, }; use sui_indexer_builder::indexer_builder::{DataSender, Datasource}; +use sui_indexer_builder::Task; use sui_sdk::SuiClient; -use sui_types::base_types::TransactionDigest; use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData; use sui_types::full_checkpoint_content::CheckpointTransaction; use sui_types::messages_checkpoint::CheckpointSequenceNumber; @@ -23,6 +23,9 @@ use tokio::task::JoinHandle; use crate::metrics::BridgeIndexerMetrics; +const BACKFILL_TASK_INGESTION_READER_BATCH_SIZE: usize = 300; +const LIVE_TASK_INGESTION_READER_BATCH_SIZE: usize = 10; + pub struct SuiCheckpointDatasource { remote_store_url: String, sui_client: Arc, @@ -58,23 +61,32 @@ impl SuiCheckpointDatasource { impl Datasource for SuiCheckpointDatasource { async fn start_data_retrieval( &self, - starting_checkpoint: u64, - target_checkpoint: u64, + task: Task, data_sender: DataSender, ) -> Result>, Error> { let (exit_sender, exit_receiver) = oneshot::channel(); let progress_store = PerTaskInMemProgressStore { - current_checkpoint: starting_checkpoint, - exit_checkpoint: target_checkpoint, + current_checkpoint: task.start_checkpoint, + exit_checkpoint: task.target_checkpoint, exit_sender: Some(exit_sender), }; + // The max concurrency of checkpoints to fetch at the same time in the ingestion framework + let ingestion_reader_batch_size = if task.is_live_task { + // Live tasks use a smaller number to be cost-effective + LIVE_TASK_INGESTION_READER_BATCH_SIZE + } else { + std::env::var("BACKFILL_TASK_INGESTION_READER_BATCH_SIZE") + .unwrap_or(BACKFILL_TASK_INGESTION_READER_BATCH_SIZE.to_string()) + .parse::() + .unwrap() + }; + tracing::info!( + "Starting Sui checkpoint data retrieval with batch size {}", + ingestion_reader_batch_size + ); let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone()); let worker = IndexerWorker::new(data_sender); - let worker_pool = WorkerPool::new( - worker, - TransactionDigest::random().to_string(), - self.concurrency, - ); + let worker_pool = WorkerPool::new(worker, task.task_name.clone(), self.concurrency); executor.register(worker_pool).await?; let checkpoint_path = self.checkpoint_path.clone(); let remote_store_url = self.remote_store_url.clone(); @@ -84,7 +96,10 @@ impl Datasource
for SuiCheckpointDatasource { checkpoint_path, Some(remote_store_url), vec![], // optional remote store access options - ReaderOptions::default(), + ReaderOptions { + batch_size: ingestion_reader_batch_size, + ..Default::default() + }, exit_receiver, ) .await?; diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index 8f8eef1dcf7d9..c703f1470c2fa 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -116,9 +116,7 @@ impl Indexer { let live_task_future = match ongoing_tasks.live_task() { Some(live_task) if !self.disable_live_task => { let live_task_future = self.datasource.start_ingestion_task( - live_task.task_name.clone(), - live_task.checkpoint, - live_task.target_checkpoint, + live_task, self.storage.clone(), self.data_mapper.clone(), ); @@ -127,7 +125,7 @@ impl Indexer { _ => None, }; - let backfill_tasks = ongoing_tasks.backfill_tasks(); + let backfill_tasks = ongoing_tasks.backfill_tasks_ordered_desc(); let storage_clone = self.storage.clone(); let data_mapper_clone = self.data_mapper.clone(); let datasource_clone = self.datasource.clone(); @@ -135,12 +133,10 @@ impl Indexer { let handle = spawn_monitored_task!(async { // Execute tasks one by one for backfill_task in backfill_tasks { - if backfill_task.checkpoint < backfill_task.target_checkpoint { + if backfill_task.start_checkpoint < backfill_task.target_checkpoint { datasource_clone .start_ingestion_task( - backfill_task.task_name.clone(), - backfill_task.checkpoint, - backfill_task.target_checkpoint, + backfill_task, storage_clone.clone(), data_mapper_clone.clone(), ) @@ -181,10 +177,9 @@ impl Indexer { match ongoing_tasks.live_task() { None => { self.storage - .register_task( + .register_live_task( format!("{} - Live", self.name), live_task_from_checkpoint, - i64::MAX as u64, ) .await .tap_err(|e| { @@ -199,8 +194,8 @@ impl Indexer { // We still check this because in the case of slow // block generation (e.g. Ethereum), it's possible we will // stay on the same block for a bit. 
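The batch-size selection in the `sui_datasource.rs` hunk above boils down to a small env-override pattern: live tasks get a fixed, small read batch, while backfills default to a large batch that an environment variable can override. A self-contained sketch of that logic follows; the constant names, values, and env var mirror the patch, but the fallible parse (instead of the patch's `unwrap()`) is this sketch's own choice.

```rust
// Defaults mirroring the patch: backfills fetch aggressively, live tasks stay small.
const BACKFILL_TASK_INGESTION_READER_BATCH_SIZE: usize = 300;
const LIVE_TASK_INGESTION_READER_BATCH_SIZE: usize = 10;

/// Pick how many checkpoints the ingestion reader fetches concurrently.
/// Live tasks use a small fixed batch; backfills honor an env-var override.
fn ingestion_reader_batch_size(is_live_task: bool) -> usize {
    if is_live_task {
        LIVE_TASK_INGESTION_READER_BATCH_SIZE
    } else {
        std::env::var("BACKFILL_TASK_INGESTION_READER_BATCH_SIZE")
            .ok()
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(BACKFILL_TASK_INGESTION_READER_BATCH_SIZE)
    }
}

fn main() {
    assert_eq!(ingestion_reader_batch_size(true), 10);
    // Assumes the env var is unset, so the backfill default applies.
    assert_eq!(ingestion_reader_batch_size(false), 300);
}
```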
- if live_task_from_checkpoint != live_task.checkpoint { - live_task.checkpoint = live_task_from_checkpoint; + if live_task_from_checkpoint != live_task.start_checkpoint { + live_task.start_checkpoint = live_task_from_checkpoint; self.storage.update_task(live_task).await.tap_err(|e| { tracing::error!( "Failed to update live task to ({}-MAX): {:?}", @@ -318,7 +313,7 @@ pub trait IndexerProgressStore: Send { target_checkpoint_number: u64, ) -> anyhow::Result>; - async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result, Error>; + async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result; async fn get_largest_backfill_task_target_checkpoint( &self, @@ -328,10 +323,16 @@ pub trait IndexerProgressStore: Send { async fn register_task( &mut self, task_name: String, - checkpoint: u64, + start_checkpoint: u64, target_checkpoint: u64, ) -> Result<(), anyhow::Error>; + async fn register_live_task( + &mut self, + task_name: String, + start_checkpoint: u64, + ) -> Result<(), anyhow::Error>; + async fn update_task(&mut self, task: Task) -> Result<(), Error>; } @@ -339,9 +340,7 @@ pub trait IndexerProgressStore: Send { pub trait Datasource: Sync + Send { async fn start_ingestion_task( &self, - task_name: String, - starting_checkpoint: u64, - target_checkpoint: u64, + task: Task, mut storage: P, data_mapper: M, ) -> Result<(), Error> @@ -349,6 +348,9 @@ pub trait Datasource: Sync + Send { M: DataMapper, P: Persistent, { + let task_name = task.task_name.clone(); + let starting_checkpoint = task.start_checkpoint; + let target_checkpoint = task.target_checkpoint; let ingestion_batch_size = std::env::var("INGESTION_BATCH_SIZE") .unwrap_or(INGESTION_BATCH_SIZE.to_string()) .parse::() @@ -365,7 +367,6 @@ pub trait Datasource: Sync + Send { starting_checkpoint, target_checkpoint, ); - let is_live_task = target_checkpoint == i64::MAX as u64; let (data_sender, data_rx) = metered_channel::channel( checkpoint_channel_size, &mysten_metrics::get_metrics() @@ -373,10 +374,8 @@ pub trait Datasource: Sync + Send { .channel_inflight .with_label_values(&[&task_name]), ); - let join_handle = self - .start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender) - .await?; - + let is_live_task = task.is_live_task; + let join_handle = self.start_data_retrieval(task, data_sender).await?; let processed_checkpoints_metrics = self .get_tasks_processed_checkpoints_metric() .with_label_values(&[&task_name]); @@ -499,8 +498,7 @@ pub trait Datasource: Sync + Send { async fn start_data_retrieval( &self, - starting_checkpoint: u64, - target_checkpoint: u64, + task: Task, data_sender: DataSender, ) -> Result>, Error>; diff --git a/crates/sui-indexer-builder/src/lib.rs b/crates/sui-indexer-builder/src/lib.rs index af00dde516ac0..160e15c50a80c 100644 --- a/crates/sui-indexer-builder/src/lib.rs +++ b/crates/sui-indexer-builder/src/lib.rs @@ -3,32 +3,50 @@ pub mod indexer_builder; +pub const LIVE_TASK_TARGET_CHECKPOINT: i64 = i64::MAX; + #[derive(Clone, Debug)] pub struct Task { pub task_name: String, - pub checkpoint: u64, + pub start_checkpoint: u64, pub target_checkpoint: u64, pub timestamp: u64, + pub is_live_task: bool, } -pub trait Tasks { - fn live_task(&self) -> Option; - - fn backfill_tasks(&self) -> Vec; +#[derive(Clone, Debug)] +pub struct Tasks { + live_task: Option, + backfill_tasks: Vec, } -impl Tasks for Vec { - fn live_task(&self) -> Option { - // TODO: Change the schema to record live task properly. 
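To make this `lib.rs` change concrete: the new `Tasks` struct guarantees at most one live task and hands backfills back in descending start order (see `Tasks::new` and `backfill_tasks_ordered_desc` just below). A usage sketch, assuming the `sui-indexer-builder` crate is on the path; task names and checkpoint numbers are made up for the example:

```rust
use sui_indexer_builder::{Task, Tasks, LIVE_TASK_TARGET_CHECKPOINT};

fn main() -> anyhow::Result<()> {
    let live = Task {
        task_name: "demo - Live".to_string(),
        start_checkpoint: 100,
        target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT as u64,
        timestamp: 0,
        is_live_task: true,
    };
    let old_backfill = Task {
        task_name: "demo - backfill - 10:49".to_string(),
        start_checkpoint: 10,
        target_checkpoint: 49,
        timestamp: 0,
        is_live_task: false,
    };
    let new_backfill = Task {
        task_name: "demo - backfill - 50:99".to_string(),
        start_checkpoint: 50,
        target_checkpoint: 99,
        timestamp: 0,
        is_live_task: false,
    };

    // `new` bails if more than one live task is present.
    let tasks = Tasks::new(vec![old_backfill, live, new_backfill])?;
    assert!(tasks.live_task().is_some());

    // Backfills come back ordered by descending start checkpoint.
    let starts: Vec<u64> = tasks
        .backfill_tasks_ordered_desc()
        .iter()
        .map(|t| t.start_checkpoint)
        .collect();
    assert_eq!(starts, vec![50, 10]);
    Ok(())
}
```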
- self.iter() - .find(|t| t.target_checkpoint == i64::MAX as u64) - .cloned() +impl Tasks { + pub fn new(tasks: Vec) -> anyhow::Result { + let mut live_tasks = vec![]; + let mut backfill_tasks = vec![]; + for task in tasks { + if task.is_live_task { + live_tasks.push(task); + } else { + backfill_tasks.push(task); + } + } + if live_tasks.len() > 1 { + anyhow::bail!("More than one live task found: {:?}", live_tasks); + } + Ok(Self { + live_task: live_tasks.pop(), + backfill_tasks, + }) + } + + pub fn live_task(&self) -> Option { + self.live_task.clone() } - fn backfill_tasks(&self) -> Vec { - self.iter() - .filter(|t| t.target_checkpoint != i64::MAX as u64) - .cloned() - .collect() + pub fn backfill_tasks_ordered_desc(&self) -> Vec { + let mut tasks = self.backfill_tasks.clone(); + tasks.sort_by(|t1, t2| t2.start_checkpoint.cmp(&t1.start_checkpoint)); + tasks } } diff --git a/crates/sui-indexer-builder/tests/indexer_test_utils.rs b/crates/sui-indexer-builder/tests/indexer_test_utils.rs index 4921fdeb2fc53..5b17e5254d6b8 100644 --- a/crates/sui-indexer-builder/tests/indexer_test_utils.rs +++ b/crates/sui-indexer-builder/tests/indexer_test_utils.rs @@ -16,7 +16,7 @@ use mysten_metrics::spawn_monitored_task; use sui_indexer_builder::indexer_builder::{ DataMapper, DataSender, Datasource, IndexerProgressStore, Persistent, }; -use sui_indexer_builder::Task; +use sui_indexer_builder::{Task, Tasks, LIVE_TASK_TARGET_CHECKPOINT}; pub struct TestDatasource { pub data: Vec, @@ -33,14 +33,13 @@ where { async fn start_data_retrieval( &self, - starting_checkpoint: u64, - _target_checkpoint: u64, + task: Task, data_sender: DataSender, ) -> Result>, Error> { let data_clone = self.data.clone(); Ok(spawn_monitored_task!(async { - let mut cp = starting_checkpoint; + let mut cp = task.start_checkpoint; while cp < data_clone.len() as u64 { data_sender .send((cp, vec![data_clone[cp as usize].clone()])) @@ -94,7 +93,7 @@ impl InMemoryPersistent { .filter(|task| task.task_name.starts_with(task_prefix)) .cloned() .collect::>(); - tasks.sort_by(|t1, t2| t2.checkpoint.cmp(&t1.checkpoint)); + tasks.sort_by(|t1, t2| t2.start_checkpoint.cmp(&t1.start_checkpoint)); Ok(tasks) } } @@ -108,7 +107,7 @@ impl IndexerProgressStore for InMemoryPersistent { .await .get(&task_name) .unwrap() - .checkpoint) + .start_checkpoint) } async fn save_progress( @@ -124,22 +123,21 @@ impl IndexerProgressStore for InMemoryPersistent { .await .get_mut(&task_name) .unwrap() - .checkpoint = checkpoint_number; + .start_checkpoint = checkpoint_number; Ok(Some(checkpoint_number)) } - async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result, Error> { - let mut tasks = self + async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result { + let tasks = self .progress_store .lock() .await .values() .filter(|task| task.task_name.starts_with(task_prefix)) - .filter(|task| task.checkpoint.lt(&task.target_checkpoint)) + .filter(|task| task.start_checkpoint.lt(&task.target_checkpoint)) .cloned() .collect::>(); - tasks.sort_by(|t1, t2| t2.checkpoint.cmp(&t1.checkpoint)); - Ok(tasks) + Tasks::new(tasks) } async fn get_largest_backfill_task_target_checkpoint( @@ -167,9 +165,31 @@ impl IndexerProgressStore for InMemoryPersistent { task_name.clone(), Task { task_name: task_name.clone(), - checkpoint, + start_checkpoint: checkpoint, target_checkpoint, timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64, + is_live_task: false, + }, + ); + if existing.is_some() { + return Err(anyhow!("Task {task_name} already exists")); + } + Ok(()) + 
} + + async fn register_live_task( + &mut self, + task_name: String, + checkpoint: u64, + ) -> Result<(), Error> { + let existing = self.progress_store.lock().await.insert( + task_name.clone(), + Task { + task_name: task_name.clone(), + start_checkpoint: checkpoint, + target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT as u64, + timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64, + is_live_task: true, }, ); if existing.is_some() { diff --git a/crates/sui-indexer-builder/tests/indexer_tests.rs b/crates/sui-indexer-builder/tests/indexer_tests.rs index 8beeddcb5b74b..ef2dd03d1ec83 100644 --- a/crates/sui-indexer-builder/tests/indexer_tests.rs +++ b/crates/sui-indexer-builder/tests/indexer_tests.rs @@ -7,7 +7,7 @@ use prometheus::{ IntGaugeVec, Registry, }; use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder}; -use sui_indexer_builder::Task; +use sui_indexer_builder::{Task, LIVE_TASK_TARGET_CHECKPOINT}; mod indexer_test_utils; @@ -119,9 +119,10 @@ async fn indexer_partitioned_task_with_data_already_in_db_test1() { "test_indexer - backfill - 1".to_string(), Task { task_name: "test_indexer - backfill - 1".to_string(), - checkpoint: 30, + start_checkpoint: 30, target_checkpoint: 30, timestamp: 0, + is_live_task: false, }, ); let mut indexer = IndexerBuilder::new( @@ -170,9 +171,10 @@ async fn indexer_partitioned_task_with_data_already_in_db_test2() { "test_indexer - backfill - 1".to_string(), Task { task_name: "test_indexer - backfill - 1".to_string(), - checkpoint: 30, + start_checkpoint: 30, target_checkpoint: 30, timestamp: 0, + is_live_task: false, }, ); let mut indexer = IndexerBuilder::new( @@ -222,18 +224,20 @@ async fn indexer_partitioned_task_with_data_already_in_db_test3() { "test_indexer - backfill - 20:30".to_string(), Task { task_name: "test_indexer - backfill - 20:30".to_string(), - checkpoint: 30, + start_checkpoint: 30, target_checkpoint: 30, timestamp: 0, + is_live_task: false, }, ); persistent.progress_store.lock().await.insert( "test_indexer - backfill - 10:19".to_string(), Task { task_name: "test_indexer - backfill - 10:19".to_string(), - checkpoint: 10, + start_checkpoint: 10, target_checkpoint: 19, timestamp: 0, + is_live_task: false, }, ); let mut indexer = IndexerBuilder::new( @@ -278,18 +282,20 @@ async fn indexer_partitioned_task_with_data_already_in_db_test4() { "test_indexer - backfill - 20:30".to_string(), Task { task_name: "test_indexer - backfill - 20:30".to_string(), - checkpoint: 30, + start_checkpoint: 30, target_checkpoint: 30, timestamp: 0, + is_live_task: false, }, ); persistent.progress_store.lock().await.insert( "test_indexer - backfill - 10:19".to_string(), Task { task_name: "test_indexer - backfill - 10:19".to_string(), - checkpoint: 10, + start_checkpoint: 10, target_checkpoint: 19, timestamp: 0, + is_live_task: false, }, ); let mut indexer = IndexerBuilder::new( @@ -338,9 +344,10 @@ async fn indexer_with_existing_live_task1() { "test_indexer - Live".to_string(), Task { task_name: "test_indexer - Live".to_string(), - checkpoint: 30, - target_checkpoint: i64::MAX as u64, + start_checkpoint: 30, + target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT as u64, timestamp: 0, + is_live_task: true, }, ); let mut indexer = IndexerBuilder::new( @@ -383,9 +390,10 @@ async fn indexer_with_existing_live_task2() { "test_indexer - Live".to_string(), Task { task_name: "test_indexer - Live".to_string(), - checkpoint: 30, - target_checkpoint: i64::MAX as u64, + start_checkpoint: 30, + target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT as 
u64, timestamp: 10, + is_live_task: true, }, ); let mut indexer = IndexerBuilder::new( @@ -414,7 +422,7 @@ fn assert_ranges(desc_ordered_tasks: &[Task], ranges: Vec<(u64, u64)>) { let mut iter = desc_ordered_tasks.iter(); for (start, end) in ranges { let task = iter.next().unwrap(); - assert_eq!(start, task.checkpoint); + assert_eq!(start, task.start_checkpoint); assert_eq!(end, task.target_checkpoint); } } @@ -438,9 +446,10 @@ async fn resume_test() { "test_indexer - backfill - 30".to_string(), Task { task_name: "test_indexer - backfill - 30".to_string(), - checkpoint: 10, + start_checkpoint: 10, target_checkpoint: 30, timestamp: 0, + is_live_task: false, }, ); let mut indexer = IndexerBuilder::new(
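One subtlety from the `models.rs` hunk earlier in this patch is easy to miss: `is_live_task` is not a new database column; it is derived when a row is loaded, by comparing the stored target checkpoint against the `LIVE_TASK_TARGET_CHECKPOINT` sentinel. A standalone sketch of that conversion; `ProgressStoreRow` is a hypothetical stand-in for the Diesel-backed row type:

```rust
// Sentinel target checkpoint marking a task as "live", as in the patch.
const LIVE_TASK_TARGET_CHECKPOINT: i64 = i64::MAX;

// Hypothetical plain struct standing in for the Diesel-backed `ProgressStore` row.
struct ProgressStoreRow {
    task_name: String,
    checkpoint: i64,
    target_checkpoint: i64,
}

struct Task {
    task_name: String,
    start_checkpoint: u64,
    target_checkpoint: u64,
    is_live_task: bool,
}

impl From<ProgressStoreRow> for Task {
    fn from(row: ProgressStoreRow) -> Self {
        Task {
            task_name: row.task_name,
            start_checkpoint: row.checkpoint as u64,
            target_checkpoint: row.target_checkpoint as u64,
            // Derived, not stored: a live task is one whose target is the sentinel.
            is_live_task: row.target_checkpoint == LIVE_TASK_TARGET_CHECKPOINT,
        }
    }
}

fn main() {
    let row = ProgressStoreRow {
        task_name: "bridge - Live".to_string(),
        checkpoint: 42,
        target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT,
    };
    let task = Task::from(row);
    assert!(task.is_live_task);
    assert_eq!(task.start_checkpoint, 42);
    assert_eq!(task.target_checkpoint, i64::MAX as u64);
    // The task name carries through unchanged.
    assert_eq!(task.task_name, "bridge - Live");
}
```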