Skip to content

Commit

Permalink
Revert "[kv store] add watermark table to bigtable (#20390)" (#20670)
Browse files Browse the repository at this point in the history
This reverts commit 7eadb13.

## Description 

The hot-row/hot-cell write pattern introduced by the watermark table
significantly slows down writes to Bigtable, which impacts the ingestion
speed of core pipelines.



---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
phoenix-o authored Dec 18, 2024
1 parent 3e63476 commit 84141f5
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 76 deletions.
2 changes: 1 addition & 1 deletion crates/sui-data-ingestion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
mod progress_store;
mod workers;

pub use progress_store::IngestionWorkflowsProgressStore;
pub use progress_store::DynamoDBProgressStore;
pub use workers::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, KVStoreTaskConfig,
KVStoreWorker,
Expand Down
19 changes: 2 additions & 17 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::path::PathBuf;
use std::time::Duration;
use sui_data_ingestion::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker,
IngestionWorkflowsProgressStore, KVStoreTaskConfig, KVStoreWorker,
DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
};
use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions};
use sui_data_ingestion_core::{IndexerExecutor, WorkerPool};
Expand Down Expand Up @@ -119,27 +119,12 @@ async fn main() -> Result<()> {
mysten_metrics::init_metrics(&registry);
let metrics = DataIngestionMetrics::new(&registry);

let mut bigtable_client = None;
for task in &config.tasks {
if let Task::BigTableKV(kv_config) = &task.task {
bigtable_client = Some(
BigTableClient::new_remote(
kv_config.instance_id.clone(),
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?,
);
}
}

let progress_store = IngestionWorkflowsProgressStore::new(
let progress_store = DynamoDBProgressStore::new(
&config.progress_store.aws_access_key_id,
&config.progress_store.aws_secret_access_key,
config.progress_store.aws_region,
config.progress_store.table_name,
config.is_backfill,
bigtable_client,
)
.await;
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
Expand Down
15 changes: 3 additions & 12 deletions crates/sui-data-ingestion/src/progress_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,21 @@ use aws_sdk_s3::config::{Credentials, Region};
use std::str::FromStr;
use std::time::Duration;
use sui_data_ingestion_core::ProgressStore;
use sui_kvstore::{BigTableClient, KeyValueStoreWriter};
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct IngestionWorkflowsProgressStore {
pub struct DynamoDBProgressStore {
client: Client,
table_name: String,
is_backfill: bool,
bigtable_client: Option<BigTableClient>,
}

impl IngestionWorkflowsProgressStore {
impl DynamoDBProgressStore {
pub async fn new(
aws_access_key_id: &str,
aws_secret_access_key: &str,
aws_region: String,
table_name: String,
is_backfill: bool,
bigtable_client: Option<BigTableClient>,
) -> Self {
let credentials = Credentials::new(
aws_access_key_id,
Expand All @@ -53,13 +50,12 @@ impl IngestionWorkflowsProgressStore {
client,
table_name,
is_backfill,
bigtable_client,
}
}
}

#[async_trait]
impl ProgressStore for IngestionWorkflowsProgressStore {
impl ProgressStore for DynamoDBProgressStore {
async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
let item = self
.client
Expand All @@ -83,11 +79,6 @@ impl ProgressStore for IngestionWorkflowsProgressStore {
if self.is_backfill && checkpoint_number % 1000 != 0 {
return Ok(());
}
if let Some(ref mut bigtable_client) = self.bigtable_client {
bigtable_client
.save_watermark(&task_name, checkpoint_number)
.await?;
}
let backoff = backoff::ExponentialBackoff::default();
backoff::future::retry(backoff, || async {
let result = self
Expand Down
49 changes: 10 additions & 39 deletions crates/sui-kvstore/src/bigtable/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ const OBJECTS_TABLE: &str = "objects";
const TRANSACTIONS_TABLE: &str = "transactions";
const CHECKPOINTS_TABLE: &str = "checkpoints";
const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest";
const WATERMARK_TABLE: &str = "watermark";

const COLUMN_FAMILY_NAME: &str = "sui";
const DEFAULT_COLUMN_QUALIFIER: &str = "";
const AGGREGATED_WATERMARK_NAME: &str = "bigtable";
const CHECKPOINT_SUMMARY_COLUMN_QUALIFIER: &str = "s";
const CHECKPOINT_SIGNATURES_COLUMN_QUALIFIER: &str = "sg";
const CHECKPOINT_CONTENTS_COLUMN_QUALIFIER: &str = "c";
Expand Down Expand Up @@ -133,21 +131,6 @@ impl KeyValueStoreWriter for BigTableClient {
)
.await
}

async fn save_watermark(
&mut self,
name: &str,
watermark: CheckpointSequenceNumber,
) -> Result<()> {
let key = name.as_bytes().to_vec();
let value = watermark.to_be_bytes().to_vec();
self.multi_set_with_timestamp(
WATERMARK_TABLE,
[(key, vec![(DEFAULT_COLUMN_QUALIFIER, value)])],
watermark as i64,
)
.await
}
}

#[async_trait]
Expand Down Expand Up @@ -254,7 +237,15 @@ impl KeyValueStoreReader for BigTableClient {
}

async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber> {
self.get_watermark(AGGREGATED_WATERMARK_NAME).await
let upper_limit = u64::MAX.to_be_bytes().to_vec();
match self
.reversed_scan(CHECKPOINTS_TABLE, upper_limit)
.await?
.pop()
{
Some((key_bytes, _)) => Ok(u64::from_be_bytes(key_bytes.as_slice().try_into()?)),
None => Ok(0),
}
}

async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>> {
Expand All @@ -266,17 +257,6 @@ impl KeyValueStoreReader for BigTableClient {
}
Ok(None)
}

async fn get_watermark(&mut self, watermark_name: &str) -> Result<CheckpointSequenceNumber> {
let key = watermark_name.as_bytes().to_vec();
let mut response = self.multi_get(WATERMARK_TABLE, vec![key]).await?;
if let Some(row) = response.pop() {
if let Some((_, value)) = row.into_iter().next() {
return Ok(u64::from_be_bytes(value.as_slice().try_into()?));
}
}
Ok(0)
}
}

impl BigTableClient {
Expand Down Expand Up @@ -402,15 +382,6 @@ impl BigTableClient {
&mut self,
table_name: &str,
values: impl IntoIterator<Item = (Bytes, Vec<(&str, Bytes)>)> + std::marker::Send,
) -> Result<()> {
self.multi_set_with_timestamp(table_name, values, -1).await
}

async fn multi_set_with_timestamp(
&mut self,
table_name: &str,
values: impl IntoIterator<Item = (Bytes, Vec<(&str, Bytes)>)> + std::marker::Send,
timestamp: i64,
) -> Result<()> {
let mut entries = vec![];
for (row_key, cells) in values {
Expand All @@ -422,7 +393,7 @@ impl BigTableClient {
column_qualifier: column_name.to_owned().into_bytes(),
// The timestamp of the cell into which new data should be written.
// Use -1 for current Bigtable server time.
timestamp_micros: timestamp,
timestamp_micros: -1,
value,
})),
})
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-kvstore/src/bigtable/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then
command+=(-project emulator)
fi

for table in objects transactions checkpoints checkpoints_by_digest watermark; do
for table in objects transactions checkpoints checkpoints_by_digest; do
(
set -x
"${command[@]}" createtable $table
Expand Down
6 changes: 0 additions & 6 deletions crates/sui-kvstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,13 @@ pub trait KeyValueStoreReader {
) -> Result<Option<Checkpoint>>;
async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber>;
async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>>;
async fn get_watermark(&mut self, task_name: &str) -> Result<u64>;
}

#[async_trait]
pub trait KeyValueStoreWriter {
async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>;
async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>;
async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>;
async fn save_watermark(
&mut self,
name: &str,
watermark: CheckpointSequenceNumber,
) -> Result<()>;
}

#[derive(Clone, Debug)]
Expand Down

0 comments on commit 84141f5

Please sign in to comment.