From 597780713c482f117b0b51058df83c38fd183850 Mon Sep 17 00:00:00 2001 From: Andrew Kirillov Date: Fri, 20 Dec 2024 15:08:47 -0800 Subject: [PATCH] workers: event-manager, node-support: event-export-sidecar: implement framed unix socket --- Cargo.lock | 4 ++ node-support/event-export-sidecar/Cargo.toml | 2 + .../event-export-sidecar/src/event_socket.rs | 62 +++++++------------ node-support/event-export-sidecar/src/main.rs | 2 +- workers/event-manager/Cargo.toml | 2 + workers/event-manager/src/manager.rs | 22 +++++-- .../src/tasks/settle_match_internal.rs | 6 +- 7 files changed, 54 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86c946a4..2e004218 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3599,6 +3599,8 @@ dependencies = [ "job-types", "serde_json", "tokio", + "tokio-stream", + "tokio-util 0.7.12", "tracing", "url", ] @@ -3632,10 +3634,12 @@ dependencies = [ "circuit-types 0.1.0", "common 0.1.0", "constants 0.1.0", + "futures", "job-types", "serde_json", "thiserror", "tokio", + "tokio-util 0.7.12", "tracing", "url", "util 0.1.0", diff --git a/node-support/event-export-sidecar/Cargo.toml b/node-support/event-export-sidecar/Cargo.toml index 069158c7..92330c2c 100644 --- a/node-support/event-export-sidecar/Cargo.toml +++ b/node-support/event-export-sidecar/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] # === Async + Runtime Dependencies === # tokio = { workspace = true, features = ["full"] } +tokio-stream = "0.1" # === AWS Dependencies === # aws-config = { version = "1.1.4", features = ["behavior-version-latest"] } @@ -22,3 +23,4 @@ clap = { version = "4", features = ["derive"] } tracing = { workspace = true } eyre = { workspace = true } serde_json = { workspace = true } +tokio-util = "0.7" diff --git a/node-support/event-export-sidecar/src/event_socket.rs b/node-support/event-export-sidecar/src/event_socket.rs index 6314a62a..44834a92 100644 --- a/node-support/event-export-sidecar/src/event_socket.rs +++ b/node-support/event-export-sidecar/src/event_socket.rs @@ -1,6 +1,6 @@ //! A managed Unix listener that removes the socket file when dropped -use std::{fs, io, path::Path}; +use std::{fs, path::Path}; use aws_config::Region; use aws_sdk_sqs::Client as SqsClient; @@ -8,15 +8,17 @@ use event_manager::manager::extract_unix_socket_path; use eyre::{eyre, Error}; use job_types::event_manager::RelayerEvent; use tokio::net::{UnixListener, UnixStream}; +use tokio_stream::StreamExt; +use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; use tracing::{error, info, warn}; use url::Url; -// ------------- -// | Constants | -// ------------- +// --------------------- +// | Convenience Types | +// --------------------- -/// The maximum message size to read from the event export socket -const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB +/// A framed stream over a Unix socket +type FramedUnixStream = FramedRead; // ---------------- // | Event Socket | @@ -28,7 +30,7 @@ const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB /// The socket file is removed when the socket is dropped. pub struct EventSocket { /// The underlying Unix socket - socket: UnixStream, + socket: FramedUnixStream, /// The path to the Unix socket path: String, @@ -54,50 +56,34 @@ impl EventSocket { /// Sets up a Unix socket listening on the given path /// and awaits a single connection on it - async fn establish_socket_connection(path: &str) -> Result { + async fn establish_socket_connection(path: &str) -> Result { let listener = UnixListener::bind(Path::new(path))?; // We only expect one connection, so we can just block on it info!("Waiting for event export socket connection..."); match listener.accept().await { - Ok((socket, _)) => Ok(socket), + Ok((socket, _)) => { + let framed_socket = FramedRead::new(socket, LengthDelimitedCodec::new()); + Ok(framed_socket) + }, Err(e) => Err(eyre!("error accepting Unix socket connection: {e}")), } } /// Listens for events on the socket and submits them to the historical /// state engine - pub async fn listen_for_events(&self) -> Result<(), Error> { - loop { - // Wait for the socket to be readable - self.socket.readable().await?; - - let mut buf = [0; MAX_MESSAGE_SIZE]; - - // Try to read data, this may still fail with `WouldBlock` - // if the readiness event is a false positive. - match self.socket.try_read(&mut buf) { - Ok(0) => { - warn!("Event export socket closed"); - return Ok(()); - }, - Ok(n) => { - let msg = &buf[..n]; - if let Err(e) = self.handle_relayer_event(msg.to_vec()).await { - // Events that fail to be submitted are effectively dropped here. - // We can consider retry logic or a local dead-letter queue, but - // for now we keep things simple. - error!("Error handling relayer event: {e}"); - } - }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - }, - Err(e) => { - return Err(e.into()); - }, + pub async fn listen_for_events(&mut self) -> Result<(), Error> { + while let Some(msg) = self.socket.try_next().await? { + if let Err(e) = self.handle_relayer_event(msg.to_vec()).await { + // Events that fail to be submitted are effectively dropped here. + // We can consider retry logic or a local dead-letter queue, but + // for now we keep things simple. + error!("Error handling relayer event: {e}"); } } + + warn!("Event export socket closed"); + Ok(()) } /// Handles an event received from the event export socket diff --git a/node-support/event-export-sidecar/src/main.rs b/node-support/event-export-sidecar/src/main.rs index 436a296f..7f9f6dca 100644 --- a/node-support/event-export-sidecar/src/main.rs +++ b/node-support/event-export-sidecar/src/main.rs @@ -49,7 +49,7 @@ async fn main() -> Result<(), Error> { return Ok(()); } - let event_socket = + let mut event_socket = EventSocket::new(&relayer_config.event_export_url.unwrap(), cli.queue_url, cli.region) .await?; diff --git a/workers/event-manager/Cargo.toml b/workers/event-manager/Cargo.toml index 20cac70d..53d0bbcd 100644 --- a/workers/event-manager/Cargo.toml +++ b/workers/event-manager/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" # === Async + Concurrency === # async-trait = { workspace = true } tokio = { workspace = true } +futures = { workspace = true } # === Workspace Dependencies === # common = { path = "../../common" } @@ -20,3 +21,4 @@ url = "2.4" tracing = { workspace = true } serde_json = { workspace = true } thiserror = { version = "1.0.61" } +tokio-util = "0.7" diff --git a/workers/event-manager/src/manager.rs b/workers/event-manager/src/manager.rs index cf79a8f6..435faa4c 100644 --- a/workers/event-manager/src/manager.rs +++ b/workers/event-manager/src/manager.rs @@ -5,8 +5,10 @@ use std::{path::Path, thread::JoinHandle}; use common::types::CancelChannel; use constants::in_bootstrap_mode; +use futures::sink::SinkExt; use job_types::event_manager::EventManagerReceiver; -use tokio::{io::AsyncWriteExt, net::UnixStream}; +use tokio::net::UnixStream; +use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; use tracing::{info, warn}; use url::Url; use util::{err_str, runtime::sleep_forever_async}; @@ -24,6 +26,13 @@ const UNIX_SCHEME: &str = "unix"; const ERR_NON_UNIX_EVENT_EXPORT_ADDRESS: &str = "Only Unix socket event export addresses are currently supported"; +// --------------------- +// | Convenience Types | +// --------------------- + +/// A framed sink over a Unix socket +type FramedUnixSink = FramedWrite; + // ---------------------- // | Manager / Executor | // ---------------------- @@ -58,7 +67,9 @@ impl EventManagerExecutor { /// Constructs the export sink for the event manager. /// /// Currently, only Unix socket export addresses are supported. - pub async fn construct_export_sink(&mut self) -> Result, EventManagerError> { + pub async fn construct_export_sink( + &mut self, + ) -> Result, EventManagerError> { if self.event_export_addr.is_none() { return Ok(None); } @@ -69,7 +80,9 @@ impl EventManagerExecutor { .await .map_err(err_str!(EventManagerError::SocketConnection))?; - Ok(Some(socket)) + let framed_socket = FramedWrite::new(socket, LengthDelimitedCodec::new()); + + Ok(Some(framed_socket)) } /// The main execution loop; receives events and exports them to the @@ -93,7 +106,8 @@ impl EventManagerExecutor { let sink = sink.as_mut().unwrap(); let event_bytes = serde_json::to_vec(&event).map_err(err_str!(EventManagerError::Serialize))?; - sink.write_all(&event_bytes).await.map_err(err_str!(EventManagerError::SocketWrite))?; + + sink.send(event_bytes.into()).await.map_err(err_str!(EventManagerError::SocketWrite))?; }, _ = self.cancel_channel.changed() => { diff --git a/workers/task-driver/src/tasks/settle_match_internal.rs b/workers/task-driver/src/tasks/settle_match_internal.rs index 618248ba..98436b07 100644 --- a/workers/task-driver/src/tasks/settle_match_internal.rs +++ b/workers/task-driver/src/tasks/settle_match_internal.rs @@ -260,7 +260,7 @@ impl Task for SettleMatchInternalTask { SettleMatchInternalTaskState::UpdatingValidityProofs => { self.update_proofs().await?; - self.emit_event()?; + self.emit_events()?; record_match_volume(&self.match_result, false /* is_external_match */); self.task_state = SettleMatchInternalTaskState::Completed; @@ -555,8 +555,8 @@ impl SettleMatchInternalTask { }) } - /// Emit a match event to the event manager - fn emit_event(&self) -> Result<(), SettleMatchInternalTaskError> { + /// Emit a pair of fill events to the event manager + fn emit_events(&self) -> Result<(), SettleMatchInternalTaskError> { let commitment_witness0 = &self.order1_validity_witness.commitment_witness; let commitment_witness1 = &self.order2_validity_witness.commitment_witness;