[json rpc] query events based on transaction digest (MystenLabs#20422)
## Description 

Changes the events query access pattern in JSON RPC from queries by events digest to queries by transaction digest. This is a prerequisite for switching to the BigTable-based KV store.
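
At the storage layer, the change boils down to adding a lookup keyed by `TransactionDigest` next to the existing lookup keyed by `TransactionEventsDigest`. The sketch below illustrates the two access patterns; `EventsLookup` is a hypothetical trait name used only for illustration (the real method is added to `TransactionKeyValueStoreTrait` in `crates/sui-storage/src/key_value_store.rs`, see the diff below):

```rust
use sui_types::base_types::TransactionDigest;
use sui_types::digests::TransactionEventsDigest;
use sui_types::effects::TransactionEvents;
use sui_types::error::SuiResult;

#[async_trait::async_trait]
pub trait EventsLookup {
    /// Old access pattern: callers first resolve a transaction's effects to
    /// obtain its events digest, then fetch the events by that digest.
    async fn multi_get_events(
        &self,
        digests: &[TransactionEventsDigest],
    ) -> SuiResult<Vec<Option<TransactionEvents>>>;

    /// New access pattern: events are fetched directly by the digest of the
    /// transaction that emitted them, the prerequisite for the BigTable-based
    /// KV store mentioned above.
    async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<TransactionEvents>>>;
}
```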


---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
phoenix-o authored Nov 27, 2024
1 parent f0ade5e commit 8bd5399
Showing 6 changed files with 166 additions and 127 deletions.
25 changes: 25 additions & 0 deletions crates/sui-core/src/authority.rs
@@ -5409,6 +5409,31 @@ impl TransactionKeyValueStoreTrait for AuthorityState
.map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
.collect())
}

#[instrument(skip(self))]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
if digests.is_empty() {
return Ok(vec![]);
}
let events_digests: Vec<_> = self
.get_transaction_cache_reader()
.multi_get_executed_effects(digests)
.into_iter()
.map(|t| t.and_then(|t| t.events_digest().cloned()))
.collect();
let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
let mut events = self
.get_transaction_cache_reader()
.multi_get_events(&non_empty_events)
.into_iter();
Ok(events_digests
.into_iter()
.map(|ev| ev.and_then(|_| events.next()?))
.collect())
}
}

#[cfg(msim)]
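A note on the `AuthorityState` implementation above: it resolves each transaction digest to its executed effects, keeps only the transactions whose effects carry an events digest, fetches just those event sets, and then consumes the fetched results in order so the returned `Vec` stays aligned with the input `digests`. The snippet below is a standalone illustration of that re-alignment pattern with toy data; it is not part of the diff.

```rust
// Only entries that actually have something to fetch hit the store; the
// results are then consumed in input order to rebuild an aligned Vec.
fn realign<T>(requested: &[Option<&str>], fetched: Vec<T>) -> Vec<Option<T>> {
    let mut results = fetched.into_iter();
    requested
        .iter()
        .map(|maybe_key| {
            maybe_key
                .as_ref()
                .map(|_| results.next().expect("one result per requested entry"))
        })
        .collect()
}

fn main() {
    // The middle transaction emitted no events, so it is never queried and
    // simply comes back as None.
    let requested = [Some("tx-a"), None, Some("tx-c")];
    let fetched = vec!["events for tx-a", "events for tx-c"];
    assert_eq!(
        realign(&requested, fetched),
        vec![Some("events for tx-a"), None, Some("events for tx-c")]
    );
}
```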
4 changes: 3 additions & 1 deletion crates/sui-json-rpc/src/coin_api.rs
@@ -436,7 +436,7 @@ mod tests {
use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
use sui_types::coin::TreasuryCap;
use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEventsDigest};
use sui_types::effects::TransactionEffects;
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::error::{SuiError, SuiResult};
use sui_types::gas_coin::GAS;
use sui_types::id::UID;
@@ -476,6 +476,8 @@ mod tests {
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;

async fn multi_get_events_by_tx_digests(&self, digests: &[TransactionDigest]) -> SuiResult<Vec<Option<TransactionEvents>>>;
}
}

202 changes: 76 additions & 126 deletions crates/sui-json-rpc/src/read_api.rs
@@ -40,7 +40,6 @@ use sui_storage::key_value_store::TransactionKeyValueStore;
use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest};
use sui_types::collection_types::VecMap;
use sui_types::crypto::AggregateAuthoritySignature;
use sui_types::digests::TransactionEventsDigest;
use sui_types::display::DisplayVersionUpdatedEvent;
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
use sui_types::error::{SuiError, SuiObjectResponseError};
@@ -314,93 +313,57 @@ impl ReadApi {

if opts.show_events {
trace!("getting events");
let events_digests_list = temp_response
.values()
.filter_map(|cache_entry| match &cache_entry.effects {
Some(eff) => eff.events_digest().cloned(),
None => None,
})
.collect::<Vec<TransactionEventsDigest>>();
// filter out empty events digest, as they do not have to be read from the DB
let empty_events_digest = TransactionEvents::default().digest();
let events_digests_list = events_digests_list
.into_iter()
.filter(|d| d != &empty_events_digest)
.collect::<Vec<_>>();

let mut events_digest_to_events = if events_digests_list.is_empty() {
HashMap::new()
} else {
// fetch events from the DB with retry, retry each 0.5s for 3s
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(3)),
multiplier: 1.0,
..ExponentialBackoff::default()
};
let events = retry(backoff, || async {
match self
.transaction_kv_store
.multi_get_events(&events_digests_list)
.await
{
// Only return Ok when all the queried transaction events are found, otherwise retry
// until timeout, then return Err.
Ok(events) if !events.contains(&None) => Ok(events),
Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
"Events not found, transaction execution may be incomplete.".into(),
))),
Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
"Failed to call multi_get_events: {e:?}"
)))),
}
})
.await
.map_err(|e| {
Error::UnexpectedError(format!(
"Retrieving events with retry failed for events digests {events_digests_list:?}: {e:?}"
))
})?
.into_iter();

// construct a hashmap of events digests -> events for fast lookup
let events_map = events_digests_list
.into_iter()
.zip(events)
.collect::<HashMap<_, _>>();
// Double check that all events are `Some` and their digests match the key
for (events_digest, events) in events_map.iter() {
if let Some(events) = events {
if &events.digest() != events_digest {
return Err(Error::UnexpectedError(format!(
"Events digest {events_digest:?} does not match the key {:?}",
events.digest()
)));
}
} else {
return Err(Error::UnexpectedError(format!(
"Events of digest {events_digest:?} is None, but it should not be"
)));
let mut non_empty_digests = vec![];
for cache_entry in temp_response.values() {
if let Some(effects) = &cache_entry.effects {
if effects.events_digest().is_some() {
non_empty_digests.push(cache_entry.digest);
}
}
events_map
}
// fetch events from the DB with retry, retry each 0.5s for 3s
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(3)),
multiplier: 1.0,
..ExponentialBackoff::default()
};
events_digest_to_events.insert(empty_events_digest, Some(TransactionEvents::default()));
let mut events = retry(backoff, || async {
match self
.transaction_kv_store
.multi_get_events_by_tx_digests(&non_empty_digests)
.await
{
// Only return Ok when all the queried transaction events are found, otherwise retry
// until timeout, then return Err.
Ok(events) if !events.contains(&None) => Ok(events),
Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
"Events not found, transaction execution may be incomplete.".into(),
))),
Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
"Failed to call multi_get_events: {e:?}"
)))),
}
})
.await
.map_err(|e| {
Error::UnexpectedError(format!(
"Retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
))
})?
.into_iter();

// fill cache with the events
for (_, cache_entry) in temp_response.iter_mut() {
let transaction_digest = cache_entry.digest;
if let Some(events_digest) =
cache_entry.effects.as_ref().and_then(|e| e.events_digest())
{
let events = events_digest_to_events
.get(events_digest)
.cloned()
.unwrap_or_else(|| panic!("Expect event digest {events_digest:?} to be found in cache for transaction {transaction_digest}"))
.map(|events| to_sui_transaction_events(self, cache_entry.digest, events));
match events {
Some(Ok(e)) => cache_entry.events = Some(e),
Some(Err(e)) => cache_entry.errors.push(e.to_string()),
None => {
match events.next() {
Some(Some(ev)) => {
cache_entry.events =
Some(to_sui_transaction_events(self, cache_entry.digest, ev)?)
}
None | Some(None) => {
error!("Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}");
cache_entry.errors.push(format!(
"Failed to fetch events with event digest {events_digest:?}",
@@ -830,30 +793,26 @@ impl ReadApiServer for ReadApi {
}

if opts.show_events && temp_response.effects.is_some() {
// safe to unwrap because we have checked is_some
if let Some(event_digest) = temp_response.effects.as_ref().unwrap().events_digest()
{
let transaction_kv_store = self.transaction_kv_store.clone();
let event_digest = *event_digest;
let events = spawn_monitored_task!(async move {
transaction_kv_store
.get_events(event_digest)
.await
.map_err(|e| {
error!("Failed to call get transaction events for events digest: {event_digest:?} with error {e:?}");
Error::from(e)
})
})
let transaction_kv_store = self.transaction_kv_store.clone();
let events = spawn_monitored_task!(async move {
transaction_kv_store
.multi_get_events_by_tx_digests(&[digest])
.await
.map_err(Error::from)??;
match to_sui_transaction_events(self, digest, events) {
.map_err(|e| {
error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}");
Error::from(e)
})
})
.await
.map_err(Error::from)??
.pop()
.flatten();
match events {
None => temp_response.events = Some(SuiTransactionBlockEvents::default()),
Some(events) => match to_sui_transaction_events(self, digest, events) {
Ok(e) => temp_response.events = Some(e),
Err(e) => temp_response.errors.push(e.to_string()),
};
} else {
// events field will be Some if and only if `show_events` is true and
// there is no error in converting fetching events
temp_response.events = Some(SuiTransactionBlockEvents::default());
},
}
}

@@ -935,38 +894,29 @@ impl ReadApiServer for ReadApi {
let transaction_kv_store = self.transaction_kv_store.clone();
spawn_monitored_task!(async move{
let store = state.load_epoch_store_one_call_per_task();
let effect = transaction_kv_store
.get_fx_by_tx_digest(transaction_digest)
.await
.map_err(Error::from)?;
let events = if let Some(event_digest) = effect.events_digest() {
transaction_kv_store
.get_events(*event_digest)
let events = transaction_kv_store
.multi_get_events_by_tx_digests(&[transaction_digest])
.await
.map_err(
|e| {
error!("Failed to get transaction events for event digest {event_digest:?} with error: {e:?}");
error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
Error::StateReadError(e.into())
})?
.data
.into_iter()
.enumerate()
.map(|(seq, e)| {
let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
SuiEvent::try_from(
e,
*effect.transaction_digest(),
seq as u64,
None,
layout,
)
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::SuiError)?
} else {
vec![]
};
Ok(events)
.pop()
.flatten();
Ok(match events {
Some(events) => events
.data
.into_iter()
.enumerate()
.map(|(seq, e)| {
let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout)
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::SuiError)?,
None => vec![],
})
}).await.map_err(Error::from)?
})
}
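One behavioral detail of the read path above: the KV lookup is wrapped in a retry where `multiplier: 1.0` keeps the `ExponentialBackoff` from ever growing, so, assuming the `backoff` crate's default initial interval of 500 ms, it retries at a roughly fixed 0.5 s cadence until the 3 s `max_elapsed_time` is hit, matching the inline comment. Missing events are treated as transient (execution may still be completing), while KV-store errors are permanent. A minimal sketch of just the policy, under those assumptions:

```rust
use std::time::Duration;

use backoff::ExponentialBackoff;

// Same settings as in read_api.rs: an (approximately) flat 0.5s retry
// interval that gives up once 3 seconds have elapsed.
fn events_fetch_retry_policy() -> ExponentialBackoff {
    ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(3)),
        multiplier: 1.0,
        ..ExponentialBackoff::default()
    }
}
```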
25 changes: 25 additions & 0 deletions crates/sui-storage/src/http_key_value_store.rs
@@ -84,6 +84,7 @@ pub enum Key {
CheckpointSummaryByDigest(CheckpointDigest),
TxToCheckpoint(TransactionDigest),
ObjectKey(ObjectID, VersionNumber),
EventsByTxDigest(TransactionDigest),
}

impl Key {
@@ -99,6 +100,7 @@ impl Key {
Key::CheckpointSummaryByDigest(_) => "cs",
Key::TxToCheckpoint(_) => "tx2c",
Key::ObjectKey(_, _) => "ob",
Key::EventsByTxDigest(_) => "evtx",
}
}

Expand All @@ -117,6 +119,7 @@ impl Key {
Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
Key::TxToCheckpoint(digest) => encode_digest(digest),
Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
Key::EventsByTxDigest(digest) => encode_digest(digest),
}
}

@@ -543,4 +546,26 @@ impl TransactionKeyValueStoreTrait for HttpKVStore {

Ok(results)
}

#[instrument(level = "trace", skip_all)]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
let keys = digests
.iter()
.map(|digest| Key::EventsByTxDigest(*digest))
.collect::<Vec<_>>();
Ok(self
.multi_fetch(keys)
.await
.iter()
.zip(digests.iter())
.map(map_fetch)
.map(|maybe_bytes| {
maybe_bytes
.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, &bytes.slice(1..)))
})
.collect::<Vec<_>>())
}
}
30 changes: 30 additions & 0 deletions crates/sui-storage/src/key_value_store.rs
@@ -383,6 +383,13 @@ impl TransactionKeyValueStore {
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
self.inner.multi_get_transaction_checkpoint(digests).await
}

pub async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
self.inner.multi_get_events_by_tx_digests(digests).await
}
}

/// Immutable key/value store trait for storing/retrieving transactions, effects, and events.
@@ -420,6 +427,11 @@ pub trait TransactionKeyValueStoreTrait {
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;

async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>>;
}

/// A TransactionKeyValueStoreTrait that falls back to a secondary store for any key for which the
@@ -588,6 +600,24 @@ impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {

Ok(res)
}

#[instrument(level = "trace", skip_all)]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
let (fallback, indices) = find_fallback(&res, digests);
if fallback.is_empty() {
return Ok(res);
}
let secondary_res = self
.fallback
.multi_get_events_by_tx_digests(&fallback)
.await?;
merge_res(&mut res, secondary_res, &indices);
Ok(res)
}
}

fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
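For callers, the new lookup is exposed through the `TransactionKeyValueStore` wrapper shown above, and `FallbackTransactionKVStore` re-queries the fallback store for any digests the primary returned `None` for before merging the results back in. Below is a hypothetical caller-side sketch; the function and variable names are illustrative and not part of the diff.

```rust
use sui_storage::key_value_store::TransactionKeyValueStore;
use sui_types::base_types::TransactionDigest;
use sui_types::error::SuiResult;

// Fetch events for a batch of transactions and report how many each emitted.
async fn print_event_counts(
    kv_store: &TransactionKeyValueStore,
    digests: &[TransactionDigest],
) -> SuiResult<()> {
    let maybe_events = kv_store.multi_get_events_by_tx_digests(digests).await?;
    for (digest, events) in digests.iter().zip(maybe_events) {
        match events {
            // `data` holds the individual events emitted by the transaction.
            Some(ev) => println!("{digest:?}: {} event(s)", ev.data.len()),
            None => println!("{digest:?}: no events stored"),
        }
    }
    Ok(())
}
```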