Skip to content

Commit

Permalink
workers: event-manager: manager: create event manager executor
Browse files Browse the repository at this point in the history
  • Loading branch information
akirillo committed Dec 14, 2024
1 parent d98a42f commit 5e200f5
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 81 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use chain_events::listener::{OnChainEventListener, OnChainEventListenerConfig};
use common::worker::{new_worker_failure_channel, watch_worker, Worker};
use common::{default_wrapper::default_option, types::new_cancel_channel};
use constants::{in_bootstrap_mode, VERSION};
use event_manager::worker::{EventManager, EventManagerConfig};
use event_manager::{manager::EventManager, worker::EventManagerConfig};
use external_api::bus_message::SystemBusMessage;
use gossip_server::{server::GossipServer, worker::GossipServerConfig};
use handshake_manager::{manager::HandshakeManager, worker::HandshakeManagerConfig};
Expand Down
6 changes: 6 additions & 0 deletions workers/event-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,11 @@ libp2p = { workspace = true }

# === Workspace Dependencies === #
common = { path = "../../common" }
constants = { path = "../../constants" }
circuit-types = { path = "../../circuit-types" }
job-types = { path = "../job-types" }
util = { path = "../../util" }

# === Misc Dependencies === #
tracing = { workspace = true }
serde_json = { workspace = true }
15 changes: 14 additions & 1 deletion workers/event-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,17 @@
/// An error that occurred in the event manager
#[derive(Clone, Debug)]
pub enum EventManagerError {}
pub enum EventManagerError {
/// The event manager was cancelled
Cancelled(String),
/// The event export address is invalid
InvalidEventExportAddr(String),
/// An error occurred while connecting to the event export socket
SocketConnection(String),
/// An error occurred while serializing an event
Serialize(String),
/// An error occurred while writing to the event export socket
SocketWrite(String),
/// An error occurred while setting up the event manager
SetupError(String),
}
1 change: 1 addition & 0 deletions workers/event-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
#![deny(clippy::needless_pass_by_ref_mut)]

pub mod error;
pub mod manager;
pub mod worker;
111 changes: 111 additions & 0 deletions workers/event-manager/src/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//! The core event manager logic, the main loop that receives events
//! and exports them to the configured address
use std::{path::Path, thread::JoinHandle};

use common::types::CancelChannel;
use constants::in_bootstrap_mode;
use job_types::event_manager::EventManagerReceiver;
use libp2p::{multiaddr::Protocol, Multiaddr};
use tokio::{io::AsyncWriteExt, net::UnixStream};
use tracing::{info, warn};
use util::{err_str, runtime::sleep_forever_async};

use crate::{error::EventManagerError, worker::EventManagerConfig};

// -------------
// | Constants |
// -------------

/// The error message for when the event export address is not a Unix socket
const ERR_NON_UNIX_EVENT_EXPORT_ADDRESS: &str =
"Only Unix socket event export addresses are currently supported";

// ----------------------
// | Manager / Executor |
// ----------------------

/// The event manager worker
pub struct EventManager {
/// The event manager executor
pub executor: Option<EventManagerExecutor>,
/// The handle on the event manager
pub handle: Option<JoinHandle<EventManagerError>>,
}

/// Manages the exporting of events to the configured address
pub struct EventManagerExecutor {
/// The channel on which to receive events
event_queue: EventManagerReceiver,
/// The address to export relayer events to
event_export_addr: Option<Multiaddr>,
/// The channel on which the coordinator may cancel event manager execution
cancel_channel: CancelChannel,
}

impl EventManagerExecutor {
/// Constructs a new event manager executor
pub fn new(config: EventManagerConfig) -> Self {
let EventManagerConfig { event_queue, event_export_addr, cancel_channel } = config;

Self { event_queue, event_export_addr, cancel_channel }
}

/// Constructs the export sink for the event manager.
///
/// Currently, only Unix socket export addresses are supported.
pub async fn construct_export_sink(&mut self) -> Result<Option<UnixStream>, EventManagerError> {
if self.event_export_addr.is_none() {
return Ok(None);
}

let mut event_export_addr = self.event_export_addr.take().unwrap();
let unix_path =
match event_export_addr.pop().expect("event export address must not be empty") {
Protocol::Unix(path) => path.to_string(),
_ => {
return Err(EventManagerError::InvalidEventExportAddr(
ERR_NON_UNIX_EVENT_EXPORT_ADDRESS.to_string(),
))
},
};

let socket = UnixStream::connect(Path::new(&unix_path))
.await
.map_err(err_str!(EventManagerError::SocketConnection))?;

Ok(Some(socket))
}

/// The main execution loop; receives events and exports them to the
/// configured sink
pub async fn execution_loop(mut self) -> Result<(), EventManagerError> {
// If the node is running in bootstrap mode, sleep forever
if in_bootstrap_mode() {
sleep_forever_async().await;
}

let disabled = self.event_export_addr.is_none();
let mut sink = self.construct_export_sink().await?;

loop {
tokio::select! {
Some(event) = self.event_queue.recv() => {
if disabled {
warn!("EventManager received event while disabled, ignoring...");
continue;
}

let sink = sink.as_mut().unwrap();
let event_bytes = serde_json::to_vec(&event).map_err(err_str!(EventManagerError::Serialize))?;
sink.write_all(&event_bytes).await.map_err(err_str!(EventManagerError::SocketWrite))?;
},

_ = self.cancel_channel.changed() => {
info!("EventManager received cancel signal, shutting down...");
return Err(EventManagerError::Cancelled("received cancel signal".to_string()));
}
}
}
}
}
55 changes: 39 additions & 16 deletions workers/event-manager/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
//! Defines the worker implementation for the event manager
use std::thread::JoinHandle;
use std::thread::{Builder, JoinHandle};

use async_trait::async_trait;
use common::{types::CancelChannel, worker::Worker};
use job_types::event_manager::EventManagerReceiver;
use libp2p::Multiaddr;
use tokio::runtime::Builder as RuntimeBuilder;
use tracing::info;
use util::err_str;

use crate::error::EventManagerError;
use crate::{
error::EventManagerError,
manager::{EventManager, EventManagerExecutor},
};

// -------------
// | Constants |
// -------------

/// The number of threads to use for the event manager.
///
/// We only need one thread as no parallel tasks are spawned
/// by the event manager.
const EVENT_MANAGER_N_THREADS: usize = 1;

// ----------
// | Config |
Expand All @@ -24,25 +40,14 @@ pub struct EventManagerConfig {
pub cancel_channel: CancelChannel,
}

// ----------
// | Worker |
// ----------

/// The event manager worker
pub struct EventManager {
/// The configuration for the event manager
pub config: EventManagerConfig,
/// The handle on the event manager
pub handle: Option<JoinHandle<EventManagerError>>,
}

#[async_trait]
impl Worker for EventManager {
type WorkerConfig = EventManagerConfig;
type Error = EventManagerError;

async fn new(config: Self::WorkerConfig) -> Result<Self, Self::Error> {
Ok(Self { config, handle: None })
let executor = EventManagerExecutor::new(config);
Ok(Self { executor: Some(executor), handle: None })
}

fn name(&self) -> String {
Expand All @@ -54,14 +59,32 @@ impl Worker for EventManager {
}

fn join(&mut self) -> Vec<JoinHandle<Self::Error>> {
vec![]
vec![self.handle.take().unwrap()]
}

fn cleanup(&mut self) -> Result<(), Self::Error> {
Ok(())
}

fn start(&mut self) -> Result<(), Self::Error> {
info!("Starting event manager executor...");

let executor = self.executor.take().unwrap();
let executor_handle = Builder::new()
.name("event-manager-executor-main".to_string())
.spawn(move || {
let runtime = RuntimeBuilder::new_multi_thread()
.worker_threads(EVENT_MANAGER_N_THREADS)
.enable_all()
.build()
.unwrap();

runtime.block_on(executor.execution_loop()).err().unwrap()
})
.map_err(err_str!(EventManagerError::SetupError))?;

self.handle = Some(executor_handle);

Ok(())
}
}
Loading

0 comments on commit 5e200f5

Please sign in to comment.