diff --git a/crates/sui-data-ingestion/src/lib.rs b/crates/sui-data-ingestion/src/lib.rs index 1853b8f0af884..3ef48d9ec9713 100644 --- a/crates/sui-data-ingestion/src/lib.rs +++ b/crates/sui-data-ingestion/src/lib.rs @@ -4,7 +4,7 @@ mod progress_store; mod workers; -pub use progress_store::IngestionWorkflowsProgressStore; +pub use progress_store::DynamoDBProgressStore; pub use workers::{ ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, KVStoreTaskConfig, KVStoreWorker, diff --git a/crates/sui-data-ingestion/src/main.rs b/crates/sui-data-ingestion/src/main.rs index f403a01c2d447..cd2c00694a07d 100644 --- a/crates/sui-data-ingestion/src/main.rs +++ b/crates/sui-data-ingestion/src/main.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; use std::time::Duration; use sui_data_ingestion::{ ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, - IngestionWorkflowsProgressStore, KVStoreTaskConfig, KVStoreWorker, + DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker, }; use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions}; use sui_data_ingestion_core::{IndexerExecutor, WorkerPool}; @@ -119,27 +119,12 @@ async fn main() -> Result<()> { mysten_metrics::init_metrics(®istry); let metrics = DataIngestionMetrics::new(®istry); - let mut bigtable_client = None; - for task in &config.tasks { - if let Task::BigTableKV(kv_config) = &task.task { - bigtable_client = Some( - BigTableClient::new_remote( - kv_config.instance_id.clone(), - false, - Some(Duration::from_secs(kv_config.timeout_secs as u64)), - ) - .await?, - ); - } - } - - let progress_store = IngestionWorkflowsProgressStore::new( + let progress_store = DynamoDBProgressStore::new( &config.progress_store.aws_access_key_id, &config.progress_store.aws_secret_access_key, config.progress_store.aws_region, config.progress_store.table_name, config.is_backfill, - bigtable_client, ) .await; let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics); diff --git a/crates/sui-data-ingestion/src/progress_store.rs b/crates/sui-data-ingestion/src/progress_store.rs index 550f164e4ddf7..02857becfc626 100644 --- a/crates/sui-data-ingestion/src/progress_store.rs +++ b/crates/sui-data-ingestion/src/progress_store.rs @@ -11,24 +11,21 @@ use aws_sdk_s3::config::{Credentials, Region}; use std::str::FromStr; use std::time::Duration; use sui_data_ingestion_core::ProgressStore; -use sui_kvstore::{BigTableClient, KeyValueStoreWriter}; use sui_types::messages_checkpoint::CheckpointSequenceNumber; -pub struct IngestionWorkflowsProgressStore { +pub struct DynamoDBProgressStore { client: Client, table_name: String, is_backfill: bool, - bigtable_client: Option, } -impl IngestionWorkflowsProgressStore { +impl DynamoDBProgressStore { pub async fn new( aws_access_key_id: &str, aws_secret_access_key: &str, aws_region: String, table_name: String, is_backfill: bool, - bigtable_client: Option, ) -> Self { let credentials = Credentials::new( aws_access_key_id, @@ -53,13 +50,12 @@ impl IngestionWorkflowsProgressStore { client, table_name, is_backfill, - bigtable_client, } } } #[async_trait] -impl ProgressStore for IngestionWorkflowsProgressStore { +impl ProgressStore for DynamoDBProgressStore { async fn load(&mut self, task_name: String) -> Result { let item = self .client @@ -83,11 +79,6 @@ impl ProgressStore for IngestionWorkflowsProgressStore { if self.is_backfill && checkpoint_number % 1000 != 0 { return Ok(()); } - if let Some(ref mut bigtable_client) = self.bigtable_client { - bigtable_client - .save_watermark(&task_name, checkpoint_number) - .await?; - } let backoff = backoff::ExponentialBackoff::default(); backoff::future::retry(backoff, || async { let result = self diff --git a/crates/sui-kvstore/src/bigtable/client.rs b/crates/sui-kvstore/src/bigtable/client.rs index 553a97c6e9e92..5c85447622827 100644 --- a/crates/sui-kvstore/src/bigtable/client.rs +++ b/crates/sui-kvstore/src/bigtable/client.rs @@ -35,11 +35,9 @@ const OBJECTS_TABLE: &str = "objects"; const TRANSACTIONS_TABLE: &str = "transactions"; const CHECKPOINTS_TABLE: &str = "checkpoints"; const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest"; -const WATERMARK_TABLE: &str = "watermark"; const COLUMN_FAMILY_NAME: &str = "sui"; const DEFAULT_COLUMN_QUALIFIER: &str = ""; -const AGGREGATED_WATERMARK_NAME: &str = "bigtable"; const CHECKPOINT_SUMMARY_COLUMN_QUALIFIER: &str = "s"; const CHECKPOINT_SIGNATURES_COLUMN_QUALIFIER: &str = "sg"; const CHECKPOINT_CONTENTS_COLUMN_QUALIFIER: &str = "c"; @@ -133,21 +131,6 @@ impl KeyValueStoreWriter for BigTableClient { ) .await } - - async fn save_watermark( - &mut self, - name: &str, - watermark: CheckpointSequenceNumber, - ) -> Result<()> { - let key = name.as_bytes().to_vec(); - let value = watermark.to_be_bytes().to_vec(); - self.multi_set_with_timestamp( - WATERMARK_TABLE, - [(key, vec![(DEFAULT_COLUMN_QUALIFIER, value)])], - watermark as i64, - ) - .await - } } #[async_trait] @@ -254,7 +237,15 @@ impl KeyValueStoreReader for BigTableClient { } async fn get_latest_checkpoint(&mut self) -> Result { - self.get_watermark(AGGREGATED_WATERMARK_NAME).await + let upper_limit = u64::MAX.to_be_bytes().to_vec(); + match self + .reversed_scan(CHECKPOINTS_TABLE, upper_limit) + .await? + .pop() + { + Some((key_bytes, _)) => Ok(u64::from_be_bytes(key_bytes.as_slice().try_into()?)), + None => Ok(0), + } } async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result> { @@ -266,17 +257,6 @@ impl KeyValueStoreReader for BigTableClient { } Ok(None) } - - async fn get_watermark(&mut self, watermark_name: &str) -> Result { - let key = watermark_name.as_bytes().to_vec(); - let mut response = self.multi_get(WATERMARK_TABLE, vec![key]).await?; - if let Some(row) = response.pop() { - if let Some((_, value)) = row.into_iter().next() { - return Ok(u64::from_be_bytes(value.as_slice().try_into()?)); - } - } - Ok(0) - } } impl BigTableClient { @@ -402,15 +382,6 @@ impl BigTableClient { &mut self, table_name: &str, values: impl IntoIterator)> + std::marker::Send, - ) -> Result<()> { - self.multi_set_with_timestamp(table_name, values, -1).await - } - - async fn multi_set_with_timestamp( - &mut self, - table_name: &str, - values: impl IntoIterator)> + std::marker::Send, - timestamp: i64, ) -> Result<()> { let mut entries = vec![]; for (row_key, cells) in values { @@ -422,7 +393,7 @@ impl BigTableClient { column_qualifier: column_name.to_owned().into_bytes(), // The timestamp of the cell into which new data should be written. // Use -1 for current Bigtable server time. - timestamp_micros: timestamp, + timestamp_micros: -1, value, })), }) diff --git a/crates/sui-kvstore/src/bigtable/init.sh b/crates/sui-kvstore/src/bigtable/init.sh index b41ce0ec1fe09..f96ac5c1e9827 100755 --- a/crates/sui-kvstore/src/bigtable/init.sh +++ b/crates/sui-kvstore/src/bigtable/init.sh @@ -10,7 +10,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then command+=(-project emulator) fi -for table in objects transactions checkpoints checkpoints_by_digest watermark; do +for table in objects transactions checkpoints checkpoints_by_digest; do ( set -x "${command[@]}" createtable $table diff --git a/crates/sui-kvstore/src/lib.rs b/crates/sui-kvstore/src/lib.rs index d8fa9117de8c1..5d3ab55a3f64a 100644 --- a/crates/sui-kvstore/src/lib.rs +++ b/crates/sui-kvstore/src/lib.rs @@ -34,7 +34,6 @@ pub trait KeyValueStoreReader { ) -> Result>; async fn get_latest_checkpoint(&mut self) -> Result; async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result>; - async fn get_watermark(&mut self, task_name: &str) -> Result; } #[async_trait] @@ -42,11 +41,6 @@ pub trait KeyValueStoreWriter { async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>; async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>; async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>; - async fn save_watermark( - &mut self, - name: &str, - watermark: CheckpointSequenceNumber, - ) -> Result<()>; } #[derive(Clone, Debug)]