Skip to content

Commit

Permalink
workers: event-manager, node-support: event-export-sidecar: implement…
Browse files Browse the repository at this point in the history
… framed unix socket
  • Loading branch information
akirillo committed Dec 20, 2024
1 parent 20d4557 commit 5977807
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 46 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node-support/event-export-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
# === Async + Runtime Dependencies === #
tokio = { workspace = true, features = ["full"] }
tokio-stream = "0.1"

# === AWS Dependencies === #
aws-config = { version = "1.1.4", features = ["behavior-version-latest"] }
Expand All @@ -22,3 +23,4 @@ clap = { version = "4", features = ["derive"] }
tracing = { workspace = true }
eyre = { workspace = true }
serde_json = { workspace = true }
tokio-util = "0.7"
62 changes: 24 additions & 38 deletions node-support/event-export-sidecar/src/event_socket.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
//! A managed Unix listener that removes the socket file when dropped
use std::{fs, io, path::Path};
use std::{fs, path::Path};

use aws_config::Region;
use aws_sdk_sqs::Client as SqsClient;
use event_manager::manager::extract_unix_socket_path;
use eyre::{eyre, Error};
use job_types::event_manager::RelayerEvent;
use tokio::net::{UnixListener, UnixStream};
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tracing::{error, info, warn};
use url::Url;

// -------------
// | Constants |
// -------------
// ---------------------
// | Convenience Types |
// ---------------------

/// The maximum message size to read from the event export socket
const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB
/// A framed stream over a Unix socket
type FramedUnixStream = FramedRead<UnixStream, LengthDelimitedCodec>;

// ----------------
// | Event Socket |
Expand All @@ -28,7 +30,7 @@ const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB
/// The socket file is removed when the socket is dropped.
pub struct EventSocket {
/// The underlying Unix socket
socket: UnixStream,
socket: FramedUnixStream,

/// The path to the Unix socket
path: String,
Expand All @@ -54,50 +56,34 @@ impl EventSocket {

/// Sets up a Unix socket listening on the given path
/// and awaits a single connection on it
async fn establish_socket_connection(path: &str) -> Result<UnixStream, Error> {
async fn establish_socket_connection(path: &str) -> Result<FramedUnixStream, Error> {
let listener = UnixListener::bind(Path::new(path))?;

// We only expect one connection, so we can just block on it
info!("Waiting for event export socket connection...");
match listener.accept().await {
Ok((socket, _)) => Ok(socket),
Ok((socket, _)) => {
let framed_socket = FramedRead::new(socket, LengthDelimitedCodec::new());
Ok(framed_socket)
},
Err(e) => Err(eyre!("error accepting Unix socket connection: {e}")),
}
}

/// Listens for events on the socket and submits them to the historical
/// state engine
pub async fn listen_for_events(&self) -> Result<(), Error> {
loop {
// Wait for the socket to be readable
self.socket.readable().await?;

let mut buf = [0; MAX_MESSAGE_SIZE];

// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match self.socket.try_read(&mut buf) {
Ok(0) => {
warn!("Event export socket closed");
return Ok(());
},
Ok(n) => {
let msg = &buf[..n];
if let Err(e) = self.handle_relayer_event(msg.to_vec()).await {
// Events that fail to be submitted are effectively dropped here.
// We can consider retry logic or a local dead-letter queue, but
// for now we keep things simple.
error!("Error handling relayer event: {e}");
}
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
},
Err(e) => {
return Err(e.into());
},
pub async fn listen_for_events(&mut self) -> Result<(), Error> {
while let Some(msg) = self.socket.try_next().await? {
if let Err(e) = self.handle_relayer_event(msg.to_vec()).await {
// Events that fail to be submitted are effectively dropped here.
// We can consider retry logic or a local dead-letter queue, but
// for now we keep things simple.
error!("Error handling relayer event: {e}");
}
}

warn!("Event export socket closed");
Ok(())
}

/// Handles an event received from the event export socket
Expand Down
2 changes: 1 addition & 1 deletion node-support/event-export-sidecar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main() -> Result<(), Error> {
return Ok(());
}

let event_socket =
let mut event_socket =
EventSocket::new(&relayer_config.event_export_url.unwrap(), cli.queue_url, cli.region)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions workers/event-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
# === Async + Concurrency === #
async-trait = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }

# === Workspace Dependencies === #
common = { path = "../../common" }
Expand All @@ -20,3 +21,4 @@ url = "2.4"
tracing = { workspace = true }
serde_json = { workspace = true }
thiserror = { version = "1.0.61" }
tokio-util = "0.7"
22 changes: 18 additions & 4 deletions workers/event-manager/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use std::{path::Path, thread::JoinHandle};

use common::types::CancelChannel;
use constants::in_bootstrap_mode;
use futures::sink::SinkExt;
use job_types::event_manager::EventManagerReceiver;
use tokio::{io::AsyncWriteExt, net::UnixStream};
use tokio::net::UnixStream;
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
use tracing::{info, warn};
use url::Url;
use util::{err_str, runtime::sleep_forever_async};
Expand All @@ -24,6 +26,13 @@ const UNIX_SCHEME: &str = "unix";
const ERR_NON_UNIX_EVENT_EXPORT_ADDRESS: &str =
"Only Unix socket event export addresses are currently supported";

// ---------------------
// | Convenience Types |
// ---------------------

/// A framed sink over a Unix socket
type FramedUnixSink = FramedWrite<UnixStream, LengthDelimitedCodec>;

// ----------------------
// | Manager / Executor |
// ----------------------
Expand Down Expand Up @@ -58,7 +67,9 @@ impl EventManagerExecutor {
/// Constructs the export sink for the event manager.
///
/// Currently, only Unix socket export addresses are supported.
pub async fn construct_export_sink(&mut self) -> Result<Option<UnixStream>, EventManagerError> {
pub async fn construct_export_sink(
&mut self,
) -> Result<Option<FramedUnixSink>, EventManagerError> {
if self.event_export_addr.is_none() {
return Ok(None);
}
Expand All @@ -69,7 +80,9 @@ impl EventManagerExecutor {
.await
.map_err(err_str!(EventManagerError::SocketConnection))?;

Ok(Some(socket))
let framed_socket = FramedWrite::new(socket, LengthDelimitedCodec::new());

Ok(Some(framed_socket))
}

/// The main execution loop; receives events and exports them to the
Expand All @@ -93,7 +106,8 @@ impl EventManagerExecutor {

let sink = sink.as_mut().unwrap();
let event_bytes = serde_json::to_vec(&event).map_err(err_str!(EventManagerError::Serialize))?;
sink.write_all(&event_bytes).await.map_err(err_str!(EventManagerError::SocketWrite))?;

sink.send(event_bytes.into()).await.map_err(err_str!(EventManagerError::SocketWrite))?;
},

_ = self.cancel_channel.changed() => {
Expand Down
6 changes: 3 additions & 3 deletions workers/task-driver/src/tasks/settle_match_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl Task for SettleMatchInternalTask {

SettleMatchInternalTaskState::UpdatingValidityProofs => {
self.update_proofs().await?;
self.emit_event()?;
self.emit_events()?;
record_match_volume(&self.match_result, false /* is_external_match */);

self.task_state = SettleMatchInternalTaskState::Completed;
Expand Down Expand Up @@ -555,8 +555,8 @@ impl SettleMatchInternalTask {
})
}

/// Emit a match event to the event manager
fn emit_event(&self) -> Result<(), SettleMatchInternalTaskError> {
/// Emit a pair of fill events to the event manager
fn emit_events(&self) -> Result<(), SettleMatchInternalTaskError> {
let commitment_witness0 = &self.order1_validity_witness.commitment_witness;
let commitment_witness1 = &self.order2_validity_witness.commitment_witness;

Expand Down

0 comments on commit 5977807

Please sign in to comment.