Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inx): wait for slots to be finalized in INX #1395

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 92 additions & 8 deletions src/inx/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2023 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use futures::stream::{Stream, StreamExt};
use futures::{
stream::{Stream, StreamExt},
TryStreamExt,
};
use inx::{client::InxClient, proto};
use iota_sdk::types::block::{payload::signed_transaction::TransactionId, slot::SlotIndex};
use packable::PackableExt;
Expand Down Expand Up @@ -37,6 +40,20 @@ impl Inx {
self.inx.read_node_status(proto::NoParams {}).await?.try_convert()
}

/// Wait for the status of the node to change.
pub async fn listen_to_status_changes(
&mut self,
) -> Result<impl Stream<Item = Result<NodeStatus, InxError>>, InxError> {
Ok(self
.inx
.listen_to_node_status(proto::NodeStatusRequest {
cooldown_in_milliseconds: 100,
})
.await?
.into_inner()
.map(|msg| msg?.try_convert()))
}

/// Get the configuration of the node.
pub async fn get_node_configuration(&mut self) -> Result<NodeConfiguration, InxError> {
self.inx
Expand All @@ -45,17 +62,84 @@ impl Inx {
.try_convert()
}

/// Get a stream of committed slots.
pub async fn get_committed_slots(
/// Get a committed slot by index.
pub async fn get_committed_slot(&mut self, slot: SlotIndex) -> Result<Commitment, InxError> {
self.inx
.read_commitment(proto::CommitmentRequest {
commitment_slot: slot.0,
commitment_id: None,
})
.await?
.try_convert()
}

/// Get a stream of finalized slots.
pub async fn get_finalized_slots(
&mut self,
request: SlotRangeRequest,
) -> Result<impl Stream<Item = Result<Commitment, InxError>>, InxError> {
Ok(self
.inx
.listen_to_commitments(proto::SlotRangeRequest::from(request))
struct StreamState {
inx: Option<Inx>,
latest_finalized_slot: u32,
curr_slot: u32,
last_slot: u32,
}

let latest_finalized_slot = self
.get_node_status()
.await?
.into_inner()
.map(|msg| msg?.try_convert()))
.latest_finalized_commitment
.commitment_id
.slot_index()
.0;
Ok(futures::stream::unfold(
StreamState {
inx: Some(self.clone()),
latest_finalized_slot,
curr_slot: request.start_slot(),
last_slot: request.end_slot(),
},
|mut state| async move {
// Inner function definition to simplify result type
async fn next(state: &mut StreamState) -> Result<Option<Commitment>, InxError> {
let Some(inx) = state.inx.as_mut() else { return Ok(None) };

if state.last_slot != 0 && state.curr_slot > state.last_slot {
return Ok(None);
}

// If the current slot is not yet finalized, we will wait.
if state.latest_finalized_slot < state.curr_slot {
let mut status_changes = inx.listen_to_status_changes().await?;
loop {
match status_changes.try_next().await? {
Some(status) => {
// If the status change updated the latest finalized commitment, we can continue.
if status.latest_finalized_commitment.commitment_id.slot_index().0
> state.latest_finalized_slot
{
state.latest_finalized_slot =
status.latest_finalized_commitment.commitment_id.slot_index().0;
break;
}
}
None => {
return Ok(None);
}
}
}
}
let commitment = inx.get_committed_slot(state.curr_slot.into()).await?;
Alex6323 marked this conversation as resolved.
Show resolved Hide resolved
state.curr_slot += 1;
Ok(Some(commitment))
}
let res = next(&mut state).await;
if res.is_err() {
state.inx = None;
}
res.transpose().map(|res| (res, state))
},
))
}

/// Get accepted blocks for a given slot.
Expand Down
10 changes: 10 additions & 0 deletions src/inx/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ impl SlotRangeRequest {
{
Self(to_slot_range_request(range))
}

/// Get the start slot.
pub fn start_slot(&self) -> u32 {
self.0.start_slot
}

/// Get the end slot.
pub fn end_slot(&self) -> u32 {
self.0.end_slot
}
}

impl From<SlotRangeRequest> for proto::SlotRangeRequest {
Expand Down
2 changes: 1 addition & 1 deletion src/tangle/sources/inx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl InputSource for Inx {
) -> Result<BoxStream<Result<Commitment, Self::Error>>, Self::Error> {
let mut inx = self.clone();
Ok(Box::pin(
inx.get_committed_slots(SlotRangeRequest::from_range(range))
inx.get_finalized_slots(SlotRangeRequest::from_range(range))
.await?
.map_err(Self::Error::from),
))
Expand Down
Loading