Skip to content

Commit

Permalink
workers: task-driver: emit events from tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
akirillo committed Dec 10, 2024
1 parent 18a7987 commit 64eff84
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 17 deletions.
3 changes: 2 additions & 1 deletion core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn main() -> Result<(), CoordinatorError> {
let (proof_generation_worker_sender, proof_generation_worker_receiver) =
new_proof_manager_queue();
let (task_sender, task_receiver) = new_task_driver_queue();
let (_event_manager_sender, event_manager_receiver) = new_event_manager_queue();
let (event_manager_sender, event_manager_receiver) = new_event_manager_queue();

// Construct a global state
let (state_failure_send, mut state_failure_recv) = new_worker_failure_channel();
Expand Down Expand Up @@ -181,6 +181,7 @@ async fn main() -> Result<(), CoordinatorError> {
arbitrum_client.clone(),
network_sender.clone(),
proof_generation_worker_sender.clone(),
event_manager_sender,
system_bus.clone(),
global_state.clone(),
);
Expand Down
7 changes: 7 additions & 0 deletions mock-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use futures::Future;
use gossip_server::{server::GossipServer, worker::GossipServerConfig};
use handshake_manager::{manager::HandshakeManager, worker::HandshakeManagerConfig};
use job_types::{
event_manager::{new_event_manager_queue, EventManagerQueue, EventManagerReceiver},
gossip_server::{
new_gossip_server_queue, GossipServerJob, GossipServerQueue, GossipServerReceiver,
},
Expand Down Expand Up @@ -117,6 +118,8 @@ pub struct MockNodeController {
price_queue: (PriceReporterQueue, DefaultOption<PriceReporterReceiver>),
/// The proof generation queue
proof_queue: (ProofManagerQueue, DefaultOption<ProofManagerReceiver>),
/// The event manager queue
event_queue: (EventManagerQueue, DefaultOption<EventManagerReceiver>),
/// The task manager queue
task_queue: (TaskDriverQueue, DefaultOption<TaskDriverReceiver>),
}
Expand All @@ -132,6 +135,7 @@ impl MockNodeController {
let (handshake_send, handshake_recv) = new_handshake_manager_queue();
let (price_sender, price_recv) = new_price_reporter_queue();
let (proof_gen_sender, proof_gen_recv) = new_proof_manager_queue();
let (event_sender, event_recv) = new_event_manager_queue();
let (task_sender, task_recv) = new_task_driver_queue();

Self {
Expand All @@ -146,6 +150,7 @@ impl MockNodeController {
handshake_queue: (handshake_send, default_option(handshake_recv)),
price_queue: (price_sender, default_option(price_recv)),
proof_queue: (proof_gen_sender, default_option(proof_gen_recv)),
event_queue: (event_sender, default_option(event_recv)),
task_queue: (task_sender, default_option(task_recv)),
}
}
Expand Down Expand Up @@ -294,6 +299,7 @@ impl MockNodeController {
self.arbitrum_client.clone().expect("Arbitrum client not initialized");
let network_queue = self.network_queue.0.clone();
let proof_queue = self.proof_queue.0.clone();
let event_queue = self.event_queue.0.clone();
let bus = self.bus.clone();
let state = self.state.clone().expect("State not initialized");

Expand All @@ -303,6 +309,7 @@ impl MockNodeController {
arbitrum_client,
network_queue,
proof_queue,
event_queue,
bus,
state,
);
Expand Down
16 changes: 8 additions & 8 deletions workers/job-types/src/event_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ pub struct MatchEvent {
pub execution_price: TimestampedPrice,
/// The match result
pub match_result: MatchResult,
/// The fees paid on the base asset in this match
pub base_fee_take: FeeTake,
/// The fees paid on the quote asset in this match
pub quote_fee_take: FeeTake,
/// The fees paid by the first party in this match
pub fee_take0: FeeTake,
/// The fees paid by the second party in this match
pub fee_take1: FeeTake,
}

/// An external match event
Expand All @@ -188,8 +188,8 @@ pub struct ExternalMatchEvent {
pub execution_price: TimestampedPrice,
/// The external match result
pub external_match_result: ExternalMatchResult,
/// The fees paid on the base asset in this match
pub base_fee_take: FeeTake,
/// The fees paid on the quote asset in this match
pub quote_fee_take: FeeTake,
/// The fees paid by the internal party
pub internal_fee_take: FeeTake,
/// The fees paid by the external party
pub external_fee_take: FeeTake,
}
3 changes: 3 additions & 0 deletions workers/task-driver/integration/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use constants::Scalar;
use ethers::{middleware::Middleware, types::Address};
use eyre::Result;
use job_types::{
event_manager::EventManagerQueue,
network_manager::NetworkManagerQueue,
proof_manager::ProofManagerQueue,
task_driver::{new_task_notification, TaskDriverJob, TaskDriverQueue, TaskDriverReceiver},
Expand Down Expand Up @@ -215,6 +216,7 @@ pub fn new_mock_task_driver(
arbitrum_client: ArbitrumClient,
network_queue: NetworkManagerQueue,
proof_queue: ProofManagerQueue,
event_queue: EventManagerQueue,
state: State,
) {
let bus = SystemBus::new();
Expand All @@ -234,6 +236,7 @@ pub fn new_mock_task_driver(
arbitrum_client,
network_queue,
proof_queue,
event_queue,
state,
};

Expand Down
10 changes: 10 additions & 0 deletions workers/task-driver/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use ethers::{
};
use helpers::new_mock_task_driver;
use job_types::{
event_manager::{new_event_manager_queue, EventManagerReceiver},
network_manager::{new_network_manager_queue, NetworkManagerReceiver},
proof_manager::{new_proof_manager_queue, ProofManagerJob},
task_driver::{new_task_driver_queue, TaskDriverQueue},
Expand Down Expand Up @@ -98,6 +99,10 @@ struct IntegrationTestArgs {
///
/// Held here to avoid closing the channel on `Drop`
_network_receiver: Arc<NetworkManagerReceiver>,
/// A receiver for the event manager's work queue
///
/// Held here to avoid closing the channel on `Drop`
_event_receiver: Arc<EventManagerReceiver>,
/// A reference to the global state of the mock proof manager
state: State,
/// The job queue for the mock proof manager
Expand Down Expand Up @@ -126,13 +131,17 @@ impl From<CliArgs> for IntegrationTestArgs {
let (task_queue, task_recv) = new_task_driver_queue();
let state = block_current(setup_global_state_mock(task_queue.clone()));

// Create a mock event sender and receiver
let (event_queue, event_receiver) = new_event_manager_queue();

// Start a task driver
new_mock_task_driver(
task_recv,
task_queue.clone(),
arbitrum_client.clone(),
network_sender,
proof_job_queue.clone(),
event_queue,
state.clone(),
);

Expand All @@ -156,6 +165,7 @@ impl From<CliArgs> for IntegrationTestArgs {
permit2_addr,
arbitrum_client,
_network_receiver: Arc::new(network_receiver),
_event_receiver: Arc::new(event_receiver),
proof_job_queue,
state,
task_queue,
Expand Down
1 change: 1 addition & 0 deletions workers/task-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl TaskExecutor {
arbitrum_client: config.arbitrum_client,
network_queue: config.network_queue,
proof_queue: config.proof_queue,
event_queue: config.event_queue,
task_queue: config.task_queue_sender,
state: config.state,
bus: config.system_bus.clone(),
Expand Down
23 changes: 22 additions & 1 deletion workers/task-driver/src/tasks/create_new_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use core::panic;
use std::error::Error;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::time::SystemTime;

use arbitrum_client::client::ArbitrumClient;
use async_trait::async_trait;
Expand All @@ -21,12 +22,15 @@ use circuits::zk_circuits::valid_wallet_create::{
use common::types::tasks::NewWalletTaskDescriptor;
use common::types::{proof_bundles::ValidWalletCreateBundle, wallet::Wallet};
use constants::Scalar;
use job_types::event_manager::{EventManagerQueue, RelayerEvent, WalletCreationEvent};
use job_types::proof_manager::{ProofJob, ProofManagerQueue};
use renegade_metrics::labels::NUM_NEW_WALLETS_METRIC;
use serde::Serialize;
use state::error::StateError;
use state::State;
use tracing::instrument;
use util::err_str;
use uuid::Uuid;

use crate::task_state::StateWrapper;
use crate::traits::{Task, TaskContext, TaskError, TaskState};
Expand Down Expand Up @@ -161,6 +165,8 @@ pub struct NewWalletTask {
pub proof_manager_work_queue: ProofManagerQueue,
/// The state of the task's execution
pub task_state: NewWalletTaskState,
/// The event manager queue
pub event_queue: EventManagerQueue,
}

// -----------------------
Expand All @@ -182,6 +188,7 @@ impl Task for NewWalletTask {
global_state: ctx.state,
proof_manager_work_queue: ctx.proof_queue,
task_state: NewWalletTaskState::Pending, // Initialize to the initial state
event_queue: ctx.event_queue,
})
}

Expand All @@ -207,8 +214,10 @@ impl Task for NewWalletTask {
// Find the authentication path via contract events, and index this
// in the global state
self.find_merkle_path().await?;
self.task_state = NewWalletTaskState::Completed;
self.emit_event()?;
metrics::counter!(NUM_NEW_WALLETS_METRIC).increment(1);

self.task_state = NewWalletTaskState::Completed;
},
NewWalletTaskState::Completed => {
panic!("step() called in completed state")
Expand Down Expand Up @@ -301,4 +310,16 @@ impl NewWalletTask {
},
)
}

/// Emit a wallet creation event to the event manager
fn emit_event(&self) -> Result<(), NewWalletTaskError> {
let event = RelayerEvent::WalletCreation(WalletCreationEvent {
event_id: Uuid::new_v4(),
event_timestamp: SystemTime::now(),
wallet_id: self.wallet.wallet_id,
symmetric_key: self.wallet.key_chain.symmetric_key().to_base64_string(),
});

self.event_queue.send(event).map_err(err_str!(NewWalletTaskError::SendMessage))
}
}
46 changes: 45 additions & 1 deletion workers/task-driver/src/tasks/settle_match_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::error::Error;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::time::Duration;
use std::time::{Duration, SystemTime};

use crate::task_state::StateWrapper;
use crate::tasks::ERR_AWAITING_PROOF;
Expand All @@ -26,6 +26,7 @@ use common::types::tasks::SettleExternalMatchTaskDescriptor;
use common::types::wallet::{OrderIdentifier, WalletIdentifier};
use common::types::TimestampedPrice;
use external_api::bus_message::SystemBusMessage;
use job_types::event_manager::{EventManagerQueue, ExternalMatchEvent, RelayerEvent};
use job_types::proof_manager::{ProofJob, ProofManagerQueue};
use serde::Serialize;
use state::error::StateError;
Expand All @@ -34,6 +35,7 @@ use system_bus::SystemBus;
use tracing::{info, instrument, warn};
use util::arbitrum::get_protocol_fee;
use util::matching_engine::{apply_match_to_shares, compute_fee_obligation};
use uuid::Uuid;

use super::ERR_NO_VALIDITY_PROOF;

Expand Down Expand Up @@ -118,6 +120,8 @@ pub enum SettleMatchExternalTaskError {
ProofLinking(String),
/// An error interacting with the relayer state
State(String),
/// An error sending an event to the event manager
SendEvent(String),
}

impl SettleMatchExternalTaskError {
Expand All @@ -144,6 +148,12 @@ impl SettleMatchExternalTaskError {
pub fn state<T: ToString>(msg: T) -> Self {
Self::State(msg.to_string())
}

/// Construct a `SendEvent` error
#[allow(clippy::needless_pass_by_value)]
pub fn send_event<T: ToString>(msg: T) -> Self {
Self::SendEvent(msg.to_string())
}
}

impl TaskError for SettleMatchExternalTaskError {
Expand Down Expand Up @@ -203,6 +213,8 @@ pub struct SettleMatchExternalTask {
proof_queue: ProofManagerQueue,
/// The state of the task
task_state: SettleMatchExternalTaskState,
/// The event queue to send events to
event_queue: EventManagerQueue,
}

#[async_trait]
Expand Down Expand Up @@ -242,6 +254,7 @@ impl Task for SettleMatchExternalTask {
proof_queue: ctx.proof_queue,
bus: ctx.bus,
task_state: SettleMatchExternalTaskState::Pending,
event_queue: ctx.event_queue,
})
}

Expand Down Expand Up @@ -280,6 +293,7 @@ impl Task for SettleMatchExternalTask {

SettleMatchExternalTaskState::RefreshingWallet => {
self.refresh_wallet().await?;
self.emit_event()?;
self.task_state = SettleMatchExternalTaskState::Completed
},

Expand Down Expand Up @@ -494,4 +508,34 @@ impl SettleMatchExternalTask {
renegade_metrics::record_match_volume(match_res, true /* is_external_match */);
Ok(())
}

/// Emit an external match event to the event manager
fn emit_event(&self) -> Result<(), SettleMatchExternalTaskError> {
let commitments_witness = &self.internal_order_validity_witness.commitment_witness;
let internal_party_order_side = commitments_witness.order.side;
let relayer_fee = commitments_witness.relayer_fee;

let internal_fee_take =
compute_fee_obligation(relayer_fee, internal_party_order_side, &self.match_res);
let external_fee_take = compute_fee_obligation(
FixedPoint::default(),
internal_party_order_side.opposite(),
&self.match_res,
);

let external_match_result = self.match_res.clone().into();

let event = RelayerEvent::ExternalMatch(ExternalMatchEvent {
event_id: Uuid::new_v4(),
event_timestamp: SystemTime::now(),
internal_wallet_id: self.internal_wallet_id,
internal_order_id: self.internal_order_id,
execution_price: self.execution_price,
external_match_result,
internal_fee_take,
external_fee_take,
});

self.event_queue.send(event).map_err(SettleMatchExternalTaskError::send_event)
}
}
Loading

0 comments on commit 64eff84

Please sign in to comment.