From 8bd53998f88c4619f39dcf298f699199a0071e7c Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Wed, 27 Nov 2024 10:00:41 -0500 Subject: [PATCH] [json rpc] query events based on transaction digest (#20422) ## Description changes events query access pattern in JSON RPC from queries by events digest to queries by transaction digest. It's prerequisite to switch to BigTable based KV store --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- crates/sui-core/src/authority.rs | 25 +++ crates/sui-json-rpc/src/coin_api.rs | 4 +- crates/sui-json-rpc/src/read_api.rs | 202 +++++++----------- .../sui-storage/src/http_key_value_store.rs | 25 +++ crates/sui-storage/src/key_value_store.rs | 30 +++ crates/sui-storage/tests/key_value_tests.rs | 7 + 6 files changed, 166 insertions(+), 127 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index bfb4868b712c7..4b3a4e5dd7863 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -5409,6 +5409,31 @@ impl TransactionKeyValueStoreTrait for AuthorityState { .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint)) .collect()) } + + #[instrument(skip(self))] + async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>> { + if digests.is_empty() { + return Ok(vec![]); + } + let events_digests: Vec<_> = self + .get_transaction_cache_reader() + .multi_get_executed_effects(digests) + .into_iter() + .map(|t| t.and_then(|t| t.events_digest().cloned())) + .collect(); + let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect(); + let mut events = self + .get_transaction_cache_reader() + .multi_get_events(&non_empty_events) + .into_iter(); + Ok(events_digests + .into_iter() + .map(|ev| ev.and_then(|_| events.next()?)) + .collect()) + } } #[cfg(msim)] diff --git a/crates/sui-json-rpc/src/coin_api.rs b/crates/sui-json-rpc/src/coin_api.rs index 77a6fea0e7bba..f0e2104746b63 100644 --- a/crates/sui-json-rpc/src/coin_api.rs +++ b/crates/sui-json-rpc/src/coin_api.rs @@ -436,7 +436,7 @@ mod tests { use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress}; use sui_types::coin::TreasuryCap; use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEventsDigest}; - use sui_types::effects::TransactionEffects; + use sui_types::effects::{TransactionEffects, TransactionEvents}; use sui_types::error::{SuiError, SuiResult}; use sui_types::gas_coin::GAS; use sui_types::id::UID; @@ -476,6 +476,8 @@ mod tests { &self, digests: &[TransactionDigest], ) -> SuiResult>>; + + async fn multi_get_events_by_tx_digests(&self,digests: &[TransactionDigest]) -> SuiResult>>; } } diff --git a/crates/sui-json-rpc/src/read_api.rs b/crates/sui-json-rpc/src/read_api.rs index 905aa692cf418..5687b3f215cfc 100644 --- a/crates/sui-json-rpc/src/read_api.rs +++ b/crates/sui-json-rpc/src/read_api.rs @@ -40,7 +40,6 @@ use sui_storage::key_value_store::TransactionKeyValueStore; use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest}; use sui_types::collection_types::VecMap; use sui_types::crypto::AggregateAuthoritySignature; -use sui_types::digests::TransactionEventsDigest; use sui_types::display::DisplayVersionUpdatedEvent; use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents}; use sui_types::error::{SuiError, SuiObjectResponseError}; @@ -314,77 +313,44 @@ impl ReadApi { if opts.show_events { trace!("getting events"); - let events_digests_list = temp_response - .values() - .filter_map(|cache_entry| match &cache_entry.effects { - Some(eff) => eff.events_digest().cloned(), - None => None, - }) - .collect::>(); - // filter out empty events digest, as they do not have to be read from the DB - let empty_events_digest = TransactionEvents::default().digest(); - let events_digests_list = events_digests_list - .into_iter() - .filter(|d| d != &empty_events_digest) - .collect::>(); - - let mut events_digest_to_events = if events_digests_list.is_empty() { - HashMap::new() - } else { - // fetch events from the DB with retry, retry each 0.5s for 3s - let backoff = ExponentialBackoff { - max_elapsed_time: Some(Duration::from_secs(3)), - multiplier: 1.0, - ..ExponentialBackoff::default() - }; - let events = retry(backoff, || async { - match self - .transaction_kv_store - .multi_get_events(&events_digests_list) - .await - { - // Only return Ok when all the queried transaction events are found, otherwise retry - // until timeout, then return Err. - Ok(events) if !events.contains(&None) => Ok(events), - Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError( - "Events not found, transaction execution may be incomplete.".into(), - ))), - Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!( - "Failed to call multi_get_events: {e:?}" - )))), - } - }) - .await - .map_err(|e| { - Error::UnexpectedError(format!( - "Retrieving events with retry failed for events digests {events_digests_list:?}: {e:?}" - )) - })? - .into_iter(); - - // construct a hashmap of events digests -> events for fast lookup - let events_map = events_digests_list - .into_iter() - .zip(events) - .collect::>(); - // Double check that all events are `Some` and their digests match the key - for (events_digest, events) in events_map.iter() { - if let Some(events) = events { - if &events.digest() != events_digest { - return Err(Error::UnexpectedError(format!( - "Events digest {events_digest:?} does not match the key {:?}", - events.digest() - ))); - } - } else { - return Err(Error::UnexpectedError(format!( - "Events of digest {events_digest:?} is None, but it should not be" - ))); + let mut non_empty_digests = vec![]; + for cache_entry in temp_response.values() { + if let Some(effects) = &cache_entry.effects { + if effects.events_digest().is_some() { + non_empty_digests.push(cache_entry.digest); } } - events_map + } + // fetch events from the DB with retry, retry each 0.5s for 3s + let backoff = ExponentialBackoff { + max_elapsed_time: Some(Duration::from_secs(3)), + multiplier: 1.0, + ..ExponentialBackoff::default() }; - events_digest_to_events.insert(empty_events_digest, Some(TransactionEvents::default())); + let mut events = retry(backoff, || async { + match self + .transaction_kv_store + .multi_get_events_by_tx_digests(&non_empty_digests) + .await + { + // Only return Ok when all the queried transaction events are found, otherwise retry + // until timeout, then return Err. + Ok(events) if !events.contains(&None) => Ok(events), + Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError( + "Events not found, transaction execution may be incomplete.".into(), + ))), + Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!( + "Failed to call multi_get_events: {e:?}" + )))), + } + }) + .await + .map_err(|e| { + Error::UnexpectedError(format!( + "Retrieving events with retry failed for transaction digests {digests:?}: {e:?}" + )) + })? + .into_iter(); // fill cache with the events for (_, cache_entry) in temp_response.iter_mut() { @@ -392,15 +358,12 @@ impl ReadApi { if let Some(events_digest) = cache_entry.effects.as_ref().and_then(|e| e.events_digest()) { - let events = events_digest_to_events - .get(events_digest) - .cloned() - .unwrap_or_else(|| panic!("Expect event digest {events_digest:?} to be found in cache for transaction {transaction_digest}")) - .map(|events| to_sui_transaction_events(self, cache_entry.digest, events)); - match events { - Some(Ok(e)) => cache_entry.events = Some(e), - Some(Err(e)) => cache_entry.errors.push(e.to_string()), - None => { + match events.next() { + Some(Some(ev)) => { + cache_entry.events = + Some(to_sui_transaction_events(self, cache_entry.digest, ev)?) + } + None | Some(None) => { error!("Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"); cache_entry.errors.push(format!( "Failed to fetch events with event digest {events_digest:?}", @@ -830,30 +793,26 @@ impl ReadApiServer for ReadApi { } if opts.show_events && temp_response.effects.is_some() { - // safe to unwrap because we have checked is_some - if let Some(event_digest) = temp_response.effects.as_ref().unwrap().events_digest() - { - let transaction_kv_store = self.transaction_kv_store.clone(); - let event_digest = *event_digest; - let events = spawn_monitored_task!(async move { - transaction_kv_store - .get_events(event_digest) - .await - .map_err(|e| { - error!("Failed to call get transaction events for events digest: {event_digest:?} with error {e:?}"); - Error::from(e) - }) - }) + let transaction_kv_store = self.transaction_kv_store.clone(); + let events = spawn_monitored_task!(async move { + transaction_kv_store + .multi_get_events_by_tx_digests(&[digest]) .await - .map_err(Error::from)??; - match to_sui_transaction_events(self, digest, events) { + .map_err(|e| { + error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}"); + Error::from(e) + }) + }) + .await + .map_err(Error::from)?? + .pop() + .flatten(); + match events { + None => temp_response.events = Some(SuiTransactionBlockEvents::default()), + Some(events) => match to_sui_transaction_events(self, digest, events) { Ok(e) => temp_response.events = Some(e), Err(e) => temp_response.errors.push(e.to_string()), - }; - } else { - // events field will be Some if and only if `show_events` is true and - // there is no error in converting fetching events - temp_response.events = Some(SuiTransactionBlockEvents::default()); + }, } } @@ -935,38 +894,29 @@ impl ReadApiServer for ReadApi { let transaction_kv_store = self.transaction_kv_store.clone(); spawn_monitored_task!(async move{ let store = state.load_epoch_store_one_call_per_task(); - let effect = transaction_kv_store - .get_fx_by_tx_digest(transaction_digest) - .await - .map_err(Error::from)?; - let events = if let Some(event_digest) = effect.events_digest() { - transaction_kv_store - .get_events(*event_digest) + let events = transaction_kv_store + .multi_get_events_by_tx_digests(&[transaction_digest]) .await .map_err( |e| { - error!("Failed to get transaction events for event digest {event_digest:?} with error: {e:?}"); + error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}"); Error::StateReadError(e.into()) })? - .data - .into_iter() - .enumerate() - .map(|(seq, e)| { - let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?; - SuiEvent::try_from( - e, - *effect.transaction_digest(), - seq as u64, - None, - layout, - ) - }) - .collect::, _>>() - .map_err(Error::SuiError)? - } else { - vec![] - }; - Ok(events) + .pop() + .flatten(); + Ok(match events { + Some(events) => events + .data + .into_iter() + .enumerate() + .map(|(seq, e)| { + let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?; + SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout) + }) + .collect::, _>>() + .map_err(Error::SuiError)?, + None => vec![], + }) }).await.map_err(Error::from)? }) } diff --git a/crates/sui-storage/src/http_key_value_store.rs b/crates/sui-storage/src/http_key_value_store.rs index 94a93d712b286..d36423f7a3217 100644 --- a/crates/sui-storage/src/http_key_value_store.rs +++ b/crates/sui-storage/src/http_key_value_store.rs @@ -84,6 +84,7 @@ pub enum Key { CheckpointSummaryByDigest(CheckpointDigest), TxToCheckpoint(TransactionDigest), ObjectKey(ObjectID, VersionNumber), + EventsByTxDigest(TransactionDigest), } impl Key { @@ -99,6 +100,7 @@ impl Key { Key::CheckpointSummaryByDigest(_) => "cs", Key::TxToCheckpoint(_) => "tx2c", Key::ObjectKey(_, _) => "ob", + Key::EventsByTxDigest(_) => "evtx", } } @@ -117,6 +119,7 @@ impl Key { Key::CheckpointSummaryByDigest(digest) => encode_digest(digest), Key::TxToCheckpoint(digest) => encode_digest(digest), Key::ObjectKey(object_id, version) => encode_object_key(object_id, version), + Key::EventsByTxDigest(digest) => encode_digest(digest), } } @@ -543,4 +546,26 @@ impl TransactionKeyValueStoreTrait for HttpKVStore { Ok(results) } + + #[instrument(level = "trace", skip_all)] + async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>> { + let keys = digests + .iter() + .map(|digest| Key::EventsByTxDigest(*digest)) + .collect::>(); + Ok(self + .multi_fetch(keys) + .await + .iter() + .zip(digests.iter()) + .map(map_fetch) + .map(|maybe_bytes| { + maybe_bytes + .and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, &bytes.slice(1..))) + }) + .collect::>()) + } } diff --git a/crates/sui-storage/src/key_value_store.rs b/crates/sui-storage/src/key_value_store.rs index 892ee5d1c5414..e4fbf1fdb7b36 100644 --- a/crates/sui-storage/src/key_value_store.rs +++ b/crates/sui-storage/src/key_value_store.rs @@ -383,6 +383,13 @@ impl TransactionKeyValueStore { ) -> SuiResult>> { self.inner.multi_get_transaction_checkpoint(digests).await } + + pub async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>> { + self.inner.multi_get_events_by_tx_digests(digests).await + } } /// Immutable key/value store trait for storing/retrieving transactions, effects, and events. @@ -420,6 +427,11 @@ pub trait TransactionKeyValueStoreTrait { &self, digests: &[TransactionDigest], ) -> SuiResult>>; + + async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>>; } /// A TransactionKeyValueStoreTrait that falls back to a secondary store for any key for which the @@ -588,6 +600,24 @@ impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore { Ok(res) } + + #[instrument(level = "trace", skip_all)] + async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>> { + let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?; + let (fallback, indices) = find_fallback(&res, digests); + if fallback.is_empty() { + return Ok(res); + } + let secondary_res = self + .fallback + .multi_get_events_by_tx_digests(&fallback) + .await?; + merge_res(&mut res, secondary_res, &indices); + Ok(res) + } } fn find_fallback(values: &[Option], keys: &[K]) -> (Vec, Vec) { diff --git a/crates/sui-storage/tests/key_value_tests.rs b/crates/sui-storage/tests/key_value_tests.rs index 9af4e0a99b365..9a29803ecb693 100644 --- a/crates/sui-storage/tests/key_value_tests.rs +++ b/crates/sui-storage/tests/key_value_tests.rs @@ -237,6 +237,13 @@ impl TransactionKeyValueStoreTrait for MockTxStore { .map(|digest| self.tx_to_checkpoint.get(digest).cloned()) .collect()) } + + async fn multi_get_events_by_tx_digests( + &self, + _: &[TransactionDigest], + ) -> SuiResult>> { + Ok(vec![]) + } } #[tokio::test]