Skip to content

Commit

Permalink
workers: job-types: event_manager: add descriptor methods to relayer …
Browse files Browse the repository at this point in the history
…events
  • Loading branch information
akirillo committed Dec 19, 2024
1 parent 407e394 commit 9adf9e6
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 1 deletion.
14 changes: 13 additions & 1 deletion util/src/telemetry/datadog/formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tracing_opentelemetry::OtelData;

use tracing_serde::AsSerde;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields};
use tracing_subscriber::registry::{LookupSpan, SpanRef};

/// A trace or span ID in the format expected by Datadog
Expand Down Expand Up @@ -122,6 +122,18 @@ where
serializer = visitor.take_serializer()?;

if let Some(ref span_ref) = ctx.lookup_current() {
// Record current span fields, inspired by https://github.com/tokio-rs/tracing/blob/tracing-subscriber-0.3.19/tracing-subscriber/src/fmt/format/json.rs#L168-L207
if let Some(fmt_fields) = span_ref.extensions().get::<FormattedFields<N>>() {
if let Ok(serde_json::Value::Object(fields)) =
serde_json::from_str::<serde_json::Value>(fmt_fields)
{
for (key, value) in fields {
serializer.serialize_entry(&key, &value)?;
}
}
}

// Record trace ID and span ID, if present
if let Some(trace_info) = lookup_trace_info(span_ref) {
serializer.serialize_entry("dd.span_id", &trace_info.span_id)?;
serializer.serialize_entry("dd.trace_id", &trace_info.trace_id)?;
Expand Down
76 changes: 76 additions & 0 deletions workers/job-types/src/event_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,47 @@ pub enum RelayerEvent {
ExternalMatch(ExternalMatchEvent),
}

impl RelayerEvent {
/// Returns the event ID
pub fn event_id(&self) -> Uuid {
match self {
RelayerEvent::WalletCreation(event) => event.event_id,
RelayerEvent::ExternalTransfer(event) => event.event_id,
RelayerEvent::OrderPlacement(event) => event.event_id,
RelayerEvent::OrderUpdate(event) => event.event_id,
RelayerEvent::OrderCancellation(event) => event.event_id,
RelayerEvent::Match(event) => event.event_id,
RelayerEvent::ExternalMatch(event) => event.event_id,
}
}

/// Returns the event timestamp
pub fn event_timestamp(&self) -> SystemTime {
match self {
RelayerEvent::WalletCreation(event) => event.event_timestamp,
RelayerEvent::ExternalTransfer(event) => event.event_timestamp,
RelayerEvent::OrderPlacement(event) => event.event_timestamp,
RelayerEvent::OrderUpdate(event) => event.event_timestamp,
RelayerEvent::OrderCancellation(event) => event.event_timestamp,
RelayerEvent::Match(event) => event.event_timestamp,
RelayerEvent::ExternalMatch(event) => event.event_timestamp,
}
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
match self {
RelayerEvent::WalletCreation(event) => event.describe(),
RelayerEvent::ExternalTransfer(event) => event.describe(),
RelayerEvent::OrderPlacement(event) => event.describe(),
RelayerEvent::OrderUpdate(event) => event.describe(),
RelayerEvent::OrderCancellation(event) => event.describe(),
RelayerEvent::Match(event) => event.describe(),
RelayerEvent::ExternalMatch(event) => event.describe(),
}
}
}

// --------------------------
// | Individual Event Types |
// --------------------------
Expand All @@ -82,6 +123,11 @@ impl WalletCreationEvent {
let (event_id, event_timestamp) = get_event_id_and_timestamp();
Self { event_id, event_timestamp, wallet_id, symmetric_key }
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
format!("WalletCreation({})", self.event_id)
}
}

/// An external transfer event
Expand All @@ -104,6 +150,11 @@ impl ExternalTransferEvent {
let (event_id, event_timestamp) = get_event_id_and_timestamp();
Self { event_id, event_timestamp, wallet_id, transfer }
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
format!("ExternalTransfer({})", self.event_id)
}
}

/// An order placement event
Expand Down Expand Up @@ -135,6 +186,11 @@ impl OrderPlacementEvent {
let (event_id, event_timestamp) = get_event_id_and_timestamp();
Self { event_id, event_timestamp, wallet_id, order_id, order, matching_pool }
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
format!("OrderPlacement({})", self.event_id)
}
}

/// An order update event
Expand Down Expand Up @@ -166,6 +222,11 @@ impl OrderUpdateEvent {
let (event_id, event_timestamp) = get_event_id_and_timestamp();
Self { event_id, event_timestamp, wallet_id, order_id, order, matching_pool }
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
format!("OrderUpdate({})", self.event_id)
}
}

/// An order cancellation event
Expand Down Expand Up @@ -208,6 +269,11 @@ impl OrderCancellationEvent {
amount_filled,
}
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
format!("OrderCancellation({})", self.event_id)
}
}

/// A match event
Expand Down Expand Up @@ -276,6 +342,11 @@ impl MatchEvent {
fee_take1,
}
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
format!("Match({})", self.event_id)
}
}

/// An external match event
Expand Down Expand Up @@ -327,6 +398,11 @@ impl ExternalMatchEvent {
external_fee_take,
}
}

/// Returns a human-readable description of the event
pub fn describe(&self) -> String {
format!("ExternalMatch({})", self.event_id)
}
}

// -----------
Expand Down

0 comments on commit 9adf9e6

Please sign in to comment.