From 9cdcc7ffd8f11277eb8342a813555530edf3c322 Mon Sep 17 00:00:00 2001 From: Andrew Kirillov Date: Tue, 10 Dec 2024 16:31:18 -0800 Subject: [PATCH] workers: event-manager: manager: create event manager executor --- Cargo.lock | 4 + core/src/main.rs | 2 +- workers/event-manager/Cargo.toml | 6 ++ workers/event-manager/src/error.rs | 15 +++- workers/event-manager/src/lib.rs | 1 + workers/event-manager/src/manager.rs | 111 +++++++++++++++++++++++++++ workers/event-manager/src/worker.rs | 55 +++++++++---- 7 files changed, 176 insertions(+), 18 deletions(-) create mode 100644 workers/event-manager/src/manager.rs diff --git a/Cargo.lock b/Cargo.lock index 15927c93..b5fa479c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3583,9 +3583,13 @@ dependencies = [ "async-trait", "circuit-types 0.1.0", "common 0.1.0", + "constants 0.1.0", "job-types", "libp2p", + "serde_json", "tokio", + "tracing", + "util 0.1.0", ] [[package]] diff --git a/core/src/main.rs b/core/src/main.rs index 41793107..ddb2b0c3 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -24,7 +24,7 @@ use chain_events::listener::{OnChainEventListener, OnChainEventListenerConfig}; use common::worker::{new_worker_failure_channel, watch_worker, Worker}; use common::{default_wrapper::default_option, types::new_cancel_channel}; use constants::{in_bootstrap_mode, VERSION}; -use event_manager::worker::{EventManager, EventManagerConfig}; +use event_manager::{manager::EventManager, worker::EventManagerConfig}; use external_api::bus_message::SystemBusMessage; use gossip_server::{server::GossipServer, worker::GossipServerConfig}; use handshake_manager::{manager::HandshakeManager, worker::HandshakeManagerConfig}; diff --git a/workers/event-manager/Cargo.toml b/workers/event-manager/Cargo.toml index 73f42bbd..6b037da2 100644 --- a/workers/event-manager/Cargo.toml +++ b/workers/event-manager/Cargo.toml @@ -13,5 +13,11 @@ libp2p = { workspace = true } # === Workspace Dependencies === # common = { path = "../../common" } +constants = { path = "../../constants" } circuit-types = { path = "../../circuit-types" } job-types = { path = "../job-types" } +util = { path = "../../util" } + +# === Misc Dependencies === # +tracing = { workspace = true } +serde_json = { workspace = true } diff --git a/workers/event-manager/src/error.rs b/workers/event-manager/src/error.rs index da408df3..b70ff68e 100644 --- a/workers/event-manager/src/error.rs +++ b/workers/event-manager/src/error.rs @@ -2,4 +2,17 @@ /// An error that occurred in the event manager #[derive(Clone, Debug)] -pub enum EventManagerError {} +pub enum EventManagerError { + /// The event manager was cancelled + Cancelled(String), + /// The event export address is invalid + InvalidEventExportAddr(String), + /// An error occurred while connecting to the event export socket + SocketConnection(String), + /// An error occurred while serializing an event + Serialize(String), + /// An error occurred while writing to the event export socket + SocketWrite(String), + /// An error occurred while setting up the event manager + SetupError(String), +} diff --git a/workers/event-manager/src/lib.rs b/workers/event-manager/src/lib.rs index 46043885..efa8d9d2 100644 --- a/workers/event-manager/src/lib.rs +++ b/workers/event-manager/src/lib.rs @@ -11,4 +11,5 @@ #![deny(clippy::needless_pass_by_ref_mut)] pub mod error; +pub mod manager; pub mod worker; diff --git a/workers/event-manager/src/manager.rs b/workers/event-manager/src/manager.rs new file mode 100644 index 00000000..93335165 --- /dev/null +++ b/workers/event-manager/src/manager.rs @@ -0,0 +1,111 @@ +//! The core event manager logic, the main loop that receives events +//! and exports them to the configured address + +use std::{path::Path, thread::JoinHandle}; + +use common::types::CancelChannel; +use constants::in_bootstrap_mode; +use job_types::event_manager::EventManagerReceiver; +use libp2p::{multiaddr::Protocol, Multiaddr}; +use tokio::{io::AsyncWriteExt, net::UnixStream}; +use tracing::{info, warn}; +use util::{err_str, runtime::sleep_forever_async}; + +use crate::{error::EventManagerError, worker::EventManagerConfig}; + +// ------------- +// | Constants | +// ------------- + +/// The error message for when the event export address is not a Unix socket +const ERR_NON_UNIX_EVENT_EXPORT_ADDRESS: &str = + "Only Unix socket event export addresses are currently supported"; + +// ---------------------- +// | Manager / Executor | +// ---------------------- + +/// The event manager worker +pub struct EventManager { + /// The event manager executor + pub executor: Option, + /// The handle on the event manager + pub handle: Option>, +} + +/// Manages the exporting of events to the configured address +pub struct EventManagerExecutor { + /// The channel on which to receive events + event_queue: EventManagerReceiver, + /// The address to export relayer events to + event_export_addr: Option, + /// The channel on which the coordinator may cancel event manager execution + cancel_channel: CancelChannel, +} + +impl EventManagerExecutor { + /// Constructs a new event manager executor + pub fn new(config: EventManagerConfig) -> Self { + let EventManagerConfig { event_queue, event_export_addr, cancel_channel } = config; + + Self { event_queue, event_export_addr, cancel_channel } + } + + /// Constructs the export sink for the event manager. + /// + /// Currently, only Unix socket export addresses are supported. + pub async fn construct_export_sink(&mut self) -> Result, EventManagerError> { + if self.event_export_addr.is_none() { + return Ok(None); + } + + let mut event_export_addr = self.event_export_addr.take().unwrap(); + let unix_path = + match event_export_addr.pop().expect("event export address must not be empty") { + Protocol::Unix(path) => path.to_string(), + _ => { + return Err(EventManagerError::InvalidEventExportAddr( + ERR_NON_UNIX_EVENT_EXPORT_ADDRESS.to_string(), + )) + }, + }; + + let socket = UnixStream::connect(Path::new(&unix_path)) + .await + .map_err(err_str!(EventManagerError::SocketConnection))?; + + Ok(Some(socket)) + } + + /// The main execution loop; receives events and exports them to the + /// configured sink + pub async fn execution_loop(mut self) -> Result<(), EventManagerError> { + // If the node is running in bootstrap mode, sleep forever + if in_bootstrap_mode() { + sleep_forever_async().await; + } + + let disabled = self.event_export_addr.is_none(); + let mut sink = self.construct_export_sink().await?; + + loop { + tokio::select! { + Some(event) = self.event_queue.recv() => { + if disabled { + warn!("EventManager received event while disabled, ignoring..."); + continue; + } + + let sink = sink.as_mut().unwrap(); + let event_bytes = serde_json::to_vec(&event).map_err(err_str!(EventManagerError::Serialize))?; + sink.write_all(&event_bytes).await.map_err(err_str!(EventManagerError::SocketWrite))?; + }, + + _ = self.cancel_channel.changed() => { + info!("EventManager received cancel signal, shutting down..."); + return Err(EventManagerError::Cancelled("received cancel signal".to_string())); + } + } + } + } +} diff --git a/workers/event-manager/src/worker.rs b/workers/event-manager/src/worker.rs index 06864e05..7f404b1e 100644 --- a/workers/event-manager/src/worker.rs +++ b/workers/event-manager/src/worker.rs @@ -1,13 +1,29 @@ //! Defines the worker implementation for the event manager -use std::thread::JoinHandle; +use std::thread::{Builder, JoinHandle}; use async_trait::async_trait; use common::{types::CancelChannel, worker::Worker}; use job_types::event_manager::EventManagerReceiver; use libp2p::Multiaddr; +use tokio::runtime::Builder as RuntimeBuilder; +use tracing::info; +use util::err_str; -use crate::error::EventManagerError; +use crate::{ + error::EventManagerError, + manager::{EventManager, EventManagerExecutor}, +}; + +// ------------- +// | Constants | +// ------------- + +/// The number of threads to use for the event manager. +/// +/// We only need one thread as no parallel tasks are spawned +/// by the event manager. +const EVENT_MANAGER_N_THREADS: usize = 1; // ---------- // | Config | @@ -24,25 +40,14 @@ pub struct EventManagerConfig { pub cancel_channel: CancelChannel, } -// ---------- -// | Worker | -// ---------- - -/// The event manager worker -pub struct EventManager { - /// The configuration for the event manager - pub config: EventManagerConfig, - /// The handle on the event manager - pub handle: Option>, -} - #[async_trait] impl Worker for EventManager { type WorkerConfig = EventManagerConfig; type Error = EventManagerError; async fn new(config: Self::WorkerConfig) -> Result { - Ok(Self { config, handle: None }) + let executor = EventManagerExecutor::new(config); + Ok(Self { executor: Some(executor), handle: None }) } fn name(&self) -> String { @@ -54,7 +59,7 @@ impl Worker for EventManager { } fn join(&mut self) -> Vec> { - vec![] + vec![self.handle.take().unwrap()] } fn cleanup(&mut self) -> Result<(), Self::Error> { @@ -62,6 +67,24 @@ impl Worker for EventManager { } fn start(&mut self) -> Result<(), Self::Error> { + info!("Starting event manager executor..."); + + let executor = self.executor.take().unwrap(); + let executor_handle = Builder::new() + .name("event-manager-executor-main".to_string()) + .spawn(move || { + let runtime = RuntimeBuilder::new_multi_thread() + .worker_threads(EVENT_MANAGER_N_THREADS) + .enable_all() + .build() + .unwrap(); + + runtime.block_on(executor.execution_loop()).err().unwrap() + }) + .map_err(err_str!(EventManagerError::SetupError))?; + + self.handle = Some(executor_handle); + Ok(()) } }