Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STR-734: OPERATOR REASSIGNMENT #557

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions bin/bridge-client/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Parses command-line arguments for the bridge-client CLI.

use std::fmt::Display;

use argh::FromArgs;
Expand All @@ -9,73 +10,104 @@ use crate::errors::InitError;
// Every tunable below is an `Option` so that the default named in its description can be
// substituted by the caller when the flag is omitted.
#[argh(name = "strata-bridge-client")]
#[argh(description = "The bridge client for Strata")]
pub(crate) struct Cli {
    /// Specifies the mode to run the client in: either Operator (alias: op) or Challenger (alias:
    /// ch).
    #[argh(
        positional,
        description = "what mode to run the client in, either Operator (alias: op) or Challenger (alias: ch)"
    )]
    pub mode: String,

    /// Path to the directory where RocksDB databases are stored.
    /// Defaults to `$HOME/.local/share/strata/` if not specified.
    #[argh(
        option,
        description = "path to the directory where to store the rocksdb databases (default: $HOME/.local/share/strata/)"
    )]
    pub datadir: Option<String>,

    /// Master operator key in xpriv format. Defaults to the environment variable
    /// `STRATA_OP_MASTER_XPRIV` if not provided.
    #[argh(
        option,
        description = "xpriv to be used as the master operator's key (default: envvar STRATA_OP_MASTER_XPRIV)"
    )]
    pub master_xpriv: Option<String>,

    /// Path to the file containing the master operator's xpriv.
    /// Should not be used with the `--master-xpriv` option or `STRATA_OP_MASTER_XPRIV` environment
    /// variable.
    #[argh(
        option,
        description = "path to the file containing the master operator's xpriv (don't use with --master-xpriv or the envvar STRATA_OP_MASTER_XPRIV)"
    )]
    pub master_xpriv_path: Option<String>,

    /// Host address for the RPC server. Defaults to `127.0.0.1` if not specified.
    #[argh(
        option,
        description = "host to run the rpc server on (default: 127.0.0.1)"
    )]
    pub rpc_host: Option<String>,

    /// Port number for the RPC server. Defaults to `4781` if not specified.
    #[argh(option, description = "port to run the rpc server on (default: 4781)")]
    pub rpc_port: Option<u32>,

    /// URL for the Bitcoin RPC endpoint.
    #[argh(option, description = "url for the bitcoin RPC")]
    pub btc_url: String,

    /// Username for accessing the Bitcoin RPC.
    #[argh(option, description = "username for bitcoin RPC")]
    pub btc_user: String,

    /// Password for accessing the Bitcoin RPC.
    #[argh(option, description = "password for bitcoin RPC")]
    pub btc_pass: String,

    /// URL for the Rollup RPC server.
    #[argh(option, description = "url for the rollup RPC server")]
    pub rollup_url: String,

    /// Interval for polling bridge duties in milliseconds.
    /// Defaults to the block time according to the Rollup RPC.
    #[argh(
        option,
        description = "bridge duty polling interval in milliseconds (default: block time according to rollup RPC)"
    )]
    pub duty_interval: Option<u64>,

    /// Interval for polling bridge messages in milliseconds.
    /// Defaults to half the block time according to the Rollup RPC.
    #[argh(
        option,
        description = "bridge message polling interval in milliseconds (default: half of the block time according to the rollup RPC client)"
    )]
    pub message_interval: Option<u64>,

    /// Number of retries for RocksDB database operations. Defaults to `3`.
    #[argh(
        option,
        description = "retry count for the rocksdb database (default: 3)"
    )]
    pub retry_count: Option<u16>,

    /// Timeout duration for duties in seconds. Defaults to `600`.
    #[argh(
        option,
        description = "duty timeout duration in seconds (default: 600)"
    )]
    pub duty_timeout_duration: Option<u64>,
}

/// Represents the operational mode of the client.
///
/// - `Operator`: Handles deposits, withdrawals, and challenges.
/// - `Challenger`: Verifies and challenges Operator claims.
#[derive(Debug, Clone)]
pub(super) enum OperationMode {
/// Run client in Operator mode to handle deposits, withdrawals, and challenging.
Operator,

/// Run client in Challenger mode to verify/challenge Operator claims.
Expand Down
1 change: 1 addition & 0 deletions bin/bridge-client/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ pub(super) const DEFAULT_RPC_PORT: u32 = 4781; // first 4 digits in the sha256 o

/// The default host the RPC server runs on, if `--rpc-host` is not given.
pub(super) const DEFAULT_RPC_HOST: &str = "127.0.0.1";

/// The default duty timeout in seconds, if `--duty-timeout-duration` is not given.
pub(super) const DEFAULT_DUTY_TIMEOUT_SEC: u64 = 600;
/// The default bridge rocksdb database retry count, if not overridden by the user.
pub(super) const ROCKSDB_RETRY_COUNT: u16 = 3;
14 changes: 12 additions & 2 deletions bin/bridge-client/src/modes/operator/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
use super::{constants::DB_THREAD_COUNT, task_manager::TaskManager};
use crate::{
args::Cli,
constants::{DEFAULT_RPC_HOST, DEFAULT_RPC_PORT, ROCKSDB_RETRY_COUNT},
constants::{
DEFAULT_DUTY_TIMEOUT_SEC, DEFAULT_RPC_HOST, DEFAULT_RPC_PORT, ROCKSDB_RETRY_COUNT,
},
db::open_rocksdb_database,
rpc_server::{self, BridgeRpc},
xpriv::resolve_xpriv,
Expand Down Expand Up @@ -148,9 +150,17 @@
Duration::from_millis,
);

let duty_timeout_duration = Duration::from_secs(
args.duty_timeout_duration
.unwrap_or(DEFAULT_DUTY_TIMEOUT_SEC),
);

Check warning on line 157 in bin/bridge-client/src/modes/operator/bootstrap.rs

View check run for this annotation

Codecov / codecov/patch

bin/bridge-client/src/modes/operator/bootstrap.rs#L153-L157

Added lines #L153 - L157 were not covered by tests
// TODO: wrap these in `strata-tasks`
let duty_task = tokio::spawn(async move {
if let Err(e) = task_manager.start(duty_polling_interval).await {
if let Err(e) = task_manager
.start(duty_polling_interval, duty_timeout_duration)
.await

Check warning on line 162 in bin/bridge-client/src/modes/operator/bootstrap.rs

View check run for this annotation

Codecov / codecov/patch

bin/bridge-client/src/modes/operator/bootstrap.rs#L160-L162

Added lines #L160 - L162 were not covered by tests
{
error!(error = %e, "could not start task manager");
};
});
Expand Down
46 changes: 29 additions & 17 deletions bin/bridge-client/src/modes/operator/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use strata_rpc_types::RpcBridgeDuties;
use strata_state::bridge_duties::{BridgeDuty, BridgeDutyStatus};
use strata_storage::ops::{bridge_duty::BridgeDutyOps, bridge_duty_index::BridgeDutyIndexOps};
use tokio::{task::JoinSet, time::sleep};
use tokio::{
task::JoinSet,
time::{sleep, timeout},
};
use tracing::{error, info, trace, warn};

pub(super) struct TaskManager<L2Client, TxBuildContext, Bcast>
Expand All @@ -36,14 +39,21 @@
TxBuildContext: BuildContext + Sync + Send + 'static,
Bcast: Broadcaster + Sync + Send + 'static,
{
pub(super) async fn start(&self, duty_polling_interval: Duration) -> anyhow::Result<()> {
pub(super) async fn start(
&self,
duty_polling_interval: Duration,
duty_timeout_duration: Duration,
) -> anyhow::Result<()> {
info!(?duty_polling_interval, "Starting to poll for duties");

Check warning on line 47 in bin/bridge-client/src/modes/operator/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/bridge-client/src/modes/operator/task_manager.rs#L42-L47

Added lines #L42 - L47 were not covered by tests
loop {
let RpcBridgeDuties {
duties,
start_index,
stop_index,
} = self.poll_duties().await?;

info!(num_duties = duties.len(), "got duties");

Check warning on line 55 in bin/bridge-client/src/modes/operator/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/bridge-client/src/modes/operator/task_manager.rs#L55

Added line #L55 was not covered by tests

let mut handles = JoinSet::new();
for duty in duties {
let exec_handler = self.exec_handler.clone();
Expand All @@ -54,21 +64,23 @@
});
}

let any_failed = handles.join_all().await.iter().any(|res| res.is_err());

// if none of the duties failed, update the duty index so that the
// next batch is fetched in the next poll.
//
// otherwise, don't update the index so that the current batch is refetched and
// ones that were not executed successfully are executed again.
if !any_failed {
info!(%start_index, %stop_index, "updating duty index");
if let Err(e) = self
.bridge_duty_idx_db_ops
.set_index_async(stop_index)
.await
{
error!(error = %e, %start_index, %stop_index, "could not update duty index");
// TODO: use a per-duty timeout instead of a single timeout shared by the whole
// batch of duties.
if let Ok(any_failed) = timeout(duty_timeout_duration, handles.join_all()).await {

Check warning on line 69 in bin/bridge-client/src/modes/operator/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/bridge-client/src/modes/operator/task_manager.rs#L69

Added line #L69 was not covered by tests
// if none of the duties failed, update the duty index so that the
// next batch is fetched in the next poll.
//
// otherwise, don't update the index so that the current batch is refetched and
// ones that were not executed successfully are executed again.
if !any_failed.iter().any(|res| res.is_err()) {
info!(%start_index, %stop_index, "updating duty index");
if let Err(e) = self
.bridge_duty_idx_db_ops
.set_index_async(stop_index)
.await

Check warning on line 80 in bin/bridge-client/src/modes/operator/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/bridge-client/src/modes/operator/task_manager.rs#L75-L80

Added lines #L75 - L80 were not covered by tests
{
error!(error = %e, %start_index, %stop_index, "could not update duty index");
}

Check warning on line 83 in bin/bridge-client/src/modes/operator/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/bridge-client/src/modes/operator/task_manager.rs#L82-L83

Added lines #L82 - L83 were not covered by tests
}
}

Expand Down
13 changes: 8 additions & 5 deletions crates/bridge-exec/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,16 @@
.filter_map(move |msg| {
let msg = borsh::from_slice::<BridgeMessage>(&msg.0);
if let Ok(msg) = msg {
let payload = msg.payload();
let parsed_payload = borsh::from_slice::<Payload>(payload);
let raw_payload = msg.payload();
let payload = borsh::from_slice::<Payload>(raw_payload);
let raw_scope = msg.scope();
let scope = borsh::from_slice::<Scope>(raw_scope);
debug!(?msg, ?payload, ?scope, "got message from the L2 Client");

Check warning on line 302 in crates/bridge-exec/src/handler.rs

View check run for this annotation

Codecov / codecov/patch

crates/bridge-exec/src/handler.rs#L298-L302

Added lines #L298 - L302 were not covered by tests

match parsed_payload {
match payload {

Check warning on line 304 in crates/bridge-exec/src/handler.rs

View check run for this annotation

Codecov / codecov/patch

crates/bridge-exec/src/handler.rs#L304

Added line #L304 was not covered by tests
Ok(payload) => Some((msg.source_id(), payload)),
Err(err) => {
warn!(?scope, ?payload, error=?err, "skipping faulty message payload");
Err(ref error) => {
warn!(?scope, ?payload, ?error, "skipping faulty message payload");

Check warning on line 307 in crates/bridge-exec/src/handler.rs

View check run for this annotation

Codecov / codecov/patch

crates/bridge-exec/src/handler.rs#L306-L307

Added lines #L306 - L307 were not covered by tests
None
}
}
Expand Down
9 changes: 6 additions & 3 deletions crates/chaintsn/src/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,13 @@
// Check if the deposit is past the threshold.
if cur_block_height >= dstate.exec_deadline() {
// Pick the next assignee, if there are any.
let new_op_pos = if num_operators > 0 {
let op_off = rng.next_u32() % (num_operators - 1);
(dstate.assignee() + op_off) % num_operators
let new_op_pos = if num_operators > 1 {

Check warning on line 300 in crates/chaintsn/src/transition.rs

View check run for this annotation

Codecov / codecov/patch

crates/chaintsn/src/transition.rs#L300

Added line #L300 was not covered by tests
// Compute a random offset from 1 to (num_operators - 1),
// ensuring we pick a different operator than the current one.
let offset = 1 + (rng.next_u32() % (num_operators - 1));
(dstate.assignee() + offset) % num_operators

Check warning on line 304 in crates/chaintsn/src/transition.rs

View check run for this annotation

Codecov / codecov/patch

crates/chaintsn/src/transition.rs#L303-L304

Added lines #L303 - L304 were not covered by tests
} else {
// If there is only a single operator, we remain with the current assignee.
dstate.assignee()
};

Expand Down
2 changes: 2 additions & 0 deletions functional-tests/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def create_operator(
bitcoind_config: dict,
ctx: flexitest.EnvContext,
message_interval: int,
duty_timeout_duration: int,
):
idx = self.next_idx()
name = f"bridge.{idx}"
Expand All @@ -343,6 +344,7 @@ def create_operator(
"--btc-pass", bitcoind_config["bitcoind_pass"],
"--rollup-url", node_url,
"--message-interval", str(message_interval),
"--duty-timeout-duration", str(duty_timeout_duration),
]
# fmt: on

Expand Down
110 changes: 110 additions & 0 deletions functional-tests/fn_bridge_reassignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import time

import flexitest
from strata_utils import get_balance

import testenv
from constants import UNSPENDABLE_ADDRESS
from rollup_params_cfg import RollupConfig
from utils import get_bridge_pubkey, wait_until, wait_until_with_value


@flexitest.register
class BridgeWithdrawReassignmentTest(testenv.BridgeTestBase):
    """
    Makes two DRT deposits, then triggers the withdrawal.
    The bridge client associated with the assigned operator id is stopped.
    After the dispatch assignment duration is over,
    checks that a new operator has been assigned and
    ensures that the withdrawal resumes and completes successfully.
    """

    def __init__(self, ctx: flexitest.InitContext):
        ctx.set_env(
            testenv.BasicEnvConfig(
                101, n_operators=3, pre_fund_addrs=True, duty_timeout_duration=10
            )
        )

    def _assigned_withdrawal_operator_idx(self):
        """
        Returns the operator index assigned to the first `FulfillWithdrawal` duty
        reported by the sequencer RPC.
        """
        duties = self.seqrpc.strata_getBridgeDuties(0, 0)["duties"]
        withdraw_duties = [d for d in duties if d["type"] == "FulfillWithdrawal"]
        # Fail with a readable message instead of a bare IndexError.
        assert withdraw_duties, "No FulfillWithdrawal duty found"
        return withdraw_duties[0]["payload"]["assigned_operator_idx"]

    def main(self, ctx: flexitest.RunContext):
        address = ctx.env.gen_ext_btc_address()
        withdraw_address = ctx.env.gen_ext_btc_address()
        el_address = self.eth_account.address
        self.debug(f"Address: {address}")
        self.debug(f"Change Address: {withdraw_address}")
        self.debug(f"EL Address: {el_address}")

        cfg: RollupConfig = ctx.env.rollup_cfg()
        # D BTC
        deposit_amount = cfg.deposit_amount
        # BTC Operator's fee for withdrawal
        operator_fee = cfg.operator_fee
        # BTC extra fee for withdrawal
        withdraw_extra_fee = cfg.withdraw_extra_fee
        # dispatch assignment duration for reassignment
        dispatch_assignment_duration = cfg.dispatch_assignment_dur

        btc_url = self.btcrpc.base_url
        btc_user = self.btc.get_prop("rpc_user")
        btc_password = self.btc.get_prop("rpc_password")
        bridge_pk = get_bridge_pubkey(self.seqrpc)
        self.debug(f"Bridge pubkey: {bridge_pk}")

        original_balance = get_balance(withdraw_address, btc_url, btc_user, btc_password)
        self.debug(f"BTC balance before withdraw: {original_balance}")

        # Check initial balance is 0
        balance = int(self.rethrpc.eth_getBalance(el_address), 16)
        assert balance == 0, "Strata balance is not expected (should be zero initially)"

        # Perform two deposits
        self.deposit(ctx, el_address, bridge_pk)
        self.deposit(ctx, el_address, bridge_pk)

        # withdraw
        self.withdraw(ctx, el_address, withdraw_address)

        new_balance = get_balance(withdraw_address, btc_url, btc_user, btc_password)
        self.debug(f"BTC balance after withdraw: {new_balance}")

        # Check assigned operator
        assigned_op_idx = self._assigned_withdrawal_operator_idx()
        assigned_operator = ctx.get_service(f"bridge.{assigned_op_idx}")
        self.debug(f"Assigned operator index: {assigned_op_idx}")

        # Stop assigned operator
        self.debug("Stopping assigned operator ...")
        assigned_operator.stop()

        # Let enough blocks pass so the assignment times out
        self.btcrpc.proxy.generatetoaddress(dispatch_assignment_duration, UNSPENDABLE_ADDRESS)
        time.sleep(3)

        # Re-check duties
        new_assigned_op_idx = self._assigned_withdrawal_operator_idx()
        self.debug(f"new assigned operator is {new_assigned_op_idx}")

        # Ensure a new operator is assigned. Compare the operator indices reported by
        # the RPC rather than service handles, whose equality semantics are not defined
        # by this test.
        assert new_assigned_op_idx != assigned_op_idx, "No new operator was assigned"
        assigned_operator.start()
        bridge_rpc = assigned_operator.create_rpc()

        wait_until(lambda: bridge_rpc.stratabridge_uptime() is not None, timeout=10)

        # generate l1 blocks equivalent to dispatch assignment duration
        self.btcrpc.proxy.generatetoaddress(dispatch_assignment_duration, UNSPENDABLE_ADDRESS)

        difference = deposit_amount - operator_fee - withdraw_extra_fee
        new_balance = wait_until_with_value(
            lambda: get_balance(withdraw_address, btc_url, btc_user, btc_password),
            predicate=lambda v: v == original_balance + difference,
            timeout=20,
        )

        self.debug(f"BTC balance after stopping and starting again: {new_balance}")
        return True
Loading
Loading