Skip to content

Commit

Permalink
node-support: event-export-sidecar: create event export sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
akirillo committed Dec 14, 2024
1 parent 5e200f5 commit c4f2584
Show file tree
Hide file tree
Showing 13 changed files with 366 additions and 35 deletions.
22 changes: 21 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"gossip-api",
"mock-node",
"node-support/snapshot-sidecar",
"node-support/event-export-sidecar",
"node-support/bootloader",
"renegade-crypto",
"state",
Expand Down
16 changes: 8 additions & 8 deletions config/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ pub struct Cli {
/// https://github.com/renegade-fi/relayer-extensions/tree/master/compliance/compliance-api
#[clap(long, value_parser)]
pub compliance_service_url: Option<String>,
/// The URL to export relayer events to.
/// If ommitted, the event manager is disabled.
#[clap(long, value_parser)]
pub event_export_url: Option<String>,

// ----------------------------
// | Networking Configuration |
Expand Down Expand Up @@ -175,10 +179,6 @@ pub struct Cli {
// TODO: Unset default `true` once event export implementation is complete
#[clap(long, value_parser, default_value = "true")]
pub record_historical_state: bool,
/// The address to export relayer events to, in multiaddr format.
/// If ommitted, the event manager is disabled.
#[clap(long, value_parser)]
pub event_export_addr: Option<String>,
/// The maximum number of wallet operations a user is allowed to perform per hour
///
/// Defaults to 500
Expand Down Expand Up @@ -306,6 +306,9 @@ pub struct RelayerConfig {
/// The API of the compliance service must match that defined here:
/// https://github.com/renegade-fi/relayer-extensions/tree/master/compliance/compliance-api
pub compliance_service_url: Option<String>,
/// The URL to export relayer events to.
/// If ommitted, the event manager is disabled.
pub event_export_url: Option<Url>,

// ----------------------------
// | Networking Configuration |
Expand Down Expand Up @@ -357,9 +360,6 @@ pub struct RelayerConfig {
pub raft_snapshot_path: String,
/// Whether to record historical state locally
pub record_historical_state: bool,
/// The address to export relayer events to, in multiaddr format.
/// If ommitted, the event manager is disabled.
pub event_export_addr: Option<Multiaddr>,
/// The maximum number of wallet operations a user is allowed to perform per
/// hour
pub wallet_task_rate_limit: u32,
Expand Down Expand Up @@ -485,7 +485,7 @@ impl Clone for RelayerConfig {
db_path: self.db_path.clone(),
raft_snapshot_path: self.raft_snapshot_path.clone(),
record_historical_state: self.record_historical_state,
event_export_addr: self.event_export_addr.clone(),
event_export_url: self.event_export_url.clone(),
wallet_task_rate_limit: self.wallet_task_rate_limit,
min_transfer_amount: self.min_transfer_amount,
max_merkle_staleness: self.max_merkle_staleness,
Expand Down
9 changes: 4 additions & 5 deletions config/src/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ pub(crate) fn parse_config_from_args(cli_args: Cli) -> Result<RelayerConfig, Str
parsed_bootstrap_addrs.push((WrappedPeerId(peer_id), parsed_addr));
}

// Parse the event export address, if there is one
let event_export_addr: Option<Multiaddr> = cli_args
.event_export_addr
.map(|addr| addr.parse().expect("Invalid address passed as --event-export-addr"));
// Parse the event export URL, if there is one
let event_export_url: Option<Url> =
cli_args.event_export_url.map(|url| url.parse().expect("Invalid event export URL"));

// Parse the price reporter URL, if there is one
let price_reporter_url = cli_args
Expand Down Expand Up @@ -130,7 +129,7 @@ pub(crate) fn parse_config_from_args(cli_args: Cli) -> Result<RelayerConfig, Str
db_path: cli_args.db_path,
raft_snapshot_path: cli_args.raft_snapshot_path,
record_historical_state: cli_args.record_historical_state,
event_export_addr,
event_export_url,
wallet_task_rate_limit: cli_args.wallet_task_rate_limit,
min_transfer_amount: cli_args.min_transfer_amount,
bind_addr: cli_args.bind_addr,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ async fn main() -> Result<(), CoordinatorError> {
// Start the event manager
let (event_manager_cancel_sender, event_manager_cancel_receiver) = new_cancel_channel();
let mut event_manager = EventManager::new(EventManagerConfig {
event_export_addr: args.event_export_addr,
event_export_url: args.event_export_url,
event_queue: event_manager_receiver,
cancel_channel: event_manager_cancel_receiver,
})
Expand Down
26 changes: 26 additions & 0 deletions node-support/event-export-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "event-export-sidecar"
version = "0.1.0"
edition = "2021"

[dependencies]
# === Async + Runtime === #
tokio = { workspace = true, features = ["full"] }

# === Networking Dependencies === #
reqwest = { version = "0.11", features = ["json"] }

# === Workspace Dependencies === #
common = { path = "../../common" }
config = { path = "../../config" }
external-api = { path = "../../external-api", features = ["auth"] }
job-types = { path = "../../workers/job-types" }
event-manager = { path = "../../workers/event-manager" }

# === Misc Dependencies === #
url = "2.4"
clap = { version = "4", features = ["derive"] }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
eyre = { workspace = true }
111 changes: 111 additions & 0 deletions node-support/event-export-sidecar/src/event_socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//! A managed Unix listener that removes the socket file when dropped
use std::{fs, io, path::Path};

use event_manager::manager::extract_unix_socket_path;
use eyre::{eyre, Error};
use job_types::event_manager::RelayerEvent;
use tokio::net::{UnixListener, UnixStream};
use tracing::{error, info, warn};
use url::Url;

use crate::hse_client::HseClient;

// -------------
// | Constants |
// -------------

/// The maximum message size to read from the event export socket
const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB

// ----------------
// | Event Socket |
// ----------------

/// A managed Unix socket that listens for events on a given path
/// and submits them to the historical state engine.
///
/// The socket file is removed when the socket is dropped.
pub struct EventSocket {
/// The underlying Unix socket
socket: UnixStream,

/// The path to the Unix socket
path: String,

/// The historical state engine client
hse_client: HseClient,
}

impl EventSocket {
/// Creates a new event socket from the given URL
pub async fn new(url: &Url, hse_client: HseClient) -> Result<Self, Error> {
let path = extract_unix_socket_path(url)?;
let socket = Self::establish_socket_connection(&path).await?;
Ok(Self { socket, path, hse_client })
}

/// Sets up a Unix socket listening on the given path
/// and awaits a single connection on it
async fn establish_socket_connection(path: &str) -> Result<UnixStream, Error> {
let listener = UnixListener::bind(Path::new(path))?;

// We only expect one connection, so we can just block on it
info!("Waiting for event export socket connection...");
match listener.accept().await {
Ok((socket, _)) => Ok(socket),
Err(e) => Err(eyre!("error accepting Unix socket connection: {e}")),
}
}

/// Listens for events on the socket and submits them to the historical
/// state engine
pub async fn listen_for_events(&self) -> Result<(), Error> {
loop {
// Wait for the socket to be readable
self.socket.readable().await?;

let mut buf = [0; MAX_MESSAGE_SIZE];

// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match self.socket.try_read(&mut buf) {
Ok(0) => {
warn!("Event export socket closed");
return Ok(());
},
Ok(n) => {
let msg = &buf[..n];
if let Err(e) = self.handle_relayer_event(msg).await {
// Events that fail to be submitted are effectively dropped here.
// We can consider retry logic or a local dead-letter queue, but
// for now we keep things simple.
error!("Error handling relayer event: {e}");
}
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
},
Err(e) => {
return Err(e.into());
},
}
}
}

/// Handles an event received from the event export socket
async fn handle_relayer_event(&self, msg: &[u8]) -> Result<(), Error> {
let event = serde_json::from_slice::<RelayerEvent>(msg)?;
self.hse_client.submit_event(&event).await?;

Ok(())
}
}

impl Drop for EventSocket {
fn drop(&mut self) {
if let Err(e) = fs::remove_file(&self.path) {
warn!("Failed to remove Unix socket file: {}", e);
}
}
}
99 changes: 99 additions & 0 deletions node-support/event-export-sidecar/src/hse_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! A client for the historical state engine
use std::time::Duration;

use common::types::wallet::keychain::HmacKey;
use external_api::auth::add_expiring_auth_to_headers;
use eyre::{eyre, Error};
use job_types::event_manager::RelayerEvent;
use reqwest::{header::HeaderMap, Client, Method, Response};
use serde::Serialize;

// -------------
// | Constants |
// -------------

/// The buffer to add to the expiration timestamp for the signature
const SIG_EXPIRATION_BUFFER_MS: u64 = 5_000; // 5 seconds

/// The path to submit events to
const EVENT_SUBMISSION_PATH: &str = "/event";

// ----------
// | Client |
// ----------

/// A client for the historical state engine
pub struct HseClient {
/// The base URL of the historical state engine
base_url: String,
/// The auth key for the historical state engine
auth_key: HmacKey,
}

impl HseClient {
/// Create a new historical state engine client
pub fn new(base_url: String, auth_key: HmacKey) -> Self {
Self { base_url, auth_key }
}

/// Submit an event to the historical state engine
pub async fn submit_event(&self, event: &RelayerEvent) -> Result<(), Error> {
send_authenticated_request(
&format!("{}{}", self.base_url, EVENT_SUBMISSION_PATH),
EVENT_SUBMISSION_PATH,
Method::POST,
event,
&self.auth_key,
)
.await
.map(|_| ())
}
}

// -----------
// | Helpers |
// -----------

/// Send a request w/ an expiring auth header
async fn send_authenticated_request<Req: Serialize>(
url: &str,
path: &str,
method: Method,
body: &Req,
key: &HmacKey,
) -> Result<Response, Error> {
let expiration = Duration::from_millis(SIG_EXPIRATION_BUFFER_MS);

let body_bytes = serde_json::to_vec(body).expect("failed to serialize request body");

let mut headers = HeaderMap::new();
add_expiring_auth_to_headers(path, &mut headers, &body_bytes, key, expiration);

let route = format!("{}{}", url, path);
let response = send_request(&route, method, body, headers).await?;
Ok(response)
}

/// Send a basic HTTP request
async fn send_request<Req: Serialize>(
route: &str,
method: Method,
body: &Req,
headers: HeaderMap,
) -> Result<Response, Error> {
let response = Client::new()
.request(method, route)
.headers(headers)
.json(body)
.send()
.await
.map_err(|e| eyre!("Failed to send request: {e}"))?;

// Check if the request was successful
if !response.status().is_success() {
return Err(eyre!("Request failed with status: {}", response.status()));
}

Ok(response)
}
Loading

0 comments on commit c4f2584

Please sign in to comment.