Skip to content

Commit

Permalink
node-support: event-export-sidecar: set FIFO queue params
Browse files Browse the repository at this point in the history
  • Loading branch information
akirillo committed Dec 20, 2024
1 parent 3110f03 commit 20d4557
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node-support/event-export-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ aws-sdk-sqs = "1.50.0"

# === Workspace Dependencies === #
config = { path = "../../config" }
job-types = { path = "../../workers/job-types" }
event-manager = { path = "../../workers/event-manager" }

# === Misc Dependencies === #
url = "2.4"
clap = { version = "4", features = ["derive"] }
tracing = { workspace = true }
eyre = { workspace = true }
serde_json = { workspace = true }
14 changes: 13 additions & 1 deletion node-support/event-export-sidecar/src/event_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use aws_config::Region;
use aws_sdk_sqs::Client as SqsClient;
use event_manager::manager::extract_unix_socket_path;
use eyre::{eyre, Error};
use job_types::event_manager::RelayerEvent;
use tokio::net::{UnixListener, UnixStream};
use tracing::{error, info, warn};
use url::Url;
Expand Down Expand Up @@ -101,8 +102,19 @@ impl EventSocket {

/// Handles an event received from the event export socket
async fn handle_relayer_event(&self, msg: Vec<u8>) -> Result<(), Error> {
let event: RelayerEvent = serde_json::from_slice(&msg)?;
let event_id = event.event_id();
let wallet_id = event.wallet_id();

let msg = String::from_utf8(msg)?;
self.sqs_client.send_message().queue_url(&self.queue_url).message_body(msg).send().await?;
self.sqs_client
.send_message()
.queue_url(&self.queue_url)
.message_deduplication_id(event_id)
.message_group_id(wallet_id)
.message_body(msg)
.send()
.await?;

Ok(())
}
Expand Down
13 changes: 13 additions & 0 deletions workers/job-types/src/event_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ impl RelayerEvent {
}
}

/// Returns the ID of the wallet for which the event occurred
pub fn wallet_id(&self) -> WalletIdentifier {
match self {
RelayerEvent::WalletCreation(event) => event.wallet_id,
RelayerEvent::ExternalTransfer(event) => event.wallet_id,
RelayerEvent::OrderPlacement(event) => event.wallet_id,
RelayerEvent::OrderUpdate(event) => event.wallet_id,
RelayerEvent::OrderCancellation(event) => event.wallet_id,
RelayerEvent::Fill(event) => event.wallet_id,
RelayerEvent::ExternalFill(event) => event.internal_wallet_id,
}
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
match self {
Expand Down

0 comments on commit 20d4557

Please sign in to comment.