diff --git a/Cargo.lock b/Cargo.lock index bf26c94b7..00505d5ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3555,6 +3555,25 @@ dependencies = [ "yansi", ] +[[package]] +name = "event-export-sidecar" +version = "0.1.0" +dependencies = [ + "clap 4.5.21", + "common 0.1.0", + "config", + "event-manager", + "external-api 0.1.0", + "eyre", + "job-types", + "reqwest 0.11.27", + "serde", + "serde_json", + "tokio", + "tracing", + "url", +] + [[package]] name = "event-listener" version = "5.3.1" @@ -3585,10 +3604,11 @@ dependencies = [ "common 0.1.0", "constants 0.1.0", "job-types", - "libp2p", "serde_json", + "thiserror", "tokio", "tracing", + "url", "util 0.1.0", ] diff --git a/Cargo.toml b/Cargo.toml index 4ffabc62b..6f93fda7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "gossip-api", "mock-node", "node-support/snapshot-sidecar", + "node-support/event-export-sidecar", "node-support/bootloader", "renegade-crypto", "state", diff --git a/config/src/cli.rs b/config/src/cli.rs index cefc55881..e6d0f8668 100644 --- a/config/src/cli.rs +++ b/config/src/cli.rs @@ -99,6 +99,10 @@ pub struct Cli { /// https://github.com/renegade-fi/relayer-extensions/tree/master/compliance/compliance-api #[clap(long, value_parser)] pub compliance_service_url: Option, + /// The URL to export relayer events to. + /// If ommitted, the event manager is disabled. + #[clap(long, value_parser)] + pub event_export_url: Option, // ---------------------------- // | Networking Configuration | @@ -175,10 +179,6 @@ pub struct Cli { // TODO: Unset default `true` once event export implementation is complete #[clap(long, value_parser, default_value = "true")] pub record_historical_state: bool, - /// The address to export relayer events to, in multiaddr format. - /// If ommitted, the event manager is disabled. - #[clap(long, value_parser)] - pub event_export_addr: Option, /// The maximum number of wallet operations a user is allowed to perform per hour /// /// Defaults to 500 @@ -306,6 +306,9 @@ pub struct RelayerConfig { /// The API of the compliance service must match that defined here: /// https://github.com/renegade-fi/relayer-extensions/tree/master/compliance/compliance-api pub compliance_service_url: Option, + /// The URL to export relayer events to. + /// If ommitted, the event manager is disabled. + pub event_export_url: Option, // ---------------------------- // | Networking Configuration | @@ -357,9 +360,6 @@ pub struct RelayerConfig { pub raft_snapshot_path: String, /// Whether to record historical state locally pub record_historical_state: bool, - /// The address to export relayer events to, in multiaddr format. - /// If ommitted, the event manager is disabled. - pub event_export_addr: Option, /// The maximum number of wallet operations a user is allowed to perform per /// hour pub wallet_task_rate_limit: u32, @@ -485,7 +485,7 @@ impl Clone for RelayerConfig { db_path: self.db_path.clone(), raft_snapshot_path: self.raft_snapshot_path.clone(), record_historical_state: self.record_historical_state, - event_export_addr: self.event_export_addr.clone(), + event_export_url: self.event_export_url.clone(), wallet_task_rate_limit: self.wallet_task_rate_limit, min_transfer_amount: self.min_transfer_amount, max_merkle_staleness: self.max_merkle_staleness, diff --git a/config/src/parsing.rs b/config/src/parsing.rs index bff2ec7b9..a36e0198d 100644 --- a/config/src/parsing.rs +++ b/config/src/parsing.rs @@ -96,10 +96,9 @@ pub(crate) fn parse_config_from_args(cli_args: Cli) -> Result = cli_args - .event_export_addr - .map(|addr| addr.parse().expect("Invalid address passed as --event-export-addr")); + // Parse the event export URL, if there is one + let event_export_url: Option = + cli_args.event_export_url.map(|url| url.parse().expect("Invalid event export URL")); // Parse the price reporter URL, if there is one let price_reporter_url = cli_args @@ -130,7 +129,7 @@ pub(crate) fn parse_config_from_args(cli_args: Cli) -> Result Result<(), CoordinatorError> { // Start the event manager let (event_manager_cancel_sender, event_manager_cancel_receiver) = new_cancel_channel(); let mut event_manager = EventManager::new(EventManagerConfig { - event_export_addr: args.event_export_addr, + event_export_url: args.event_export_url, event_queue: event_manager_receiver, cancel_channel: event_manager_cancel_receiver, }) diff --git a/node-support/event-export-sidecar/Cargo.toml b/node-support/event-export-sidecar/Cargo.toml new file mode 100644 index 000000000..23ef6ae96 --- /dev/null +++ b/node-support/event-export-sidecar/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "event-export-sidecar" +version = "0.1.0" +edition = "2021" + +[dependencies] +# === Async + Runtime === # +tokio = { workspace = true, features = ["full"] } + +# === Networking Dependencies === # +reqwest = { version = "0.11", features = ["json"] } + +# === Workspace Dependencies === # +common = { path = "../../common" } +config = { path = "../../config" } +external-api = { path = "../../external-api", features = ["auth"] } +job-types = { path = "../../workers/job-types" } +event-manager = { path = "../../workers/event-manager" } + +# === Misc Dependencies === # +url = "2.4" +clap = { version = "4", features = ["derive"] } +tracing = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +eyre = { workspace = true } diff --git a/node-support/event-export-sidecar/src/event_socket.rs b/node-support/event-export-sidecar/src/event_socket.rs new file mode 100644 index 000000000..1669b886c --- /dev/null +++ b/node-support/event-export-sidecar/src/event_socket.rs @@ -0,0 +1,111 @@ +//! A managed Unix listener that removes the socket file when dropped + +use std::{fs, io, path::Path}; + +use event_manager::manager::extract_unix_socket_path; +use eyre::{eyre, Error}; +use job_types::event_manager::RelayerEvent; +use tokio::net::{UnixListener, UnixStream}; +use tracing::{error, info, warn}; +use url::Url; + +use crate::hse_client::HseClient; + +// ------------- +// | Constants | +// ------------- + +/// The maximum message size to read from the event export socket +const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB + +// ---------------- +// | Event Socket | +// ---------------- + +/// A managed Unix socket that listens for events on a given path +/// and submits them to the historical state engine. +/// +/// The socket file is removed when the socket is dropped. +pub struct EventSocket { + /// The underlying Unix socket + socket: UnixStream, + + /// The path to the Unix socket + path: String, + + /// The historical state engine client + hse_client: HseClient, +} + +impl EventSocket { + /// Creates a new event socket from the given URL + pub async fn new(url: &Url, hse_client: HseClient) -> Result { + let path = extract_unix_socket_path(url)?; + let socket = Self::establish_socket_connection(&path).await?; + Ok(Self { socket, path, hse_client }) + } + + /// Sets up a Unix socket listening on the given path + /// and awaits a single connection on it + async fn establish_socket_connection(path: &str) -> Result { + let listener = UnixListener::bind(Path::new(path))?; + + // We only expect one connection, so we can just block on it + info!("Waiting for event export socket connection..."); + match listener.accept().await { + Ok((socket, _)) => Ok(socket), + Err(e) => Err(eyre!("error accepting Unix socket connection: {e}")), + } + } + + /// Listens for events on the socket and submits them to the historical + /// state engine + pub async fn listen_for_events(&self) -> Result<(), Error> { + loop { + // Wait for the socket to be readable + self.socket.readable().await?; + + let mut buf = [0; MAX_MESSAGE_SIZE]; + + // Try to read data, this may still fail with `WouldBlock` + // if the readiness event is a false positive. + match self.socket.try_read(&mut buf) { + Ok(0) => { + warn!("Event export socket closed"); + return Ok(()); + }, + Ok(n) => { + let msg = &buf[..n]; + if let Err(e) = self.handle_relayer_event(msg).await { + // Events that fail to be submitted are effectively dropped here. + // We can consider retry logic or a local dead-letter queue, but + // for now we keep things simple. + error!("Error handling relayer event: {e}"); + } + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + }, + Err(e) => { + return Err(e.into()); + }, + } + } + } + + /// Handles an event received from the event export socket + async fn handle_relayer_event(&self, msg: &[u8]) -> Result<(), Error> { + let event = serde_json::from_slice::(msg)?; + self.hse_client.submit_event(&event).await?; + + Ok(()) + } +} + +impl Drop for EventSocket { + fn drop(&mut self) { + if let Err(e) = fs::remove_file(&self.path) { + warn!("Failed to remove Unix socket file: {}", e); + } + } +} diff --git a/node-support/event-export-sidecar/src/hse_client.rs b/node-support/event-export-sidecar/src/hse_client.rs new file mode 100644 index 000000000..369db92b4 --- /dev/null +++ b/node-support/event-export-sidecar/src/hse_client.rs @@ -0,0 +1,99 @@ +//! A client for the historical state engine + +use std::time::Duration; + +use common::types::wallet::keychain::HmacKey; +use external_api::auth::add_expiring_auth_to_headers; +use eyre::{eyre, Error}; +use job_types::event_manager::RelayerEvent; +use reqwest::{header::HeaderMap, Client, Method, Response}; +use serde::Serialize; + +// ------------- +// | Constants | +// ------------- + +/// The buffer to add to the expiration timestamp for the signature +const SIG_EXPIRATION_BUFFER_MS: u64 = 5_000; // 5 seconds + +/// The path to submit events to +const EVENT_SUBMISSION_PATH: &str = "/event"; + +// ---------- +// | Client | +// ---------- + +/// A client for the historical state engine +pub struct HseClient { + /// The base URL of the historical state engine + base_url: String, + /// The auth key for the historical state engine + auth_key: HmacKey, +} + +impl HseClient { + /// Create a new historical state engine client + pub fn new(base_url: String, auth_key: HmacKey) -> Self { + Self { base_url, auth_key } + } + + /// Submit an event to the historical state engine + pub async fn submit_event(&self, event: &RelayerEvent) -> Result<(), Error> { + send_authenticated_request( + &format!("{}{}", self.base_url, EVENT_SUBMISSION_PATH), + EVENT_SUBMISSION_PATH, + Method::POST, + event, + &self.auth_key, + ) + .await + .map(|_| ()) + } +} + +// ----------- +// | Helpers | +// ----------- + +/// Send a request w/ an expiring auth header +async fn send_authenticated_request( + url: &str, + path: &str, + method: Method, + body: &Req, + key: &HmacKey, +) -> Result { + let expiration = Duration::from_millis(SIG_EXPIRATION_BUFFER_MS); + + let body_bytes = serde_json::to_vec(body).expect("failed to serialize request body"); + + let mut headers = HeaderMap::new(); + add_expiring_auth_to_headers(path, &mut headers, &body_bytes, key, expiration); + + let route = format!("{}{}", url, path); + let response = send_request(&route, method, body, headers).await?; + Ok(response) +} + +/// Send a basic HTTP request +async fn send_request( + route: &str, + method: Method, + body: &Req, + headers: HeaderMap, +) -> Result { + let response = Client::new() + .request(method, route) + .headers(headers) + .json(body) + .send() + .await + .map_err(|e| eyre!("Failed to send request: {e}"))?; + + // Check if the request was successful + if !response.status().is_success() { + return Err(eyre!("Request failed with status: {}", response.status())); + } + + Ok(response) +} diff --git a/node-support/event-export-sidecar/src/main.rs b/node-support/event-export-sidecar/src/main.rs new file mode 100644 index 000000000..549ce5896 --- /dev/null +++ b/node-support/event-export-sidecar/src/main.rs @@ -0,0 +1,63 @@ +//! A sidecar process that re-exports relayer events for historical state +//! persistence + +#![allow(incomplete_features)] +#![deny(missing_docs)] +#![deny(unsafe_code)] +#![deny(clippy::missing_docs_in_private_items)] +#![deny(clippy::needless_pass_by_value)] +#![deny(clippy::needless_pass_by_ref_mut)] + +mod event_socket; +mod hse_client; + +use clap::Parser; +use common::types::wallet::keychain::HmacKey; +use config::parsing::parse_config_from_file; +use event_socket::EventSocket; +use eyre::Error; +use hse_client::HseClient; +use tracing::{info, warn}; + +// ------- +// | CLI | +// ------- + +/// The event export sidecar CLI +#[derive(Debug, Parser)] +struct Cli { + /// The path to the relayer's config + #[clap(long)] + config_path: String, + /// The historical state engine URL + #[clap(long)] + hse_url: String, + /// The historical state engine auth key, in base64 format + #[clap(long)] + hse_key: String, +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // Parse CLI & config + let cli = Cli::parse(); + let relayer_config = + parse_config_from_file(&cli.config_path).expect("could not parse relayer config"); + relayer_config.configure_telemetry().expect("failed to configure telemetry"); + + if relayer_config.event_export_url.is_none() { + warn!("Event export disabled, not creating event sidecar"); + return Ok(()); + } + + // Construct HSE client + let hse_key = HmacKey::from_base64_string(&cli.hse_key).expect("invalid hse key"); + let hse_client = HseClient::new(cli.hse_url, hse_key); + + let event_socket = + EventSocket::new(&relayer_config.event_export_url.unwrap(), hse_client).await?; + + info!("Event export sidecar connected to socket, awaiting events..."); + + event_socket.listen_for_events().await +} diff --git a/workers/event-manager/Cargo.toml b/workers/event-manager/Cargo.toml index 6b037da26..20cac70d4 100644 --- a/workers/event-manager/Cargo.toml +++ b/workers/event-manager/Cargo.toml @@ -8,9 +8,6 @@ edition = "2021" async-trait = { workspace = true } tokio = { workspace = true } -# === Networking === # -libp2p = { workspace = true } - # === Workspace Dependencies === # common = { path = "../../common" } constants = { path = "../../constants" } @@ -19,5 +16,7 @@ job-types = { path = "../job-types" } util = { path = "../../util" } # === Misc Dependencies === # +url = "2.4" tracing = { workspace = true } serde_json = { workspace = true } +thiserror = { version = "1.0.61" } diff --git a/workers/event-manager/src/error.rs b/workers/event-manager/src/error.rs index b70ff68ee..8a43e5625 100644 --- a/workers/event-manager/src/error.rs +++ b/workers/event-manager/src/error.rs @@ -1,18 +1,26 @@ //! Defines errors for the event manager +use thiserror::Error; + /// An error that occurred in the event manager -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Error)] pub enum EventManagerError { /// The event manager was cancelled + #[error("event manager cancelled: {0}")] Cancelled(String), /// The event export address is invalid + #[error("invalid event export address: {0}")] InvalidEventExportAddr(String), /// An error occurred while connecting to the event export socket + #[error("error connecting to event export socket: {0}")] SocketConnection(String), /// An error occurred while serializing an event + #[error("error serializing event: {0}")] Serialize(String), /// An error occurred while writing to the event export socket + #[error("error writing to event export socket: {0}")] SocketWrite(String), /// An error occurred while setting up the event manager + #[error("error setting up event manager: {0}")] SetupError(String), } diff --git a/workers/event-manager/src/manager.rs b/workers/event-manager/src/manager.rs index 933351653..cf79a8f64 100644 --- a/workers/event-manager/src/manager.rs +++ b/workers/event-manager/src/manager.rs @@ -6,9 +6,9 @@ use std::{path::Path, thread::JoinHandle}; use common::types::CancelChannel; use constants::in_bootstrap_mode; use job_types::event_manager::EventManagerReceiver; -use libp2p::{multiaddr::Protocol, Multiaddr}; use tokio::{io::AsyncWriteExt, net::UnixStream}; use tracing::{info, warn}; +use url::Url; use util::{err_str, runtime::sleep_forever_async}; use crate::{error::EventManagerError, worker::EventManagerConfig}; @@ -17,6 +17,9 @@ use crate::{error::EventManagerError, worker::EventManagerConfig}; // | Constants | // ------------- +/// The Unix socket URL scheme +const UNIX_SCHEME: &str = "unix"; + /// The error message for when the event export address is not a Unix socket const ERR_NON_UNIX_EVENT_EXPORT_ADDRESS: &str = "Only Unix socket event export addresses are currently supported"; @@ -38,7 +41,7 @@ pub struct EventManagerExecutor { /// The channel on which to receive events event_queue: EventManagerReceiver, /// The address to export relayer events to - event_export_addr: Option, + event_export_addr: Option, /// The channel on which the coordinator may cancel event manager execution cancel_channel: CancelChannel, } @@ -46,7 +49,8 @@ pub struct EventManagerExecutor { impl EventManagerExecutor { /// Constructs a new event manager executor pub fn new(config: EventManagerConfig) -> Self { - let EventManagerConfig { event_queue, event_export_addr, cancel_channel } = config; + let EventManagerConfig { event_queue, event_export_url: event_export_addr, cancel_channel } = + config; Self { event_queue, event_export_addr, cancel_channel } } @@ -59,16 +63,7 @@ impl EventManagerExecutor { return Ok(None); } - let mut event_export_addr = self.event_export_addr.take().unwrap(); - let unix_path = - match event_export_addr.pop().expect("event export address must not be empty") { - Protocol::Unix(path) => path.to_string(), - _ => { - return Err(EventManagerError::InvalidEventExportAddr( - ERR_NON_UNIX_EVENT_EXPORT_ADDRESS.to_string(), - )) - }, - }; + let unix_path = extract_unix_socket_path(&self.event_export_addr.take().unwrap())?; let socket = UnixStream::connect(Path::new(&unix_path)) .await @@ -109,3 +104,13 @@ impl EventManagerExecutor { } } } + +/// Extracts a Unix socket path from the event export URL +pub fn extract_unix_socket_path(event_export_url: &Url) -> Result { + match event_export_url.scheme() { + UNIX_SCHEME => Ok(event_export_url.path().to_string()), + _ => Err(EventManagerError::InvalidEventExportAddr( + ERR_NON_UNIX_EVENT_EXPORT_ADDRESS.to_string(), + )), + } +} diff --git a/workers/event-manager/src/worker.rs b/workers/event-manager/src/worker.rs index 7f404b1e9..e3036ffab 100644 --- a/workers/event-manager/src/worker.rs +++ b/workers/event-manager/src/worker.rs @@ -5,9 +5,9 @@ use std::thread::{Builder, JoinHandle}; use async_trait::async_trait; use common::{types::CancelChannel, worker::Worker}; use job_types::event_manager::EventManagerReceiver; -use libp2p::Multiaddr; use tokio::runtime::Builder as RuntimeBuilder; use tracing::info; +use url::Url; use util::err_str; use crate::{ @@ -31,8 +31,8 @@ const EVENT_MANAGER_N_THREADS: usize = 1; /// The configuration for the event manager pub struct EventManagerConfig { - /// The address to export relayer events to - pub event_export_addr: Option, + /// The URL to export relayer events to + pub event_export_url: Option, /// The queue on which to receive events pub event_queue: EventManagerReceiver, /// The channel on which the coordinator may mandate that the