From d719363c0a234852645ada3f02d0b539b206ba8c Mon Sep 17 00:00:00 2001 From: voidash Date: Tue, 24 Dec 2024 17:10:07 +0545 Subject: [PATCH 1/9] bridge-client: add deadpool manager for websocket client for restart --- Cargo.lock | 20 +++ Cargo.toml | 2 +- bin/bridge-client/Cargo.toml | 1 + .../src/modes/operator/bootstrap.rs | 16 ++- .../src/modes/operator/task_manager.rs | 28 ++-- crates/bridge-exec/Cargo.toml | 18 +-- crates/bridge-exec/src/handler.rs | 21 +-- crates/bridge-exec/src/lib.rs | 1 + crates/bridge-exec/src/ws_client.rs | 128 ++++++++++++++++++ 9 files changed, 192 insertions(+), 43 deletions(-) create mode 100644 crates/bridge-exec/src/ws_client.rs diff --git a/Cargo.lock b/Cargo.lock index a90b0506e..7ea574114 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3388,6 +3388,23 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "deadpool" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "debug-helper" version = "0.3.13" @@ -12765,6 +12782,7 @@ dependencies = [ "async-trait", "bitcoin", "chrono", + "deadpool", "directories", "jsonrpsee", "rockbound", @@ -12792,8 +12810,10 @@ dependencies = [ name = "strata-bridge-exec" version = "0.1.0" dependencies = [ + "async-trait", "bitcoin", "borsh", + "deadpool", "format_serde_error", "jsonrpsee", "serde", diff --git a/Cargo.toml b/Cargo.toml index 73d587db5..627ca91fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -247,7 +247,7 @@ tracing-opentelemetry = "0.27" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } uuid = { version = "1.0", features = ["v4", "serde"] } zeroize = { version = "1.8.1", features = ["derive"] } - +deadpool = "0.12.1" # This is needed for custom build of SP1 [profile.release.build-override] opt-level = 3 diff --git a/bin/bridge-client/Cargo.toml b/bin/bridge-client/Cargo.toml index 09e99d983..52213079e 100644 --- a/bin/bridge-client/Cargo.toml +++ b/bin/bridge-client/Cargo.toml @@ -40,3 +40,4 @@ threadpool.workspace = true tokio.workspace = true tracing.workspace = true zeroize.workspace = true +deadpool.workspace = true diff --git a/bin/bridge-client/src/modes/operator/bootstrap.rs b/bin/bridge-client/src/modes/operator/bootstrap.rs index c0d49bcb5..65861370b 100644 --- a/bin/bridge-client/src/modes/operator/bootstrap.rs +++ b/bin/bridge-client/src/modes/operator/bootstrap.rs @@ -6,8 +6,8 @@ use bitcoin::{ key::{Keypair, Parity}, secp256k1::{PublicKey, SecretKey, XOnlyPublicKey, SECP256K1}, }; -use jsonrpsee::{core::client::async_client::Client as L2RpcClient, ws_client::WsClientBuilder}; -use strata_bridge_exec::handler::ExecHandler; +use deadpool::managed::{self, Pool}; +use strata_bridge_exec::{handler::ExecHandler, ws_client::{WsClientConfig, WsClientManager}}; use strata_bridge_sig_manager::prelude::SignatureManager; use strata_bridge_tx_builder::prelude::TxBuildContext; use strata_btcio::rpc::{traits::Reader, BitcoinClient}; @@ -64,10 +64,12 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> { BitcoinClient::new(args.btc_url, args.btc_user, args.btc_pass) .expect("error creating the bitcoin client"), ); - let l2_rpc_client: L2RpcClient = WsClientBuilder::default() - .build(args.rollup_url) - .await - .expect("failed to connect to the rollup RPC server"); + + let config = WsClientConfig { url: args.rollup_url.clone() }; + let manager = WsClientManager { config }; + let l2_rpc_client_pool: Pool = managed::Pool::::builder(manager).max_size(5).build().unwrap(); + + let l2_rpc_client = l2_rpc_client_pool.get().await.expect("cannot get rpc client"); // Get the keypair after deriving the wallet xpriv. let operator_keys = resolve_xpriv(args.master_xpriv, args.master_xpriv_path)?; @@ -132,7 +134,7 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> { let exec_handler = ExecHandler { tx_build_ctx: tx_context, sig_manager, - l2_rpc_client, + l2_rpc_client_pool, keypair, own_index, msg_polling_interval, diff --git a/bin/bridge-client/src/modes/operator/task_manager.rs b/bin/bridge-client/src/modes/operator/task_manager.rs index 089096228..20f145429 100644 --- a/bin/bridge-client/src/modes/operator/task_manager.rs +++ b/bin/bridge-client/src/modes/operator/task_manager.rs @@ -21,21 +21,19 @@ use tokio::{ }; use tracing::{error, info, trace, warn}; -pub(super) struct TaskManager +pub(super) struct TaskManager where - L2Client: StrataApiClient + Sync + Send, TxBuildContext: BuildContext + Sync + Send, Bcast: Broadcaster, { - pub(super) exec_handler: Arc>, + pub(super) exec_handler: Arc>, pub(super) broadcaster: Arc, pub(super) bridge_duty_db_ops: Arc, pub(super) bridge_duty_idx_db_ops: Arc, } -impl TaskManager +impl TaskManager where - L2Client: StrataApiClient + Sync + Send + 'static, TxBuildContext: BuildContext + Sync + Send + 'static, Bcast: Broadcaster + Sync + Send + 'static, { @@ -97,13 +95,12 @@ where .unwrap_or(Some(0)) .unwrap_or(0); + let l2_rpc_client = self.exec_handler.l2_rpc_client_pool.get().await.expect("cannot get rpc client"); let RpcBridgeDuties { duties, start_index, stop_index, - } = self - .exec_handler - .l2_rpc_client + } = l2_rpc_client .get_bridge_duties(self.exec_handler.own_index, start_index) .await?; @@ -162,14 +159,13 @@ where /// # Errors /// /// If the duty fails to be processed. -async fn process_duty( - exec_handler: Arc>, +async fn process_duty( + exec_handler: Arc>, duty_status_ops: Arc, broadcaster: Arc, duty: &BridgeDuty, ) -> ExecResult<()> where - L2Client: StrataApiClient + Sync + Send, TxBuildContext: BuildContext + Sync + Send, Bcast: Broadcaster, { @@ -224,15 +220,14 @@ where /// # Errors /// /// If there is an error during the execution of the duty. -async fn execute_duty( - exec_handler: Arc>, +async fn execute_duty( + exec_handler: Arc>, broadcaster: Arc, duty_status_ops: Arc, tracker_txid: Txid, tx_info: Tx, ) -> ExecResult<()> where - L2Client: StrataApiClient + Sync + Send, TxBuildContext: BuildContext + Sync + Send, Tx: TxKind + Debug, Bcast: Broadcaster, @@ -292,13 +287,12 @@ where /// Aggregates nonces and signatures for a given [`Txid`] and then, broadcasts the fully signed /// transaction to Bitcoin. -async fn aggregate_and_broadcast( - exec_handler: Arc>, +async fn aggregate_and_broadcast( + exec_handler: Arc>, broadcaster: Arc, txid: &Txid, ) -> ExecResult<()> where - L2Client: StrataApiClient + Sync + Send, TxBuildContext: BuildContext + Sync + Send, Bcast: Broadcaster, { diff --git a/crates/bridge-exec/Cargo.toml b/crates/bridge-exec/Cargo.toml index aeadd805e..02038f814 100644 --- a/crates/bridge-exec/Cargo.toml +++ b/crates/bridge-exec/Cargo.toml @@ -3,14 +3,14 @@ edition = "2021" name = "strata-bridge-exec" version = "0.1.0" -[lints] -rust.missing_debug_implementations = "warn" -rust.missing_docs = "warn" -rust.rust_2018_idioms = { level = "deny", priority = -1 } -rust.unreachable_pub = "warn" -rust.unused_crate_dependencies = "deny" -rust.unused_must_use = "deny" -rustdoc.all = "warn" +# [lints] +# rust.missing_debug_implementations = "warn" +# rust.missing_docs = "warn" +# rust.rust_2018_idioms = { level = "deny", priority = -1 } +# rust.unreachable_pub = "warn" +# rust.unused_crate_dependencies = "deny" +# rust.unused_must_use = "deny" +# rustdoc.all = "warn" [dependencies] strata-bridge-sig-manager.workspace = true @@ -28,3 +28,5 @@ serde.workspace = true thiserror.workspace = true toml.workspace = true tracing.workspace = true +deadpool.workspace = true +async-trait.workspace = true diff --git a/crates/bridge-exec/src/handler.rs b/crates/bridge-exec/src/handler.rs index d309987ea..2c5524402 100644 --- a/crates/bridge-exec/src/handler.rs +++ b/crates/bridge-exec/src/handler.rs @@ -4,6 +4,7 @@ use std::{fmt::Debug, time::Duration}; use bitcoin::{key::Keypair, Transaction, Txid}; use borsh::{BorshDeserialize, BorshSerialize}; +use deadpool::managed::Pool; use jsonrpsee::tokio::time::sleep; use strata_bridge_sig_manager::manager::SignatureManager; use strata_bridge_tx_builder::{context::BuildContext, TxKind}; @@ -19,12 +20,11 @@ use strata_rpc_api::StrataApiClient; use strata_rpc_types::HexBytes; use tracing::{debug, info, warn}; -use crate::errors::{ExecError, ExecResult}; +use crate::{errors::{ExecError, ExecResult}, ws_client::WsClientManager}; /// The execution context for handling bridge-related signing activities. #[derive(Clone)] pub struct ExecHandler< - L2Client: StrataApiClient + Sync + Send, TxBuildContext: BuildContext + Sync + Send, > { /// The build context required to create transaction data needed for signing. @@ -34,7 +34,7 @@ pub struct ExecHandler< pub sig_manager: SignatureManager, /// The RPC client to connect to the RPC full node. - pub l2_rpc_client: L2Client, + pub l2_rpc_client_pool: Pool, /// The keypair for this client used to sign bridge-related messages. pub keypair: Keypair, @@ -46,9 +46,8 @@ pub struct ExecHandler< pub msg_polling_interval: Duration, } -impl ExecHandler +impl ExecHandler where - L2Client: StrataApiClient + Sync + Send, TxBuildContext: BuildContext + Sync + Send, { /// Construct and sign a transaction based on the provided `TxInfo`. @@ -122,7 +121,8 @@ where let raw_message = borsh::to_vec::(&signed_message) .expect("should be able to borsh serialize raw message"); - self.l2_rpc_client + let l2_rpc_client = self.l2_rpc_client_pool.get().await.expect("cannot get client"); + l2_rpc_client .submit_bridge_msg(raw_message.into()) .await?; @@ -287,8 +287,10 @@ where { let raw_scope: HexBytes = scope.into(); info!(scope=?scope, "getting messages from the L2 Client"); - let received_payloads = self - .l2_rpc_client + // TODO ASH convert this to concrete ExecError + let l2_rpc_client = self.l2_rpc_client_pool.get().await.expect("no rpc client"); + + let received_payloads = l2_rpc_client .get_msgs_by_scope(raw_scope) .await? .into_iter() @@ -318,9 +320,8 @@ where } } -impl Debug for ExecHandler +impl Debug for ExecHandler where - L2Client: StrataApiClient + Sync + Send, TxBuildContext: BuildContext + Sync + Send, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { diff --git a/crates/bridge-exec/src/lib.rs b/crates/bridge-exec/src/lib.rs index 3f9e301ef..15af2df1c 100644 --- a/crates/bridge-exec/src/lib.rs +++ b/crates/bridge-exec/src/lib.rs @@ -6,3 +6,4 @@ pub mod config; pub mod errors; pub mod handler; +pub mod ws_client; diff --git a/crates/bridge-exec/src/ws_client.rs b/crates/bridge-exec/src/ws_client.rs new file mode 100644 index 000000000..c84becf8d --- /dev/null +++ b/crates/bridge-exec/src/ws_client.rs @@ -0,0 +1,128 @@ +use core::fmt; + +use deadpool::managed::{self, Manager, RecycleError, RecycleResult}; +use jsonrpsee::{core::{async_trait, client::{BatchResponse, ClientT}, params::BatchRequestBuilder, traits::ToRpcParams, BoxError, DeserializeOwned}, ws_client::{WsClient as WebsocketClient, WsClientBuilder}}; +use jsonrpsee::core::ClientError; + +#[derive(Clone, Debug)] +pub struct WsClientConfig { + pub url: String +} + +#[derive(Clone, Debug)] +pub struct WsClientManager { + pub config: WsClientConfig +} + +#[derive(Debug)] +pub enum WsClientState { + Working(WebsocketClient), + NotWorking +} + +#[derive(Debug)] +pub struct WsClient(WsClientState); + +impl Manager for WsClientManager { + type Type = WsClient; + + type Error = jsonrpsee::core::StringError; + + async fn create(&self) -> Result { + let client = WsClientBuilder::default() + .build(self.config.url.clone()) + .await; + let bl = match client { + Ok(cl) => WsClientState::Working(cl), + Err(_) => WsClientState::NotWorking, + }; + Ok(WsClient(bl)) + } + + async fn recycle( + &self, + obj: &mut Self::Type, + _metrics: &managed::Metrics, + ) -> RecycleResult { + match &obj.0 { + WsClientState::Working(cl) => { + if cl.is_connected() { + Ok(()) + } else { + Err(RecycleError::Message("Connection lost, recreate client".to_string().into())) + } + }, + WsClientState::NotWorking => { + Err(RecycleError::Message("Connection still not found, recreate client".to_string().into())) + }, + } + } +} + +#[async_trait] +impl ClientT for WsClient { + /// Send a [notification request](https://www.jsonrpc.org/specification#notification). + /// + /// Notifications do not produce a response on the JSON-RPC server. + async fn notification( + &self, + method: &str, + params: Params, + ) -> Result<(), ClientError> + where + Params: ToRpcParams + Send , + { + match &self.0 { + WsClientState::Working(inner) => { + inner.notification(method, params).await + } + WsClientState::NotWorking => Err(ClientError::Transport( + BoxError::from("Client is NotWorking".to_string()), + )), + } + } + + /// Send a [method call request](https://www.jsonrpc.org/specification#request_object). + /// + /// Returns `Ok` if the server responds successfully, otherwise a `ClientError`. + async fn request( + &self, + method: &str, + params: Params, + ) -> Result + where + R: DeserializeOwned, + Params: ToRpcParams + Send, + { + match &self.0 { + WsClientState::Working(inner) => { + inner.request(method, params).await + } + WsClientState::NotWorking => Err(ClientError::Transport( + BoxError::from("Client is NotWorking".to_string()), + )), + } + } + + /// Send a [batch request](https://www.jsonrpc.org/specification#batch). + /// + /// The responses to the batch are returned in the same order as the requests were inserted. + /// + /// Returns `Ok` if all requests in the batch were answered, otherwise `Err(ClientError)`. + async fn batch_request<'a, R>( + &self, + batch: BatchRequestBuilder<'a>, + ) -> Result, ClientError> + where + R: DeserializeOwned + fmt::Debug + 'a, + { + match &self.0 { + WsClientState::Working(inner) => { + inner.batch_request(batch).await + } + WsClientState::NotWorking => Err(ClientError::Transport( + BoxError::from("Client is NotWorking".to_string()), + )), + } + } +} From d24c36a4928340efe298f6850e9d46cebb553cfa Mon Sep 17 00:00:00 2001 From: voidash Date: Thu, 26 Dec 2024 17:26:31 +0545 Subject: [PATCH 2/9] func-test: add bridge deposit when sequencer is unreliable --- .../src/modes/operator/bootstrap.rs | 22 ++- .../src/modes/operator/task_manager.rs | 70 ++++---- crates/bridge-exec/src/handler.rs | 19 ++- crates/bridge-exec/src/ws_client.rs | 76 +++++---- .../fn_bridge_deposit_seq_unreliable.py | 149 ++++++++++++++++++ 5 files changed, 252 insertions(+), 84 deletions(-) create mode 100644 functional-tests/fn_bridge_deposit_seq_unreliable.py diff --git a/bin/bridge-client/src/modes/operator/bootstrap.rs b/bin/bridge-client/src/modes/operator/bootstrap.rs index 65861370b..cb6b0abf0 100644 --- a/bin/bridge-client/src/modes/operator/bootstrap.rs +++ b/bin/bridge-client/src/modes/operator/bootstrap.rs @@ -7,7 +7,10 @@ use bitcoin::{ secp256k1::{PublicKey, SecretKey, XOnlyPublicKey, SECP256K1}, }; use deadpool::managed::{self, Pool}; -use strata_bridge_exec::{handler::ExecHandler, ws_client::{WsClientConfig, WsClientManager}}; +use strata_bridge_exec::{ + handler::ExecHandler, + ws_client::{WsClientConfig, WsClientManager}, +}; use strata_bridge_sig_manager::prelude::SignatureManager; use strata_bridge_tx_builder::prelude::TxBuildContext; use strata_btcio::rpc::{traits::Reader, BitcoinClient}; @@ -65,11 +68,20 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> { .expect("error creating the bitcoin client"), ); - let config = WsClientConfig { url: args.rollup_url.clone() }; + let config = WsClientConfig { + url: args.rollup_url.clone(), + }; let manager = WsClientManager { config }; - let l2_rpc_client_pool: Pool = managed::Pool::::builder(manager).max_size(5).build().unwrap(); - - let l2_rpc_client = l2_rpc_client_pool.get().await.expect("cannot get rpc client"); + let l2_rpc_client_pool: Pool = + managed::Pool::::builder(manager) + .max_size(5) + .build() + .unwrap(); + + let l2_rpc_client = l2_rpc_client_pool + .get() + .await + .expect("cannot get rpc client"); // Get the keypair after deriving the wallet xpriv. let operator_keys = resolve_xpriv(args.master_xpriv, args.master_xpriv_path)?; diff --git a/bin/bridge-client/src/modes/operator/task_manager.rs b/bin/bridge-client/src/modes/operator/task_manager.rs index 20f145429..33a5e7d37 100644 --- a/bin/bridge-client/src/modes/operator/task_manager.rs +++ b/bin/bridge-client/src/modes/operator/task_manager.rs @@ -44,45 +44,46 @@ where ) -> anyhow::Result<()> { info!(?duty_polling_interval, "Starting to poll for duties"); loop { - let RpcBridgeDuties { + if let Ok(RpcBridgeDuties { duties, start_index, stop_index, - } = self.poll_duties().await?; - - info!(num_duties = duties.len(), "got duties"); + }) = self.poll_duties().await + { + info!(num_duties = duties.len(), "got duties"); - let mut handles = JoinSet::new(); - for duty in duties { - let exec_handler = self.exec_handler.clone(); - let bridge_duty_ops = self.bridge_duty_db_ops.clone(); - let broadcaster = self.broadcaster.clone(); - handles.spawn(async move { - process_duty(exec_handler, bridge_duty_ops, broadcaster, &duty).await - }); - } + let mut handles = JoinSet::new(); + for duty in duties { + let exec_handler = self.exec_handler.clone(); + let bridge_duty_ops = self.bridge_duty_db_ops.clone(); + let broadcaster = self.broadcaster.clone(); + handles.spawn(async move { + process_duty(exec_handler, bridge_duty_ops, broadcaster, &duty).await + }); + } - // TODO: There should be timeout duration based on duty and not a common timeout - // duration - if let Ok(any_failed) = timeout(duty_timeout_duration, handles.join_all()).await { - // if none of the duties failed, update the duty index so that the - // next batch is fetched in the next poll. - // - // otherwise, don't update the index so that the current batch is refetched and - // ones that were not executed successfully are executed again. - if !any_failed.iter().any(|res| res.is_err()) { - info!(%start_index, %stop_index, "updating duty index"); - if let Err(e) = self - .bridge_duty_idx_db_ops - .set_index_async(stop_index) - .await - { - error!(error = %e, %start_index, %stop_index, "could not update duty index"); + // TODO: There should be timeout duration based on duty and not a common timeout + // duration + if let Ok(any_failed) = timeout(duty_timeout_duration, handles.join_all()).await { + // if none of the duties failed, update the duty index so that the + // next batch is fetched in the next poll. + // + // otherwise, don't update the index so that the current batch is refetched and + // ones that were not executed successfully are executed again. + if !any_failed.iter().any(|res| res.is_err()) { + info!(%start_index, %stop_index, "updating duty index"); + if let Err(e) = self + .bridge_duty_idx_db_ops + .set_index_async(stop_index) + .await + { + error!(error = %e, %start_index, %stop_index, "could not update duty index"); + } } } - } - sleep(duty_polling_interval).await; + sleep(duty_polling_interval).await; + } } } @@ -95,7 +96,12 @@ where .unwrap_or(Some(0)) .unwrap_or(0); - let l2_rpc_client = self.exec_handler.l2_rpc_client_pool.get().await.expect("cannot get rpc client"); + let l2_rpc_client = self + .exec_handler + .l2_rpc_client_pool + .get() + .await + .expect("cannot get rpc client"); let RpcBridgeDuties { duties, start_index, diff --git a/crates/bridge-exec/src/handler.rs b/crates/bridge-exec/src/handler.rs index 2c5524402..c23228222 100644 --- a/crates/bridge-exec/src/handler.rs +++ b/crates/bridge-exec/src/handler.rs @@ -20,13 +20,14 @@ use strata_rpc_api::StrataApiClient; use strata_rpc_types::HexBytes; use tracing::{debug, info, warn}; -use crate::{errors::{ExecError, ExecResult}, ws_client::WsClientManager}; +use crate::{ + errors::{ExecError, ExecResult}, + ws_client::WsClientManager, +}; /// The execution context for handling bridge-related signing activities. #[derive(Clone)] -pub struct ExecHandler< - TxBuildContext: BuildContext + Sync + Send, -> { +pub struct ExecHandler { /// The build context required to create transaction data needed for signing. pub tx_build_ctx: TxBuildContext, @@ -121,10 +122,12 @@ where let raw_message = borsh::to_vec::(&signed_message) .expect("should be able to borsh serialize raw message"); - let l2_rpc_client = self.l2_rpc_client_pool.get().await.expect("cannot get client"); - l2_rpc_client - .submit_bridge_msg(raw_message.into()) - .await?; + let l2_rpc_client = self + .l2_rpc_client_pool + .get() + .await + .expect("cannot get client"); + l2_rpc_client.submit_bridge_msg(raw_message.into()).await?; info!(%txid, ?scope, ?payload, "broadcasted message"); Ok(signed_message) diff --git a/crates/bridge-exec/src/ws_client.rs b/crates/bridge-exec/src/ws_client.rs index c84becf8d..aafbac46f 100644 --- a/crates/bridge-exec/src/ws_client.rs +++ b/crates/bridge-exec/src/ws_client.rs @@ -1,23 +1,31 @@ use core::fmt; use deadpool::managed::{self, Manager, RecycleError, RecycleResult}; -use jsonrpsee::{core::{async_trait, client::{BatchResponse, ClientT}, params::BatchRequestBuilder, traits::ToRpcParams, BoxError, DeserializeOwned}, ws_client::{WsClient as WebsocketClient, WsClientBuilder}}; -use jsonrpsee::core::ClientError; +use jsonrpsee::{ + core::{ + async_trait, + client::{BatchResponse, ClientT}, + params::BatchRequestBuilder, + traits::ToRpcParams, + BoxError, ClientError, DeserializeOwned, + }, + ws_client::{WsClient as WebsocketClient, WsClientBuilder}, +}; #[derive(Clone, Debug)] pub struct WsClientConfig { - pub url: String + pub url: String, } #[derive(Clone, Debug)] pub struct WsClientManager { - pub config: WsClientConfig + pub config: WsClientConfig, } #[derive(Debug)] pub enum WsClientState { Working(WebsocketClient), - NotWorking + NotWorking, } #[derive(Debug)] @@ -49,12 +57,16 @@ impl Manager for WsClientManager { if cl.is_connected() { Ok(()) } else { - Err(RecycleError::Message("Connection lost, recreate client".to_string().into())) + Err(RecycleError::Message( + "Connection lost, recreate client".to_string().into(), + )) } - }, - WsClientState::NotWorking => { - Err(RecycleError::Message("Connection still not found, recreate client".to_string().into())) - }, + } + WsClientState::NotWorking => Err(RecycleError::Message( + "Connection still not found, recreate client" + .to_string() + .into(), + )), } } } @@ -64,43 +76,31 @@ impl ClientT for WsClient { /// Send a [notification request](https://www.jsonrpc.org/specification#notification). /// /// Notifications do not produce a response on the JSON-RPC server. - async fn notification( - &self, - method: &str, - params: Params, - ) -> Result<(), ClientError> + async fn notification(&self, method: &str, params: Params) -> Result<(), ClientError> where - Params: ToRpcParams + Send , + Params: ToRpcParams + Send, { match &self.0 { - WsClientState::Working(inner) => { - inner.notification(method, params).await - } - WsClientState::NotWorking => Err(ClientError::Transport( - BoxError::from("Client is NotWorking".to_string()), - )), + WsClientState::Working(inner) => inner.notification(method, params).await, + WsClientState::NotWorking => Err(ClientError::Transport(BoxError::from( + "Client is Not Working".to_string(), + ))), } } /// Send a [method call request](https://www.jsonrpc.org/specification#request_object). /// /// Returns `Ok` if the server responds successfully, otherwise a `ClientError`. - async fn request( - &self, - method: &str, - params: Params, - ) -> Result + async fn request(&self, method: &str, params: Params) -> Result where R: DeserializeOwned, Params: ToRpcParams + Send, { match &self.0 { - WsClientState::Working(inner) => { - inner.request(method, params).await - } - WsClientState::NotWorking => Err(ClientError::Transport( - BoxError::from("Client is NotWorking".to_string()), - )), + WsClientState::Working(inner) => inner.request(method, params).await, + WsClientState::NotWorking => Err(ClientError::Transport(BoxError::from( + "Client is Not Working".to_string(), + ))), } } @@ -117,12 +117,10 @@ impl ClientT for WsClient { R: DeserializeOwned + fmt::Debug + 'a, { match &self.0 { - WsClientState::Working(inner) => { - inner.batch_request(batch).await - } - WsClientState::NotWorking => Err(ClientError::Transport( - BoxError::from("Client is NotWorking".to_string()), - )), + WsClientState::Working(inner) => inner.batch_request(batch).await, + WsClientState::NotWorking => Err(ClientError::Transport(BoxError::from( + "Client is NotWorking".to_string(), + ))), } } } diff --git a/functional-tests/fn_bridge_deposit_seq_unreliable.py b/functional-tests/fn_bridge_deposit_seq_unreliable.py new file mode 100644 index 000000000..e566f43a9 --- /dev/null +++ b/functional-tests/fn_bridge_deposit_seq_unreliable.py @@ -0,0 +1,149 @@ +import time + +import flexitest +from bitcoinlib.services.bitcoind import BitcoindClient +from strata_utils import ( + deposit_request_transaction, + get_balance, +) +from web3 import Web3 +from web3.middleware import SignAndSendRawMiddlewareBuilder + +from constants import ( + DEFAULT_ROLLUP_PARAMS, + SATS_TO_WEI, +) +import testenv +from utils import get_bridge_pubkey, wait_until, wait_until_with_value + +# Local constants +# D BTC +DEPOSIT_AMOUNT = DEFAULT_ROLLUP_PARAMS["deposit_amount"] +# Gas for the withdrawal transaction +WITHDRAWAL_GAS_FEE = 22_000 # technically is 21_000 +# Ethereum Private Key +# NOTE: don't use this private key in production +ETH_PRIVATE_KEY = "0x0000000000000000000000000000000000000000000000000000000000000001" +# BTC Operator's fee for withdrawal +OPERATOR_FEE = DEFAULT_ROLLUP_PARAMS["operator_fee"] + + +@flexitest.register +class BridgeDepositSequencerUnreliableTest(testenv.StrataTester): + """ + TODO: Depends on STR-734 operator reassignment, and this can be merged only that is merged + + Makes two DRT deposits to the same EL address, then makes a withdrawal to a change address. + + Checks if the balance of the EL address is expected + and if the BTC balance of the change address is expected. + """ + + def __init__(self, ctx: flexitest.InitContext): + ctx.set_env("basic") + + def main(self, ctx: flexitest.RunContext): + address = ctx.env.gen_ext_btc_address() + withdraw_address = ctx.env.gen_ext_btc_address() + self.debug(f"Address: {address}") + self.debug(f"Change Address: {withdraw_address}") + + btc = ctx.get_service("bitcoin") + seq = ctx.get_service("sequencer") + reth = ctx.get_service("reth") + + seqrpc = seq.create_rpc() + btcrpc: BitcoindClient = btc.create_rpc() + rethrpc = reth.create_rpc() + + seq_addr = seq.get_prop("address") + self.debug(f"Sequencer Address: {seq_addr}") + + btc_url = btcrpc.base_url + btc_user = btc.props["rpc_user"] + btc_password = btc.props["rpc_password"] + + self.debug(f"BTC URL: {btc_url}") + self.debug(f"BTC user: {btc_user}") + self.debug(f"BTC password: {btc_password}") + + # Get the original balance of the withdraw address + original_balance = get_balance(withdraw_address, btc_url, btc_user, btc_password) + self.debug(f"BTC balance before withdraw: {original_balance}") + + web3: Web3 = reth.create_web3() + # Create an Ethereum account from the private key + eth_account = web3.eth.account.from_key(ETH_PRIVATE_KEY) + el_address = eth_account.address + self.debug(f"EL address: {el_address}") + + # Add the Ethereum account as auto-signer + # Transactions from `el_address` will then be signed, under the hood, in the middleware + web3.middleware_onion.inject(SignAndSendRawMiddlewareBuilder.build(eth_account), layer=0) + + # Get the balance of the EL address before the deposits + balance = int(rethrpc.eth_getBalance(el_address), 16) + self.debug(f"Strata Balance before deposits: {balance}") + assert balance == 0, "Strata balance is not expected" + + # Get operators pubkey and musig2 aggregates it + bridge_pk = get_bridge_pubkey(seqrpc) + self.debug(f"Bridge pubkey: {bridge_pk}") + self.debug(f"Stopping the sequencer") + + self.make_drt(ctx, el_address, bridge_pk) + time.sleep(2) + + # stop sequencer + seq.stop() + time.sleep(1) + + self.make_drt(ctx, el_address, bridge_pk) + + # start again + seq.start() + + wait_until( + lambda: seqrpc.strata_protocolVersion() is not None, + error_with="Sequencer did not start on time", + timeout=10 + ) + + balance_after_deposits = wait_until_with_value( + lambda: int(rethrpc.eth_getBalance(el_address), 16), + predicate=lambda v: v == 2 * DEPOSIT_AMOUNT * SATS_TO_WEI, + timeout=15 + ) + self.debug(f"Strata Balance after deposits: {balance_after_deposits}") + + return True + + def make_drt(self, ctx: flexitest.RunContext, el_address, musig_bridge_pk): + """ + Deposit Request Transaction + """ + # Get relevant data + btc = ctx.get_service("bitcoin") + seq = ctx.get_service("sequencer") + btcrpc: BitcoindClient = btc.create_rpc() + btc_url = btcrpc.base_url + btc_user = btc.props["rpc_user"] + btc_password = btc.props["rpc_password"] + seq_addr = seq.get_prop("address") + + # Create the deposit request transaction + tx = bytes( + deposit_request_transaction( + el_address, musig_bridge_pk, btc_url, btc_user, btc_password + ) + ).hex() + self.debug(f"Deposit request tx: {tx}") + + # Send the transaction to the Bitcoin network + txid = btcrpc.proxy.sendrawtransaction(tx) + self.debug(f"sent deposit request with txid = {txid} for address {el_address}") + # time to mature DRT + btcrpc.proxy.generatetoaddress(6, seq_addr) + + # time to mature DT + btcrpc.proxy.generatetoaddress(6, seq_addr) From cd8825904613f3b52b39f2a64f9673f28f167efc Mon Sep 17 00:00:00 2001 From: voidash Date: Thu, 26 Dec 2024 17:54:02 +0545 Subject: [PATCH 3/9] bridge-exec: add comments for ws-client and formatting --- Cargo.lock | 1 - Cargo.toml | 2 +- bin/bridge-client/Cargo.toml | 2 +- crates/bridge-exec/Cargo.toml | 19 ++++--- crates/bridge-exec/src/ws_client.rs | 50 ++++++++++++++++--- .../fn_bridge_deposit_seq_unreliable.py | 8 +-- 6 files changed, 58 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7ea574114..f120ececb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12810,7 +12810,6 @@ dependencies = [ name = "strata-bridge-exec" version = "0.1.0" dependencies = [ - "async-trait", "bitcoin", "borsh", "deadpool", diff --git a/Cargo.toml b/Cargo.toml index 627ca91fb..03d4bb3c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -195,6 +195,7 @@ bytes = "1.6.0" cfg-if = "1.0.0" chrono = "0.4.38" clap = "4" +deadpool = "0.12.1" digest = "0.10" ethnum = "1.5.0" eyre = "0.6" @@ -247,7 +248,6 @@ tracing-opentelemetry = "0.27" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } uuid = { version = "1.0", features = ["v4", "serde"] } zeroize = { version = "1.8.1", features = ["derive"] } -deadpool = "0.12.1" # This is needed for custom build of SP1 [profile.release.build-override] opt-level = 3 diff --git a/bin/bridge-client/Cargo.toml b/bin/bridge-client/Cargo.toml index 52213079e..f02992590 100644 --- a/bin/bridge-client/Cargo.toml +++ b/bin/bridge-client/Cargo.toml @@ -32,6 +32,7 @@ argh.workspace = true async-trait.workspace = true bitcoin.workspace = true chrono.workspace = true +deadpool.workspace = true directories = "5.0.1" jsonrpsee.workspace = true rockbound.workspace = true @@ -40,4 +41,3 @@ threadpool.workspace = true tokio.workspace = true tracing.workspace = true zeroize.workspace = true -deadpool.workspace = true diff --git a/crates/bridge-exec/Cargo.toml b/crates/bridge-exec/Cargo.toml index 02038f814..04a7f6335 100644 --- a/crates/bridge-exec/Cargo.toml +++ b/crates/bridge-exec/Cargo.toml @@ -3,14 +3,14 @@ edition = "2021" name = "strata-bridge-exec" version = "0.1.0" -# [lints] -# rust.missing_debug_implementations = "warn" -# rust.missing_docs = "warn" -# rust.rust_2018_idioms = { level = "deny", priority = -1 } -# rust.unreachable_pub = "warn" -# rust.unused_crate_dependencies = "deny" -# rust.unused_must_use = "deny" -# rustdoc.all = "warn" +[lints] +rust.missing_debug_implementations = "warn" +rust.missing_docs = "warn" +rust.rust_2018_idioms = { level = "deny", priority = -1 } +rust.unreachable_pub = "warn" +rust.unused_crate_dependencies = "deny" +rust.unused_must_use = "deny" +rustdoc.all = "warn" [dependencies] strata-bridge-sig-manager.workspace = true @@ -22,11 +22,10 @@ strata-rpc-types.workspace = true bitcoin = { workspace = true, features = ["rand-std"] } borsh.workspace = true +deadpool.workspace = true format_serde_error.workspace = true jsonrpsee.workspace = true serde.workspace = true thiserror.workspace = true toml.workspace = true tracing.workspace = true -deadpool.workspace = true -async-trait.workspace = true diff --git a/crates/bridge-exec/src/ws_client.rs b/crates/bridge-exec/src/ws_client.rs index aafbac46f..caee7ef97 100644 --- a/crates/bridge-exec/src/ws_client.rs +++ b/crates/bridge-exec/src/ws_client.rs @@ -1,3 +1,6 @@ +//! Wrapper for managing a WebSocket client that allows recycling or restarting +//! the client without restarting the entire program. + use core::fmt; use deadpool::managed::{self, Manager, RecycleError, RecycleResult}; @@ -12,30 +15,58 @@ use jsonrpsee::{ ws_client::{WsClient as WebsocketClient, WsClientBuilder}, }; +/// Configuration for the WebSocket client. +/// +/// settings that are necessary to initialize and configure +/// the WebSocket client, ie. URL of the WebSocket server. #[derive(Clone, Debug)] pub struct WsClientConfig { + /// The URL of the WebSocket server. pub url: String, } +/// Manager for creating and recycling WebSocket clients. +/// +/// This struct manages the lifecycle of WebSocket clients, including creating +/// new clients and determining whether existing clients are still valid. #[derive(Clone, Debug)] pub struct WsClientManager { + /// The configuration used to initialize WebSocket clients. pub config: WsClientConfig, } +/// Represents the state of a WebSocket client. +/// +/// - `Working`: The client is connected and operational. +/// - `NotWorking`: The client is not connected or has encountered an error. #[derive(Debug)] pub enum WsClientState { + /// The WebSocket client is connected and operational. Working(WebsocketClient), + /// The WebSocket client is not connected or is in a failed state. NotWorking, } +/// Wrapper for the WebSocket client state. +/// +/// This struct encapsulates the `WsClientState`, enabling unified management of +/// both connected and failed client states. #[derive(Debug)] pub struct WsClient(WsClientState); +/// Implements the `Manager` trait for managing WebSocket clients. +/// +/// The `WsClientManager` provides methods to create new clients and recycle +/// existing ones, ensuring that clients remain in a valid state. impl Manager for WsClientManager { type Type = WsClient; - type Error = jsonrpsee::core::StringError; + /// Creates a new WebSocket client. + /// + /// Attempts to initialize a new WebSocket client using the provided configuration. + /// Returns a `WsClient` wrapped in a `Working` or `NotWorking` state depending on + /// the success of the operation. async fn create(&self) -> Result { let client = WsClientBuilder::default() .build(self.config.url.clone()) @@ -47,6 +78,11 @@ impl Manager for WsClientManager { Ok(WsClient(bl)) } + /// Recycles an existing WebSocket client. + /// + /// Checks whether the provided client is still valid. If the client is connected, + /// it is retained. Otherwise, an error is returned, indicating the need to recreate + /// the client. async fn recycle( &self, obj: &mut Self::Type, @@ -71,6 +107,10 @@ impl Manager for WsClientManager { } } +/// Implements the `ClientT` trait for `WsClient`. +/// +/// This implementation allows `WsClient` to perform JSON-RPC operations, +/// including notifications, method calls, and batch requests. #[async_trait] impl ClientT for WsClient { /// Send a [notification request](https://www.jsonrpc.org/specification#notification). @@ -104,11 +144,7 @@ impl ClientT for WsClient { } } - /// Send a [batch request](https://www.jsonrpc.org/specification#batch). - /// - /// The responses to the batch are returned in the same order as the requests were inserted. - /// - /// Returns `Ok` if all requests in the batch were answered, otherwise `Err(ClientError)`. + /// Sends a batch request. async fn batch_request<'a, R>( &self, batch: BatchRequestBuilder<'a>, @@ -119,7 +155,7 @@ impl ClientT for WsClient { match &self.0 { WsClientState::Working(inner) => inner.batch_request(batch).await, WsClientState::NotWorking => Err(ClientError::Transport(BoxError::from( - "Client is NotWorking".to_string(), + "Client is Not Working".to_string(), ))), } } diff --git a/functional-tests/fn_bridge_deposit_seq_unreliable.py b/functional-tests/fn_bridge_deposit_seq_unreliable.py index e566f43a9..3706dc175 100644 --- a/functional-tests/fn_bridge_deposit_seq_unreliable.py +++ b/functional-tests/fn_bridge_deposit_seq_unreliable.py @@ -9,11 +9,11 @@ from web3 import Web3 from web3.middleware import SignAndSendRawMiddlewareBuilder +import testenv from constants import ( DEFAULT_ROLLUP_PARAMS, SATS_TO_WEI, ) -import testenv from utils import get_bridge_pubkey, wait_until, wait_until_with_value # Local constants @@ -89,7 +89,7 @@ def main(self, ctx: flexitest.RunContext): # Get operators pubkey and musig2 aggregates it bridge_pk = get_bridge_pubkey(seqrpc) self.debug(f"Bridge pubkey: {bridge_pk}") - self.debug(f"Stopping the sequencer") + self.debug("Stopping the sequencer") self.make_drt(ctx, el_address, bridge_pk) time.sleep(2) @@ -106,13 +106,13 @@ def main(self, ctx: flexitest.RunContext): wait_until( lambda: seqrpc.strata_protocolVersion() is not None, error_with="Sequencer did not start on time", - timeout=10 + timeout=10, ) balance_after_deposits = wait_until_with_value( lambda: int(rethrpc.eth_getBalance(el_address), 16), predicate=lambda v: v == 2 * DEPOSIT_AMOUNT * SATS_TO_WEI, - timeout=15 + timeout=15, ) self.debug(f"Strata Balance after deposits: {balance_after_deposits}") From 1fa21ec01665368c9886e210e9447111251ebe35 Mon Sep 17 00:00:00 2001 From: voidash Date: Thu, 26 Dec 2024 18:24:41 +0545 Subject: [PATCH 4/9] bridge-exec: custom wsclient Error --- bin/bridge-client/src/modes/operator/bootstrap.rs | 2 +- crates/bridge-exec/src/errors.rs | 6 ++++++ crates/bridge-exec/src/handler.rs | 11 +++++++---- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/bin/bridge-client/src/modes/operator/bootstrap.rs b/bin/bridge-client/src/modes/operator/bootstrap.rs index cb6b0abf0..f2f2bfa01 100644 --- a/bin/bridge-client/src/modes/operator/bootstrap.rs +++ b/bin/bridge-client/src/modes/operator/bootstrap.rs @@ -81,7 +81,7 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> { let l2_rpc_client = l2_rpc_client_pool .get() .await - .expect("cannot get rpc client"); + .expect("cannot get rpc client from pool"); // Get the keypair after deriving the wallet xpriv. let operator_keys = resolve_xpriv(args.master_xpriv, args.master_xpriv_path)?; diff --git a/crates/bridge-exec/src/errors.rs b/crates/bridge-exec/src/errors.rs index d97d81c50..9262b69f4 100644 --- a/crates/bridge-exec/src/errors.rs +++ b/crates/bridge-exec/src/errors.rs @@ -1,5 +1,7 @@ //! Defines the error types associated with executing the transaction duties. +use std::fmt; +use deadpool::managed::{Manager::Error, PoolError}; use jsonrpsee::core::ClientError as L2ClientError; use strata_bridge_tx_builder::errors::BridgeTxBuilderError; use strata_btcio::rpc::error::ClientError as L1ClientError; @@ -47,6 +49,10 @@ pub enum ExecError { /// Signer does not have access to the [`Xpriv`](bitcoin::bip32::Xpriv) #[error("bitcoin signer do not have access to the private keys, i.e. xpriv")] Xpriv, + + /// Error getting the web socket client from pool + #[error("fetching websocket client from pool failed: {0}")] + WsPool(#[from] PoolError), } /// Result of a execution that may produce an [`ExecError`]. diff --git a/crates/bridge-exec/src/handler.rs b/crates/bridge-exec/src/handler.rs index c23228222..5a8dc04fe 100644 --- a/crates/bridge-exec/src/handler.rs +++ b/crates/bridge-exec/src/handler.rs @@ -126,7 +126,8 @@ where .l2_rpc_client_pool .get() .await - .expect("cannot get client"); + .map_err(|err| ExecError::WsPool(err))?; + l2_rpc_client.submit_bridge_msg(raw_message.into()).await?; info!(%txid, ?scope, ?payload, "broadcasted message"); @@ -289,9 +290,11 @@ where Payload: BorshDeserialize + Debug, { let raw_scope: HexBytes = scope.into(); - info!(scope=?scope, "getting messages from the L2 Client"); - // TODO ASH convert this to concrete ExecError - let l2_rpc_client = self.l2_rpc_client_pool.get().await.expect("no rpc client"); + info!(scope = ?scope, "getting messages from the L2 Client"); + + let l2_rpc_client = self.l2_rpc_client_pool.get() + .await + .map_err(|err| ExecError::WsPool(err))?; let received_payloads = l2_rpc_client .get_msgs_by_scope(raw_scope) From 6b42991e1bb79c5bde41998fb44a58c4f9dd2027 Mon Sep 17 00:00:00 2001 From: voidash Date: Thu, 2 Jan 2025 15:37:10 +0545 Subject: [PATCH 5/9] bridge-client: exponential backoff added for rpc error --- bin/bridge-client/src/args.rs | 7 ++ bin/bridge-client/src/constants.rs | 3 + bin/bridge-client/src/errors.rs | 9 ++ .../src/modes/operator/bootstrap.rs | 11 +- .../src/modes/operator/task_manager.rs | 103 +++++++++++------- crates/bridge-exec/src/errors.rs | 6 +- crates/bridge-exec/src/handler.rs | 8 +- 7 files changed, 101 insertions(+), 46 deletions(-) diff --git a/bin/bridge-client/src/args.rs b/bin/bridge-client/src/args.rs index d5875554d..1ba89298a 100644 --- a/bin/bridge-client/src/args.rs +++ b/bin/bridge-client/src/args.rs @@ -99,6 +99,13 @@ pub(crate) struct Cli { description = "duty timeout duration in seconds (default: 600)" )] pub duty_timeout_duration: Option, + + /// Max retries for when rpc server fails during duty polling + #[argh( + option, + description = "max retries for when rpc server fails during duty polling" + )] + pub max_rpc_retry_count: Option, } /// Represents the operational mode of the client. diff --git a/bin/bridge-client/src/constants.rs b/bin/bridge-client/src/constants.rs index 37e42b6f1..54f9e2a5b 100644 --- a/bin/bridge-client/src/constants.rs +++ b/bin/bridge-client/src/constants.rs @@ -7,3 +7,6 @@ pub(super) const DEFAULT_RPC_HOST: &str = "127.0.0.1"; pub(super) const DEFAULT_DUTY_TIMEOUT_SEC: u64 = 600; /// The default bridge rocksdb database retry count, if not overridden by the user. pub(super) const ROCKSDB_RETRY_COUNT: u16 = 3; + +/// The default Rpc retry count, if not overridden by the user. +pub(super) const MAX_RPC_RETRY_COUNT: u16 = 5; diff --git a/bin/bridge-client/src/errors.rs b/bin/bridge-client/src/errors.rs index f9b6aefd1..82fbea76b 100644 --- a/bin/bridge-client/src/errors.rs +++ b/bin/bridge-client/src/errors.rs @@ -5,3 +5,12 @@ pub enum InitError { #[error("Invalid operation mode, expected: operator(op) or challenger(ch), got: {0}")] InvalidMode(String), } + +#[derive(Debug, Clone, Error)] +pub enum PollDutyError { + #[error("Rpc client: {0}")] + RpcError(String), + + #[error("fetching websocket client from pool failed")] + WsPool, +} diff --git a/bin/bridge-client/src/modes/operator/bootstrap.rs b/bin/bridge-client/src/modes/operator/bootstrap.rs index f2f2bfa01..e737dda29 100644 --- a/bin/bridge-client/src/modes/operator/bootstrap.rs +++ b/bin/bridge-client/src/modes/operator/bootstrap.rs @@ -31,7 +31,8 @@ use super::{constants::DB_THREAD_COUNT, task_manager::TaskManager}; use crate::{ args::Cli, constants::{ - DEFAULT_DUTY_TIMEOUT_SEC, DEFAULT_RPC_HOST, DEFAULT_RPC_PORT, ROCKSDB_RETRY_COUNT, + DEFAULT_DUTY_TIMEOUT_SEC, DEFAULT_RPC_HOST, DEFAULT_RPC_PORT, MAX_RPC_RETRY_COUNT, + ROCKSDB_RETRY_COUNT, }, db::open_rocksdb_database, rpc_server::{self, BridgeRpc}, @@ -169,10 +170,16 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> { .unwrap_or(DEFAULT_DUTY_TIMEOUT_SEC), ); + let max_retry_count = args.max_rpc_retry_count.unwrap_or(MAX_RPC_RETRY_COUNT); + // TODO: wrap these in `strata-tasks` let duty_task = tokio::spawn(async move { if let Err(e) = task_manager - .start(duty_polling_interval, duty_timeout_duration) + .start( + duty_polling_interval, + duty_timeout_duration, + max_retry_count, + ) .await { error!(error = %e, "could not start task manager"); diff --git a/bin/bridge-client/src/modes/operator/task_manager.rs b/bin/bridge-client/src/modes/operator/task_manager.rs index 33a5e7d37..c0a4c69af 100644 --- a/bin/bridge-client/src/modes/operator/task_manager.rs +++ b/bin/bridge-client/src/modes/operator/task_manager.rs @@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; +use anyhow::bail; use bitcoin::Txid; use strata_bridge_exec::{ errors::{ExecError, ExecResult}, @@ -21,6 +22,8 @@ use tokio::{ }; use tracing::{error, info, trace, warn}; +use crate::errors::PollDutyError; + pub(super) struct TaskManager where TxBuildContext: BuildContext + Sync + Send, @@ -41,54 +44,78 @@ where &self, duty_polling_interval: Duration, duty_timeout_duration: Duration, + max_retries: u16, ) -> anyhow::Result<()> { info!(?duty_polling_interval, "Starting to poll for duties"); + let mut retries = 0; loop { - if let Ok(RpcBridgeDuties { - duties, - start_index, - stop_index, - }) = self.poll_duties().await - { - info!(num_duties = duties.len(), "got duties"); - - let mut handles = JoinSet::new(); - for duty in duties { - let exec_handler = self.exec_handler.clone(); - let bridge_duty_ops = self.bridge_duty_db_ops.clone(); - let broadcaster = self.broadcaster.clone(); - handles.spawn(async move { - process_duty(exec_handler, bridge_duty_ops, broadcaster, &duty).await - }); - } + match self.poll_duties().await { + Ok(RpcBridgeDuties { + duties, + start_index, + stop_index, + }) => { + info!(num_duties = duties.len(), "got duties"); - // TODO: There should be timeout duration based on duty and not a common timeout - // duration - if let Ok(any_failed) = timeout(duty_timeout_duration, handles.join_all()).await { - // if none of the duties failed, update the duty index so that the - // next batch is fetched in the next poll. - // - // otherwise, don't update the index so that the current batch is refetched and - // ones that were not executed successfully are executed again. - if !any_failed.iter().any(|res| res.is_err()) { - info!(%start_index, %stop_index, "updating duty index"); - if let Err(e) = self - .bridge_duty_idx_db_ops - .set_index_async(stop_index) - .await - { - error!(error = %e, %start_index, %stop_index, "could not update duty index"); + let mut handles = JoinSet::new(); + for duty in duties { + let exec_handler = self.exec_handler.clone(); + let bridge_duty_ops = self.bridge_duty_db_ops.clone(); + let broadcaster = self.broadcaster.clone(); + handles.spawn(async move { + process_duty(exec_handler, bridge_duty_ops, broadcaster, &duty).await + }); + } + + // TODO: There should be timeout duration based on duty and not a common timeout + // duration + if let Ok(any_failed) = timeout(duty_timeout_duration, handles.join_all()).await + { + // if none of the duties failed, update the duty index so that the + // next batch is fetched in the next poll. + // + // otherwise, don't update the index so that the current batch is refetched + // and ones that were not executed successfully are + // executed again. + if !any_failed.iter().any(|res| res.is_err()) { + info!(%start_index, %stop_index, "updating duty index"); + if let Err(e) = self + .bridge_duty_idx_db_ops + .set_index_async(stop_index) + .await + { + error!(error = %e, %start_index, %stop_index, "could not update duty index"); + } } } } + Err(err) => { + match err { + PollDutyError::RpcError(err) => { + error!(%err, "could not get rpc response"); + retries += 1; + if retries >= max_retries { + error!(%err, "Exceeded maximum retries to acquire client. Failing gracefully"); + + bail!("Exceeded maximum retries to acquire client") + } - sleep(duty_polling_interval).await; + // Exponential backoff + let delay = Duration::from_secs(1) * 2u32.pow(retries as u32 - 1); + sleep(delay).await; + } + _ => { + bail!(err.to_string()); + } + } + } } + sleep(duty_polling_interval).await; } } /// Polls for [`BridgeDuty`]s. - pub(crate) async fn poll_duties(&self) -> anyhow::Result { + pub(crate) async fn poll_duties(&self) -> Result { let start_index = self .bridge_duty_idx_db_ops .get_index_async() @@ -101,14 +128,16 @@ where .l2_rpc_client_pool .get() .await - .expect("cannot get rpc client"); + .map_err(|_| PollDutyError::WsPool)?; + let RpcBridgeDuties { duties, start_index, stop_index, } = l2_rpc_client .get_bridge_duties(self.exec_handler.own_index, start_index) - .await?; + .await + .map_err(|err| PollDutyError::RpcError(err.to_string()))?; // check which duties this operator should do something let mut todo_duties: Vec = Vec::with_capacity(duties.len()); diff --git a/crates/bridge-exec/src/errors.rs b/crates/bridge-exec/src/errors.rs index 9262b69f4..657282d13 100644 --- a/crates/bridge-exec/src/errors.rs +++ b/crates/bridge-exec/src/errors.rs @@ -1,7 +1,5 @@ //! Defines the error types associated with executing the transaction duties. -use std::fmt; -use deadpool::managed::{Manager::Error, PoolError}; use jsonrpsee::core::ClientError as L2ClientError; use strata_bridge_tx_builder::errors::BridgeTxBuilderError; use strata_btcio::rpc::error::ClientError as L1ClientError; @@ -51,8 +49,8 @@ pub enum ExecError { Xpriv, /// Error getting the web socket client from pool - #[error("fetching websocket client from pool failed: {0}")] - WsPool(#[from] PoolError), + #[error("fetching websocket client from pool failed")] + WsPool, } /// Result of a execution that may produce an [`ExecError`]. diff --git a/crates/bridge-exec/src/handler.rs b/crates/bridge-exec/src/handler.rs index 5a8dc04fe..63d0178cb 100644 --- a/crates/bridge-exec/src/handler.rs +++ b/crates/bridge-exec/src/handler.rs @@ -126,7 +126,7 @@ where .l2_rpc_client_pool .get() .await - .map_err(|err| ExecError::WsPool(err))?; + .map_err(|_| ExecError::WsPool)?; l2_rpc_client.submit_bridge_msg(raw_message.into()).await?; @@ -292,9 +292,11 @@ where let raw_scope: HexBytes = scope.into(); info!(scope = ?scope, "getting messages from the L2 Client"); - let l2_rpc_client = self.l2_rpc_client_pool.get() + let l2_rpc_client = self + .l2_rpc_client_pool + .get() .await - .map_err(|err| ExecError::WsPool(err))?; + .map_err(|_| ExecError::WsPool)?; let received_payloads = l2_rpc_client .get_msgs_by_scope(raw_scope) From 79f6e6c5532e891a14e66d363319ab0e90b0064c Mon Sep 17 00:00:00 2001 From: voidash Date: Thu, 2 Jan 2025 16:36:10 +0545 Subject: [PATCH 6/9] bridge-client: resolve comments --- .../src/modes/operator/bootstrap.rs | 11 ++++--- crates/bridge-exec/src/handler.rs | 29 ++++++++++--------- crates/bridge-exec/src/ws_client.rs | 17 +++++------ 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/bin/bridge-client/src/modes/operator/bootstrap.rs b/bin/bridge-client/src/modes/operator/bootstrap.rs index e737dda29..25a4d57a2 100644 --- a/bin/bridge-client/src/modes/operator/bootstrap.rs +++ b/bin/bridge-client/src/modes/operator/bootstrap.rs @@ -6,7 +6,7 @@ use bitcoin::{ key::{Keypair, Parity}, secp256k1::{PublicKey, SecretKey, XOnlyPublicKey, SECP256K1}, }; -use deadpool::managed::{self, Pool}; +use deadpool::managed; use strata_bridge_exec::{ handler::ExecHandler, ws_client::{WsClientConfig, WsClientManager}, @@ -73,11 +73,10 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> { url: args.rollup_url.clone(), }; let manager = WsClientManager { config }; - let l2_rpc_client_pool: Pool = - managed::Pool::::builder(manager) - .max_size(5) - .build() - .unwrap(); + let l2_rpc_client_pool = managed::Pool::::builder(manager) + .max_size(5) + .build() + .unwrap(); let l2_rpc_client = l2_rpc_client_pool .get() diff --git a/crates/bridge-exec/src/handler.rs b/crates/bridge-exec/src/handler.rs index 63d0178cb..62e598929 100644 --- a/crates/bridge-exec/src/handler.rs +++ b/crates/bridge-exec/src/handler.rs @@ -4,7 +4,7 @@ use std::{fmt::Debug, time::Duration}; use bitcoin::{key::Keypair, Transaction, Txid}; use borsh::{BorshDeserialize, BorshSerialize}; -use deadpool::managed::Pool; +use deadpool::managed::{Object, Pool}; use jsonrpsee::tokio::time::sleep; use strata_bridge_sig_manager::manager::SignatureManager; use strata_bridge_tx_builder::{context::BuildContext, TxKind}; @@ -25,6 +25,9 @@ use crate::{ ws_client::WsClientManager, }; +/// Websocket client pool +pub type WsClientPool = Pool; + /// The execution context for handling bridge-related signing activities. #[derive(Clone)] pub struct ExecHandler { @@ -35,7 +38,7 @@ pub struct ExecHandler { pub sig_manager: SignatureManager, /// The RPC client to connect to the RPC full node. - pub l2_rpc_client_pool: Pool, + pub l2_rpc_client_pool: WsClientPool, /// The keypair for this client used to sign bridge-related messages. pub keypair: Keypair, @@ -122,11 +125,7 @@ where let raw_message = borsh::to_vec::(&signed_message) .expect("should be able to borsh serialize raw message"); - let l2_rpc_client = self - .l2_rpc_client_pool - .get() - .await - .map_err(|_| ExecError::WsPool)?; + let l2_rpc_client = self.get_ready_rpc_client().await?; l2_rpc_client.submit_bridge_msg(raw_message.into()).await?; @@ -290,13 +289,9 @@ where Payload: BorshDeserialize + Debug, { let raw_scope: HexBytes = scope.into(); - info!(scope = ?scope, "getting messages from the L2 Client"); + info!(?scope, "getting messages from the L2 Client"); - let l2_rpc_client = self - .l2_rpc_client_pool - .get() - .await - .map_err(|_| ExecError::WsPool)?; + let l2_rpc_client = self.get_ready_rpc_client().await?; let received_payloads = l2_rpc_client .get_msgs_by_scope(raw_scope) @@ -326,6 +321,14 @@ where Ok(received_payloads) } + + /// Retrieves a ready-to-use RPC client from the client pool. + async fn get_ready_rpc_client(&self) -> Result, ExecError> { + self.l2_rpc_client_pool + .get() + .await + .map_err(|_| ExecError::WsPool) + } } impl Debug for ExecHandler diff --git a/crates/bridge-exec/src/ws_client.rs b/crates/bridge-exec/src/ws_client.rs index caee7ef97..4c930744f 100644 --- a/crates/bridge-exec/src/ws_client.rs +++ b/crates/bridge-exec/src/ws_client.rs @@ -71,6 +71,7 @@ impl Manager for WsClientManager { let client = WsClientBuilder::default() .build(self.config.url.clone()) .await; + let bl = match client { Ok(cl) => WsClientState::Working(cl), Err(_) => WsClientState::NotWorking, @@ -107,6 +108,10 @@ impl Manager for WsClientManager { } } +fn make_not_working_error() -> ClientError { + ClientError::Transport(BoxError::from("Client is Not Working".to_string())) +} + /// Implements the `ClientT` trait for `WsClient`. /// /// This implementation allows `WsClient` to perform JSON-RPC operations, @@ -122,9 +127,7 @@ impl ClientT for WsClient { { match &self.0 { WsClientState::Working(inner) => inner.notification(method, params).await, - WsClientState::NotWorking => Err(ClientError::Transport(BoxError::from( - "Client is Not Working".to_string(), - ))), + WsClientState::NotWorking => Err(make_not_working_error()), } } @@ -138,9 +141,7 @@ impl ClientT for WsClient { { match &self.0 { WsClientState::Working(inner) => inner.request(method, params).await, - WsClientState::NotWorking => Err(ClientError::Transport(BoxError::from( - "Client is Not Working".to_string(), - ))), + WsClientState::NotWorking => Err(make_not_working_error()), } } @@ -154,9 +155,7 @@ impl ClientT for WsClient { { match &self.0 { WsClientState::Working(inner) => inner.batch_request(batch).await, - WsClientState::NotWorking => Err(ClientError::Transport(BoxError::from( - "Client is Not Working".to_string(), - ))), + WsClientState::NotWorking => Err(make_not_working_error()), } } } From f9ee1804bb8a7ee4e55698ad304f6d3a122aa71f Mon Sep 17 00:00:00 2001 From: voidash Date: Sun, 5 Jan 2025 16:46:27 +0545 Subject: [PATCH 7/9] bridge-exec: remove working, notworking state from WsClient --- .../src/modes/operator/task_manager.rs | 3 +- crates/bridge-exec/src/handler.rs | 2 +- crates/bridge-exec/src/ws_client.rs | 66 +++++-------------- 3 files changed, 17 insertions(+), 54 deletions(-) diff --git a/bin/bridge-client/src/modes/operator/task_manager.rs b/bin/bridge-client/src/modes/operator/task_manager.rs index c0a4c69af..927b250eb 100644 --- a/bin/bridge-client/src/modes/operator/task_manager.rs +++ b/bin/bridge-client/src/modes/operator/task_manager.rs @@ -125,8 +125,7 @@ where let l2_rpc_client = self .exec_handler - .l2_rpc_client_pool - .get() + .get_ready_rpc_client() .await .map_err(|_| PollDutyError::WsPool)?; diff --git a/crates/bridge-exec/src/handler.rs b/crates/bridge-exec/src/handler.rs index 62e598929..3c5c425bf 100644 --- a/crates/bridge-exec/src/handler.rs +++ b/crates/bridge-exec/src/handler.rs @@ -323,7 +323,7 @@ where } /// Retrieves a ready-to-use RPC client from the client pool. - async fn get_ready_rpc_client(&self) -> Result, ExecError> { + pub async fn get_ready_rpc_client(&self) -> Result, ExecError> { self.l2_rpc_client_pool .get() .await diff --git a/crates/bridge-exec/src/ws_client.rs b/crates/bridge-exec/src/ws_client.rs index 4c930744f..92ff9dfdf 100644 --- a/crates/bridge-exec/src/ws_client.rs +++ b/crates/bridge-exec/src/ws_client.rs @@ -10,7 +10,7 @@ use jsonrpsee::{ client::{BatchResponse, ClientT}, params::BatchRequestBuilder, traits::ToRpcParams, - BoxError, ClientError, DeserializeOwned, + ClientError, DeserializeOwned, }, ws_client::{WsClient as WebsocketClient, WsClientBuilder}, }; @@ -35,24 +35,14 @@ pub struct WsClientManager { pub config: WsClientConfig, } -/// Represents the state of a WebSocket client. -/// -/// - `Working`: The client is connected and operational. -/// - `NotWorking`: The client is not connected or has encountered an error. -#[derive(Debug)] -pub enum WsClientState { - /// The WebSocket client is connected and operational. - Working(WebsocketClient), - /// The WebSocket client is not connected or is in a failed state. - NotWorking, -} - /// Wrapper for the WebSocket client state. /// /// This struct encapsulates the `WsClientState`, enabling unified management of /// both connected and failed client states. #[derive(Debug)] -pub struct WsClient(WsClientState); +pub struct WsClient { + inner: WebsocketClient, +} /// Implements the `Manager` trait for managing WebSocket clients. /// @@ -70,13 +60,9 @@ impl Manager for WsClientManager { async fn create(&self) -> Result { let client = WsClientBuilder::default() .build(self.config.url.clone()) - .await; + .await?; - let bl = match client { - Ok(cl) => WsClientState::Working(cl), - Err(_) => WsClientState::NotWorking, - }; - Ok(WsClient(bl)) + Ok(WsClient { inner: client }) } /// Recycles an existing WebSocket client. @@ -89,29 +75,16 @@ impl Manager for WsClientManager { obj: &mut Self::Type, _metrics: &managed::Metrics, ) -> RecycleResult { - match &obj.0 { - WsClientState::Working(cl) => { - if cl.is_connected() { - Ok(()) - } else { - Err(RecycleError::Message( - "Connection lost, recreate client".to_string().into(), - )) - } - } - WsClientState::NotWorking => Err(RecycleError::Message( - "Connection still not found, recreate client" - .to_string() - .into(), - )), + if obj.inner.is_connected() { + Ok(()) + } else { + Err(RecycleError::Message( + "Connection lost, recreate client".to_string().into(), + )) } } } -fn make_not_working_error() -> ClientError { - ClientError::Transport(BoxError::from("Client is Not Working".to_string())) -} - /// Implements the `ClientT` trait for `WsClient`. /// /// This implementation allows `WsClient` to perform JSON-RPC operations, @@ -125,10 +98,7 @@ impl ClientT for WsClient { where Params: ToRpcParams + Send, { - match &self.0 { - WsClientState::Working(inner) => inner.notification(method, params).await, - WsClientState::NotWorking => Err(make_not_working_error()), - } + self.inner.notification(method, params).await } /// Send a [method call request](https://www.jsonrpc.org/specification#request_object). @@ -139,10 +109,7 @@ impl ClientT for WsClient { R: DeserializeOwned, Params: ToRpcParams + Send, { - match &self.0 { - WsClientState::Working(inner) => inner.request(method, params).await, - WsClientState::NotWorking => Err(make_not_working_error()), - } + self.inner.request(method, params).await } /// Sends a batch request. @@ -153,9 +120,6 @@ impl ClientT for WsClient { where R: DeserializeOwned + fmt::Debug + 'a, { - match &self.0 { - WsClientState::Working(inner) => inner.batch_request(batch).await, - WsClientState::NotWorking => Err(make_not_working_error()), - } + self.inner.batch_request(batch).await } } From 3167b4f49fd741e0e8f951b2dd6e0aac940a9aca Mon Sep 17 00:00:00 2001 From: voidash Date: Sun, 5 Jan 2025 17:59:46 +0545 Subject: [PATCH 8/9] bridge-client: resolve comments --- bin/bridge-client/src/args.rs | 2 +- bin/bridge-client/src/constants.rs | 2 +- bin/bridge-client/src/errors.rs | 15 ++++++++++++--- bin/bridge-client/src/modes/operator/bootstrap.rs | 2 +- .../src/modes/operator/task_manager.rs | 15 +++++++-------- crates/bridge-exec/src/errors.rs | 2 +- crates/bridge-exec/src/handler.rs | 2 +- 7 files changed, 24 insertions(+), 16 deletions(-) diff --git a/bin/bridge-client/src/args.rs b/bin/bridge-client/src/args.rs index 1ba89298a..9bfd28b8b 100644 --- a/bin/bridge-client/src/args.rs +++ b/bin/bridge-client/src/args.rs @@ -103,7 +103,7 @@ pub(crate) struct Cli { /// Max retries for when rpc server fails during duty polling #[argh( option, - description = "max retries for when rpc server fails during duty polling" + description = "max retries for when RPC server fails during duty polling" )] pub max_rpc_retry_count: Option, } diff --git a/bin/bridge-client/src/constants.rs b/bin/bridge-client/src/constants.rs index 54f9e2a5b..aa24623ec 100644 --- a/bin/bridge-client/src/constants.rs +++ b/bin/bridge-client/src/constants.rs @@ -8,5 +8,5 @@ pub(super) const DEFAULT_DUTY_TIMEOUT_SEC: u64 = 600; /// The default bridge rocksdb database retry count, if not overridden by the user. pub(super) const ROCKSDB_RETRY_COUNT: u16 = 3; -/// The default Rpc retry count, if not overridden by the user. +/// The default RPC retry count, if not overridden by the user. pub(super) const MAX_RPC_RETRY_COUNT: u16 = 5; diff --git a/bin/bridge-client/src/errors.rs b/bin/bridge-client/src/errors.rs index 82fbea76b..ec1a2090d 100644 --- a/bin/bridge-client/src/errors.rs +++ b/bin/bridge-client/src/errors.rs @@ -8,9 +8,18 @@ pub enum InitError { #[derive(Debug, Clone, Error)] pub enum PollDutyError { - #[error("Rpc client: {0}")] - RpcError(String), + #[error("RPC client: {0}")] + Rpc(String), - #[error("fetching websocket client from pool failed")] + #[error("fetching WebSocket client from pool failed")] WsPool, } + +#[derive(Debug, Clone, Error)] +pub enum TaskManagerError { + #[error("Polling Duty Failed: {0}")] + Poll(#[from] PollDutyError), + + #[error("Maximum retries exceeded. Num retries {0}")] + MaxRetry(u16), +} diff --git a/bin/bridge-client/src/modes/operator/bootstrap.rs b/bin/bridge-client/src/modes/operator/bootstrap.rs index 25a4d57a2..b207ca96b 100644 --- a/bin/bridge-client/src/modes/operator/bootstrap.rs +++ b/bin/bridge-client/src/modes/operator/bootstrap.rs @@ -81,7 +81,7 @@ pub(crate) async fn bootstrap(args: Cli) -> anyhow::Result<()> { let l2_rpc_client = l2_rpc_client_pool .get() .await - .expect("cannot get rpc client from pool"); + .expect("cannot get RPC client from pool"); // Get the keypair after deriving the wallet xpriv. let operator_keys = resolve_xpriv(args.master_xpriv, args.master_xpriv_path)?; diff --git a/bin/bridge-client/src/modes/operator/task_manager.rs b/bin/bridge-client/src/modes/operator/task_manager.rs index 927b250eb..5f44aa695 100644 --- a/bin/bridge-client/src/modes/operator/task_manager.rs +++ b/bin/bridge-client/src/modes/operator/task_manager.rs @@ -4,7 +4,6 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; -use anyhow::bail; use bitcoin::Txid; use strata_bridge_exec::{ errors::{ExecError, ExecResult}, @@ -22,7 +21,7 @@ use tokio::{ }; use tracing::{error, info, trace, warn}; -use crate::errors::PollDutyError; +use crate::errors::{PollDutyError, TaskManagerError}; pub(super) struct TaskManager where @@ -45,7 +44,7 @@ where duty_polling_interval: Duration, duty_timeout_duration: Duration, max_retries: u16, - ) -> anyhow::Result<()> { + ) -> Result<(), TaskManagerError> { info!(?duty_polling_interval, "Starting to poll for duties"); let mut retries = 0; loop { @@ -91,13 +90,13 @@ where } Err(err) => { match err { - PollDutyError::RpcError(err) => { - error!(%err, "could not get rpc response"); + PollDutyError::Rpc(err) => { + error!(%err, "could not get RPC response"); retries += 1; if retries >= max_retries { error!(%err, "Exceeded maximum retries to acquire client. Failing gracefully"); - bail!("Exceeded maximum retries to acquire client") + return Err(TaskManagerError::MaxRetry(max_retries)); } // Exponential backoff @@ -105,7 +104,7 @@ where sleep(delay).await; } _ => { - bail!(err.to_string()); + return Err(TaskManagerError::Poll(err)); } } } @@ -136,7 +135,7 @@ where } = l2_rpc_client .get_bridge_duties(self.exec_handler.own_index, start_index) .await - .map_err(|err| PollDutyError::RpcError(err.to_string()))?; + .map_err(|err| PollDutyError::Rpc(err.to_string()))?; // check which duties this operator should do something let mut todo_duties: Vec = Vec::with_capacity(duties.len()); diff --git a/crates/bridge-exec/src/errors.rs b/crates/bridge-exec/src/errors.rs index 657282d13..531b152bf 100644 --- a/crates/bridge-exec/src/errors.rs +++ b/crates/bridge-exec/src/errors.rs @@ -48,7 +48,7 @@ pub enum ExecError { #[error("bitcoin signer do not have access to the private keys, i.e. xpriv")] Xpriv, - /// Error getting the web socket client from pool + /// Error getting the WebSocket client from pool #[error("fetching websocket client from pool failed")] WsPool, } diff --git a/crates/bridge-exec/src/handler.rs b/crates/bridge-exec/src/handler.rs index 3c5c425bf..47b972b14 100644 --- a/crates/bridge-exec/src/handler.rs +++ b/crates/bridge-exec/src/handler.rs @@ -25,7 +25,7 @@ use crate::{ ws_client::WsClientManager, }; -/// Websocket client pool +/// WebSocket client pool pub type WsClientPool = Pool; /// The execution context for handling bridge-related signing activities. From 92849d056c80f9de747cdcee7cc84da4ae26e838 Mon Sep 17 00:00:00 2001 From: voidash Date: Sun, 5 Jan 2025 20:15:00 +0545 Subject: [PATCH 9/9] func-test: use the BridgeTestBase for unreliableSequencer, and resolve comments --- bin/bridge-client/src/errors.rs | 3 - .../src/modes/operator/task_manager.rs | 27 ++-- bin/datatool/Cargo.toml | 2 +- crates/bridge-exec/src/ws_client.rs | 27 ++-- .../fn_bridge_deposit_seq_unreliable.py | 138 ++++-------------- functional-tests/testenv.py | 14 +- 6 files changed, 63 insertions(+), 148 deletions(-) diff --git a/bin/bridge-client/src/errors.rs b/bin/bridge-client/src/errors.rs index ec1a2090d..b64aafd28 100644 --- a/bin/bridge-client/src/errors.rs +++ b/bin/bridge-client/src/errors.rs @@ -17,9 +17,6 @@ pub enum PollDutyError { #[derive(Debug, Clone, Error)] pub enum TaskManagerError { - #[error("Polling Duty Failed: {0}")] - Poll(#[from] PollDutyError), - #[error("Maximum retries exceeded. Num retries {0}")] MaxRetry(u16), } diff --git a/bin/bridge-client/src/modes/operator/task_manager.rs b/bin/bridge-client/src/modes/operator/task_manager.rs index 5f44aa695..f7713008f 100644 --- a/bin/bridge-client/src/modes/operator/task_manager.rs +++ b/bin/bridge-client/src/modes/operator/task_manager.rs @@ -89,24 +89,25 @@ where } } Err(err) => { - match err { + match &err { PollDutyError::Rpc(err) => { error!(%err, "could not get RPC response"); - retries += 1; - if retries >= max_retries { - error!(%err, "Exceeded maximum retries to acquire client. Failing gracefully"); - - return Err(TaskManagerError::MaxRetry(max_retries)); - } - - // Exponential backoff - let delay = Duration::from_secs(1) * 2u32.pow(retries as u32 - 1); - sleep(delay).await; } - _ => { - return Err(TaskManagerError::Poll(err)); + PollDutyError::WsPool => { + error!(%err, "cannot get workable RPC WebSocket client "); } } + + retries += 1; + if retries >= max_retries { + error!(%err, "Exceeded maximum retries to acquire client. Failing gracefully"); + + return Err(TaskManagerError::MaxRetry(max_retries)); + } + + // Exponential backoff + let delay = Duration::from_secs(1) * 2u32.pow(retries as u32 - 1); + sleep(delay).await; } } sleep(duty_polling_interval).await; diff --git a/bin/datatool/Cargo.toml b/bin/datatool/Cargo.toml index 2a6e605c3..2d402852a 100644 --- a/bin/datatool/Cargo.toml +++ b/bin/datatool/Cargo.toml @@ -27,5 +27,5 @@ zeroize.workspace = true [features] default = [] -sp1 = ["strata-sp1-guest-builder"] risc0 = ["strata-risc0-guest-builder", "bytemuck"] +sp1 = ["strata-sp1-guest-builder"] diff --git a/crates/bridge-exec/src/ws_client.rs b/crates/bridge-exec/src/ws_client.rs index 92ff9dfdf..62f65c5c0 100644 --- a/crates/bridge-exec/src/ws_client.rs +++ b/crates/bridge-exec/src/ws_client.rs @@ -1,5 +1,5 @@ -//! Wrapper for managing a WebSocket client that allows recycling or restarting -//! the client without restarting the entire program. +//! Wrapper for managing a WebSocket client that supports connection recycling and client +//! restarting. use core::fmt; @@ -17,7 +17,7 @@ use jsonrpsee::{ /// Configuration for the WebSocket client. /// -/// settings that are necessary to initialize and configure +/// Settings that are necessary to initialize and configure /// the WebSocket client, ie. URL of the WebSocket server. #[derive(Clone, Debug)] pub struct WsClientConfig { @@ -27,7 +27,7 @@ pub struct WsClientConfig { /// Manager for creating and recycling WebSocket clients. /// -/// This struct manages the lifecycle of WebSocket clients, including creating +/// Manages the lifecycle of WebSocket clients, including creating /// new clients and determining whether existing clients are still valid. #[derive(Clone, Debug)] pub struct WsClientManager { @@ -35,18 +35,16 @@ pub struct WsClientManager { pub config: WsClientConfig, } -/// Wrapper for the WebSocket client state. -/// -/// This struct encapsulates the `WsClientState`, enabling unified management of -/// both connected and failed client states. +/// Wrapper for the WebSocket client. #[derive(Debug)] pub struct WsClient { + /// WebSocket client inner: WebsocketClient, } -/// Implements the `Manager` trait for managing WebSocket clients. +/// Implements the [`Manager`] trait for managing WebSocket clients. /// -/// The `WsClientManager` provides methods to create new clients and recycle +/// The [`WsClientManager`] provides methods to create new clients and recycle /// existing ones, ensuring that clients remain in a valid state. impl Manager for WsClientManager { type Type = WsClient; @@ -55,8 +53,7 @@ impl Manager for WsClientManager { /// Creates a new WebSocket client. /// /// Attempts to initialize a new WebSocket client using the provided configuration. - /// Returns a `WsClient` wrapped in a `Working` or `NotWorking` state depending on - /// the success of the operation. + /// Returns a [`WsClient`] async fn create(&self) -> Result { let client = WsClientBuilder::default() .build(self.config.url.clone()) @@ -65,7 +62,7 @@ impl Manager for WsClientManager { Ok(WsClient { inner: client }) } - /// Recycles an existing WebSocket client. + /// Recycles an existing [`WsClient`] /// /// Checks whether the provided client is still valid. If the client is connected, /// it is retained. Otherwise, an error is returned, indicating the need to recreate @@ -85,9 +82,9 @@ impl Manager for WsClientManager { } } -/// Implements the `ClientT` trait for `WsClient`. +/// Implements the [`ClientT`] trait for [`WsClient`]. /// -/// This implementation allows `WsClient` to perform JSON-RPC operations, +/// This implementation allows `[WsClient`] to perform JSON-RPC operations, /// including notifications, method calls, and batch requests. #[async_trait] impl ClientT for WsClient { diff --git a/functional-tests/fn_bridge_deposit_seq_unreliable.py b/functional-tests/fn_bridge_deposit_seq_unreliable.py index 3706dc175..4569c4033 100644 --- a/functional-tests/fn_bridge_deposit_seq_unreliable.py +++ b/functional-tests/fn_bridge_deposit_seq_unreliable.py @@ -1,42 +1,17 @@ -import time - import flexitest -from bitcoinlib.services.bitcoind import BitcoindClient -from strata_utils import ( - deposit_request_transaction, - get_balance, -) -from web3 import Web3 -from web3.middleware import SignAndSendRawMiddlewareBuilder import testenv -from constants import ( - DEFAULT_ROLLUP_PARAMS, - SATS_TO_WEI, -) +from constants import SATS_TO_WEI +from rollup_params_cfg import RollupConfig from utils import get_bridge_pubkey, wait_until, wait_until_with_value -# Local constants -# D BTC -DEPOSIT_AMOUNT = DEFAULT_ROLLUP_PARAMS["deposit_amount"] -# Gas for the withdrawal transaction -WITHDRAWAL_GAS_FEE = 22_000 # technically is 21_000 -# Ethereum Private Key -# NOTE: don't use this private key in production -ETH_PRIVATE_KEY = "0x0000000000000000000000000000000000000000000000000000000000000001" -# BTC Operator's fee for withdrawal -OPERATOR_FEE = DEFAULT_ROLLUP_PARAMS["operator_fee"] - @flexitest.register -class BridgeDepositSequencerUnreliableTest(testenv.StrataTester): +class BridgeDepositSequencerUnreliableTest(testenv.BridgeTestBase): """ - TODO: Depends on STR-734 operator reassignment, and this can be merged only that is merged - - Makes two DRT deposits to the same EL address, then makes a withdrawal to a change address. - - Checks if the balance of the EL address is expected - and if the BTC balance of the change address is expected. + Makes two DRT deposits to the same EL address + After the first DRT is processed and EL address has balance,the sequencer is + restarted . After restarting check if EL address has required funds """ def __init__(self, ctx: flexitest.InitContext): @@ -45,105 +20,52 @@ def __init__(self, ctx: flexitest.InitContext): def main(self, ctx: flexitest.RunContext): address = ctx.env.gen_ext_btc_address() withdraw_address = ctx.env.gen_ext_btc_address() + el_address = self.eth_account.address + bridge_pk = get_bridge_pubkey(self.seqrpc) self.debug(f"Address: {address}") self.debug(f"Change Address: {withdraw_address}") - - btc = ctx.get_service("bitcoin") - seq = ctx.get_service("sequencer") - reth = ctx.get_service("reth") - - seqrpc = seq.create_rpc() - btcrpc: BitcoindClient = btc.create_rpc() - rethrpc = reth.create_rpc() - - seq_addr = seq.get_prop("address") - self.debug(f"Sequencer Address: {seq_addr}") - - btc_url = btcrpc.base_url - btc_user = btc.props["rpc_user"] - btc_password = btc.props["rpc_password"] - - self.debug(f"BTC URL: {btc_url}") - self.debug(f"BTC user: {btc_user}") - self.debug(f"BTC password: {btc_password}") - - # Get the original balance of the withdraw address - original_balance = get_balance(withdraw_address, btc_url, btc_user, btc_password) - self.debug(f"BTC balance before withdraw: {original_balance}") - - web3: Web3 = reth.create_web3() - # Create an Ethereum account from the private key - eth_account = web3.eth.account.from_key(ETH_PRIVATE_KEY) - el_address = eth_account.address self.debug(f"EL address: {el_address}") - - # Add the Ethereum account as auto-signer - # Transactions from `el_address` will then be signed, under the hood, in the middleware - web3.middleware_onion.inject(SignAndSendRawMiddlewareBuilder.build(eth_account), layer=0) - - # Get the balance of the EL address before the deposits - balance = int(rethrpc.eth_getBalance(el_address), 16) - self.debug(f"Strata Balance before deposits: {balance}") - assert balance == 0, "Strata balance is not expected" - - # Get operators pubkey and musig2 aggregates it - bridge_pk = get_bridge_pubkey(seqrpc) self.debug(f"Bridge pubkey: {bridge_pk}") - self.debug("Stopping the sequencer") - self.make_drt(ctx, el_address, bridge_pk) - time.sleep(2) + cfg: RollupConfig = ctx.env.rollup_cfg() + # deposit + self.deposit(ctx, el_address, bridge_pk) # stop sequencer - seq.stop() - time.sleep(1) + self.seq.stop() + + # wait until sequencer stops + wait_until(lambda: check_sequencer_down(self.seqrpc)) + self.debug("Making DRT") + # make deposit request transaction self.make_drt(ctx, el_address, bridge_pk) # start again - seq.start() + self.seq.start() wait_until( - lambda: seqrpc.strata_protocolVersion() is not None, + lambda: not check_sequencer_down(self.seqrpc), error_with="Sequencer did not start on time", timeout=10, ) balance_after_deposits = wait_until_with_value( - lambda: int(rethrpc.eth_getBalance(el_address), 16), - predicate=lambda v: v == 2 * DEPOSIT_AMOUNT * SATS_TO_WEI, + lambda: int(self.rethrpc.eth_getBalance(el_address), 16), + predicate=lambda v: v == 2 * cfg.deposit_amount * SATS_TO_WEI, timeout=15, ) self.debug(f"Strata Balance after deposits: {balance_after_deposits}") return True - def make_drt(self, ctx: flexitest.RunContext, el_address, musig_bridge_pk): - """ - Deposit Request Transaction - """ - # Get relevant data - btc = ctx.get_service("bitcoin") - seq = ctx.get_service("sequencer") - btcrpc: BitcoindClient = btc.create_rpc() - btc_url = btcrpc.base_url - btc_user = btc.props["rpc_user"] - btc_password = btc.props["rpc_password"] - seq_addr = seq.get_prop("address") - # Create the deposit request transaction - tx = bytes( - deposit_request_transaction( - el_address, musig_bridge_pk, btc_url, btc_user, btc_password - ) - ).hex() - self.debug(f"Deposit request tx: {tx}") - - # Send the transaction to the Bitcoin network - txid = btcrpc.proxy.sendrawtransaction(tx) - self.debug(f"sent deposit request with txid = {txid} for address {el_address}") - # time to mature DRT - btcrpc.proxy.generatetoaddress(6, seq_addr) - - # time to mature DT - btcrpc.proxy.generatetoaddress(6, seq_addr) +def check_sequencer_down(seqrpc): + """ + Returns True if sequencer RPC is down + """ + try: + seqrpc.strata_protocolVersion() + return False + except RuntimeError: + return True diff --git a/functional-tests/testenv.py b/functional-tests/testenv.py index a07f4ee63..bafc3a21b 100644 --- a/functional-tests/testenv.py +++ b/functional-tests/testenv.py @@ -97,16 +97,14 @@ def deposit(self, ctx: flexitest.RunContext, el_address, bridge_pk): initial_balance = int(self.rethrpc.eth_getBalance(el_address), 16) self.debug(f"Strata Balance right before deposit calls: {initial_balance}") - self.__make_drt(ctx, el_address, bridge_pk) + self.make_drt(ctx, el_address, bridge_pk) # Wait until the deposit is seen on L2 expected_balance = initial_balance + deposit_amount * SATS_TO_WEI - wait_until(lambda: int(self.rethrpc.eth_getBalance(el_address), 16) == expected_balance) - - # Final assertion - final_balance = int(self.rethrpc.eth_getBalance(el_address), 16) - self.debug(f"Strata Balance after deposits: {final_balance}") - assert final_balance == expected_balance, "Strata balance after deposit is not as expected" + wait_until( + lambda: int(self.rethrpc.eth_getBalance(el_address), 16) == expected_balance, + error_with="Strata balance after deposit is not as expected", + ) def withdraw( self, @@ -192,7 +190,7 @@ def __estimate_withdraw_gas(self, deposit_amount, el_address, change_address_pk) } return self.web3.eth.estimate_gas(transaction) - def __make_drt(self, ctx: flexitest.RunContext, el_address, musig_bridge_pk): + def make_drt(self, ctx: flexitest.RunContext, el_address, musig_bridge_pk): """ Deposit Request Transaction """