Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STR-809 Fix Bridge Client Bug and add functional Test #564

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ borsh = { version = "1.5.0", features = ["derive"] }
bytes = "1.6.0"
chrono = "0.4.38"
clap = "4"
deadpool = "0.12.1"
digest = "0.10"
ethnum = "1.5.0"
eyre = "0.6"
Expand Down Expand Up @@ -243,7 +244,6 @@ tracing-opentelemetry = "0.27"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
zeroize = { version = "1.8.1", features = ["derive"] }

# This is needed for custom build of SP1
[profile.release.build-override]
opt-level = 3
Expand Down
1 change: 1 addition & 0 deletions bin/bridge-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ argh.workspace = true
async-trait.workspace = true
bitcoin.workspace = true
chrono.workspace = true
deadpool.workspace = true
directories = "5.0.1"
jsonrpsee.workspace = true
rockbound.workspace = true
Expand Down
26 changes: 20 additions & 6 deletions bin/bridge-client/src/modes/operator/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use bitcoin::{
key::{Keypair, Parity},
secp256k1::{PublicKey, SecretKey, XOnlyPublicKey, SECP256K1},
};
use jsonrpsee::{core::client::async_client::Client as L2RpcClient, ws_client::WsClientBuilder};
use strata_bridge_exec::handler::ExecHandler;
use deadpool::managed::{self, Pool};
use strata_bridge_exec::{
handler::ExecHandler,
ws_client::{WsClientConfig, WsClientManager},
};
use strata_bridge_sig_manager::prelude::SignatureManager;
use strata_bridge_tx_builder::prelude::TxBuildContext;
use strata_btcio::rpc::{traits::Reader, BitcoinClient};
Expand Down Expand Up @@ -62,10 +65,21 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> {
BitcoinClient::new(args.btc_url, args.btc_user, args.btc_pass)
.expect("error creating the bitcoin client"),
);
let l2_rpc_client: L2RpcClient = WsClientBuilder::default()
.build(args.rollup_url)

let config = WsClientConfig {
url: args.rollup_url.clone(),
};
let manager = WsClientManager { config };
let l2_rpc_client_pool: Pool<WsClientManager> =
managed::Pool::<WsClientManager>::builder(manager)
.max_size(5)
.build()
.unwrap();

let l2_rpc_client = l2_rpc_client_pool
.get()
.await
.expect("failed to connect to the rollup RPC server");
.expect("cannot get rpc client from pool");

// Get the keypair after deriving the wallet xpriv.
let operator_keys = resolve_xpriv(args.master_xpriv, args.master_xpriv_path)?;
Expand Down Expand Up @@ -130,7 +144,7 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> {
let exec_handler = ExecHandler {
tx_build_ctx: tx_context,
sig_manager,
l2_rpc_client,
l2_rpc_client_pool,
keypair,
own_index,
msg_polling_interval,
Expand Down
86 changes: 43 additions & 43 deletions bin/bridge-client/src/modes/operator/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,57 +18,56 @@ use strata_storage::ops::{bridge_duty::BridgeDutyOps, bridge_duty_index::BridgeD
use tokio::{task::JoinSet, time::sleep};
use tracing::{error, info, trace, warn};

pub(super) struct TaskManager<L2Client, TxBuildContext, Bcast>
pub(super) struct TaskManager<TxBuildContext, Bcast>
where
L2Client: StrataApiClient + Sync + Send,
TxBuildContext: BuildContext + Sync + Send,
Bcast: Broadcaster,
{
pub(super) exec_handler: Arc<ExecHandler<L2Client, TxBuildContext>>,
pub(super) exec_handler: Arc<ExecHandler<TxBuildContext>>,
pub(super) broadcaster: Arc<Bcast>,
pub(super) bridge_duty_db_ops: Arc<BridgeDutyOps>,
pub(super) bridge_duty_idx_db_ops: Arc<BridgeDutyIndexOps>,
}

impl<L2Client, TxBuildContext, Bcast> TaskManager<L2Client, TxBuildContext, Bcast>
impl<TxBuildContext, Bcast> TaskManager<TxBuildContext, Bcast>
where
L2Client: StrataApiClient + Sync + Send + 'static,
TxBuildContext: BuildContext + Sync + Send + 'static,
Bcast: Broadcaster + Sync + Send + 'static,
{
pub(super) async fn start(&self, duty_polling_interval: Duration) -> anyhow::Result<()> {
loop {
let RpcBridgeDuties {
if let Ok(RpcBridgeDuties {
duties,
start_index,
stop_index,
} = self.poll_duties().await?;

let mut handles = JoinSet::new();
for duty in duties {
let exec_handler = self.exec_handler.clone();
let bridge_duty_ops = self.bridge_duty_db_ops.clone();
let broadcaster = self.broadcaster.clone();
handles.spawn(async move {
process_duty(exec_handler, bridge_duty_ops, broadcaster, &duty).await
});
}
}) = self.poll_duties().await
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this to an if let like this is a minor change but it makes the structure much different. This should reason about the error conditions more explicitly.

I don't even see the error case being handled here, we need to do that. From what I can tell, if this errors immediately (like because the connection is broken) then we just spin in a hot loop eating a CPU core until we don't encounter the error anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have implemented exponential backoff with max retry count which allows Bridge to handle the ws related issues more gracefully

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you completely sure that this will not hot spin under any circumstances? If so then please add a comment explaining why.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it's in new commit. there is max retry count added with backoff, which doesn't let it looping forever

let mut handles = JoinSet::new();
for duty in duties {
let exec_handler = self.exec_handler.clone();
let bridge_duty_ops = self.bridge_duty_db_ops.clone();
let broadcaster = self.broadcaster.clone();
handles.spawn(async move {
process_duty(exec_handler, bridge_duty_ops, broadcaster, &duty).await
});
}

let any_failed = handles.join_all().await.iter().any(|res| res.is_err());
let any_failed = handles.join_all().await.iter().any(|res| res.is_err());

// if none of the duties failed, update the duty index so that the
// next batch is fetched in the next poll.
//
// otherwise, don't update the index so that the current batch is refetched and
// ones that were not executed successfully are executed again.
if !any_failed {
info!(%start_index, %stop_index, "updating duty index");
if let Err(e) = self
.bridge_duty_idx_db_ops
.set_index_async(stop_index)
.await
{
error!(error = %e, %start_index, %stop_index, "could not update duty index");
// if none of the duties failed, update the duty index so that the
// next batch is fetched in the next poll.
//
// otherwise, don't update the index so that the current batch is refetched and
// ones that were not executed successfully are executed again.
if !any_failed {
info!(%start_index, %stop_index, "updating duty index");
if let Err(e) = self
.bridge_duty_idx_db_ops
.set_index_async(stop_index)
.await
{
error!(error = %e, %start_index, %stop_index, "could not update duty index");
}
}
}

Expand All @@ -85,13 +84,17 @@ where
.unwrap_or(Some(0))
.unwrap_or(0);

let l2_rpc_client = self
.exec_handler
.l2_rpc_client_pool
.get()
.await
.expect("cannot get rpc client");
let RpcBridgeDuties {
duties,
start_index,
stop_index,
} = self
.exec_handler
.l2_rpc_client
} = l2_rpc_client
.get_bridge_duties(self.exec_handler.own_index, start_index)
.await?;

Expand Down Expand Up @@ -150,14 +153,13 @@ where
/// # Errors
///
/// If the duty fails to be processed.
async fn process_duty<L2Client, TxBuildContext, Bcast>(
exec_handler: Arc<ExecHandler<L2Client, TxBuildContext>>,
async fn process_duty<TxBuildContext, Bcast>(
exec_handler: Arc<ExecHandler<TxBuildContext>>,
duty_status_ops: Arc<BridgeDutyOps>,
broadcaster: Arc<Bcast>,
duty: &BridgeDuty,
) -> ExecResult<()>
where
L2Client: StrataApiClient + Sync + Send,
TxBuildContext: BuildContext + Sync + Send,
Bcast: Broadcaster,
{
Expand Down Expand Up @@ -212,15 +214,14 @@ where
/// # Errors
///
/// If there is an error during the execution of the duty.
async fn execute_duty<L2Client, TxBuildContext, Tx, Bcast>(
exec_handler: Arc<ExecHandler<L2Client, TxBuildContext>>,
async fn execute_duty<TxBuildContext, Tx, Bcast>(
exec_handler: Arc<ExecHandler<TxBuildContext>>,
broadcaster: Arc<Bcast>,
duty_status_ops: Arc<BridgeDutyOps>,
tracker_txid: Txid,
tx_info: Tx,
) -> ExecResult<()>
where
L2Client: StrataApiClient + Sync + Send,
TxBuildContext: BuildContext + Sync + Send,
Tx: TxKind + Debug,
Bcast: Broadcaster,
Expand Down Expand Up @@ -280,13 +281,12 @@ where

/// Aggregates nonces and signatures for a given [`Txid`] and then, broadcasts the fully signed
/// transaction to Bitcoin.
async fn aggregate_and_broadcast<L2Client, TxBuildContext, Bcast>(
exec_handler: Arc<ExecHandler<L2Client, TxBuildContext>>,
async fn aggregate_and_broadcast<TxBuildContext, Bcast>(
exec_handler: Arc<ExecHandler<TxBuildContext>>,
broadcaster: Arc<Bcast>,
txid: &Txid,
) -> ExecResult<()>
where
L2Client: StrataApiClient + Sync + Send,
TxBuildContext: BuildContext + Sync + Send,
Bcast: Broadcaster,
{
Expand Down
1 change: 1 addition & 0 deletions crates/bridge-exec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ strata-rpc-types.workspace = true

bitcoin = { workspace = true, features = ["rand-std"] }
borsh.workspace = true
deadpool.workspace = true
format_serde_error.workspace = true
jsonrpsee.workspace = true
serde.workspace = true
Expand Down
6 changes: 6 additions & 0 deletions crates/bridge-exec/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Defines the error types associated with executing the transaction duties.

use std::fmt;
use deadpool::managed::{Manager::Error, PoolError};
use jsonrpsee::core::ClientError as L2ClientError;
use strata_bridge_tx_builder::errors::BridgeTxBuilderError;
use strata_btcio::rpc::error::ClientError as L1ClientError;
Expand Down Expand Up @@ -47,6 +49,10 @@ pub enum ExecError {
/// Signer does not have access to the [`Xpriv`](bitcoin::bip32::Xpriv)
#[error("bitcoin signer do not have access to the private keys, i.e. xpriv")]
Xpriv,

/// Error getting the web socket client from pool
#[error("fetching websocket client from pool failed: {0}")]
WsPool(#[from] PoolError<Error>),
}

/// Result of a execution that may produce an [`ExecError`].
Expand Down
39 changes: 23 additions & 16 deletions crates/bridge-exec/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{fmt::Debug, time::Duration};

use bitcoin::{key::Keypair, Transaction, Txid};
use borsh::{BorshDeserialize, BorshSerialize};
use deadpool::managed::Pool;
use jsonrpsee::tokio::time::sleep;
use strata_bridge_sig_manager::manager::SignatureManager;
use strata_bridge_tx_builder::{context::BuildContext, TxKind};
Expand All @@ -19,22 +20,22 @@ use strata_rpc_api::StrataApiClient;
use strata_rpc_types::HexBytes;
use tracing::{debug, info, warn};

use crate::errors::{ExecError, ExecResult};
use crate::{
errors::{ExecError, ExecResult},
ws_client::WsClientManager,
};

/// The execution context for handling bridge-related signing activities.
#[derive(Clone)]
pub struct ExecHandler<
L2Client: StrataApiClient + Sync + Send,
TxBuildContext: BuildContext + Sync + Send,
> {
pub struct ExecHandler<TxBuildContext: BuildContext + Sync + Send> {
/// The build context required to create transaction data needed for signing.
pub tx_build_ctx: TxBuildContext,

/// The signature manager that handles nonce and signature aggregation.
pub sig_manager: SignatureManager,

/// The RPC client to connect to the RPC full node.
pub l2_rpc_client: L2Client,
pub l2_rpc_client_pool: Pool<WsClientManager>,
voidash marked this conversation as resolved.
Show resolved Hide resolved

/// The keypair for this client used to sign bridge-related messages.
pub keypair: Keypair,
Expand All @@ -46,9 +47,8 @@ pub struct ExecHandler<
pub msg_polling_interval: Duration,
}

impl<L2Client, TxBuildContext> ExecHandler<L2Client, TxBuildContext>
impl<TxBuildContext> ExecHandler<TxBuildContext>
where
L2Client: StrataApiClient + Sync + Send,
TxBuildContext: BuildContext + Sync + Send,
{
/// Construct and sign a transaction based on the provided `TxInfo`.
Expand Down Expand Up @@ -122,9 +122,13 @@ where
let raw_message = borsh::to_vec::<BridgeMessage>(&signed_message)
.expect("should be able to borsh serialize raw message");

self.l2_rpc_client
.submit_bridge_msg(raw_message.into())
.await?;
let l2_rpc_client = self
.l2_rpc_client_pool
.get()
.await
.map_err(|err| ExecError::WsPool(err))?;
voidash marked this conversation as resolved.
Show resolved Hide resolved

l2_rpc_client.submit_bridge_msg(raw_message.into()).await?;

info!(%txid, ?scope, ?payload, "broadcasted message");
Ok(signed_message)
Expand Down Expand Up @@ -286,9 +290,13 @@ where
Payload: BorshDeserialize + Debug,
{
let raw_scope: HexBytes = scope.into();
info!(scope=?scope, "getting messages from the L2 Client");
let received_payloads = self
.l2_rpc_client
info!(scope = ?scope, "getting messages from the L2 Client");
voidash marked this conversation as resolved.
Show resolved Hide resolved

let l2_rpc_client = self.l2_rpc_client_pool.get()
.await
.map_err(|err| ExecError::WsPool(err))?;

let received_payloads = l2_rpc_client
.get_msgs_by_scope(raw_scope)
.await?
.into_iter()
Expand All @@ -315,9 +323,8 @@ where
}
}

impl<L2Client, TxBuildContext> Debug for ExecHandler<L2Client, TxBuildContext>
impl<TxBuildContext> Debug for ExecHandler<TxBuildContext>
where
L2Client: StrataApiClient + Sync + Send,
TxBuildContext: BuildContext + Sync + Send,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
1 change: 1 addition & 0 deletions crates/bridge-exec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
pub mod config;
pub mod errors;
pub mod handler;
pub mod ws_client;
Loading
Loading