Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the # of messages prioritized in the queue to the http resp… #5043

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions rust/main/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rust/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ anyhow = "1.0"
async-trait = "0.1"
async-rwlock = "1.3"
auto_impl = "1.0"
axum = "0.6.1"
axum = { version = "0.6.1", features = ["macros"] }
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
backtrace = "0.3"
base64 = "0.21.2"
bigdecimal = "0.4.2"
Expand Down Expand Up @@ -152,6 +152,7 @@ typetag = "0.2"
uint = "0.9.5"
ureq = { version = "2.4", default-features = false }
url = "2.3"
uuid = { version = "1.11.0", features = ["v4"] }
walkdir = "2"
warp = "0.3"
which = "4.3"
Expand Down
2 changes: 2 additions & 0 deletions rust/main/agents/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ tokio = { workspace = true, features = [
] }
tokio-metrics.workspace = true
tracing-futures.workspace = true
tracing-test.workspace = true
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
tracing.workspace = true
typetag.workspace = true
uuid.workspace = true

hyperlane-core = { path = "../../hyperlane-core", features = [
"agent",
Expand Down
69 changes: 58 additions & 11 deletions rust/main/agents/relayer/src/msg/op_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::{cmp::Reverse, collections::BinaryHeap, sync::Arc};
use derive_new::new;
use hyperlane_core::{PendingOperation, PendingOperationStatus, QueueOperation};
use prometheus::{IntGauge, IntGaugeVec};
use tokio::sync::{broadcast::Receiver, Mutex};
use tokio::sync::{broadcast::Receiver, mpsc, Mutex};
use tracing::{debug, info, instrument};

use crate::settings::matching_list::MatchingList;
use crate::server::{MessageRetryRequest, MessageRetryResponse};

pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>>;

Expand All @@ -16,7 +16,8 @@ pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>
pub struct OpQueue {
metrics: IntGaugeVec,
queue_metrics_label: String,
retry_rx: Arc<Mutex<Receiver<MatchingList>>>,
retry_rx: Arc<Mutex<Receiver<MessageRetryRequest>>>,
retry_tx: mpsc::Sender<MessageRetryResponse>,
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
#[new(default)]
pub queue: OperationPriorityQueue,
}
Expand Down Expand Up @@ -74,27 +75,55 @@ impl OpQueue {
// The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task
// that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now.
let mut message_retry_requests = vec![];

while let Ok(message_id) = self.retry_rx.lock().await.try_recv() {
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
message_retry_requests.push(message_id);
let uuid = message_id.uuid.clone();
message_retry_requests.push((
message_id,
MessageRetryResponse {
uuid,
processed: 0,
matched: 0,
},
));
}

if message_retry_requests.is_empty() {
return;
}

let mut queue = self.queue.lock().await;
let queue_length = queue.len();

let mut reprioritized_queue: BinaryHeap<_> = queue
.drain()
.map(|Reverse(mut op)| {
if message_retry_requests.iter().any(|r| r.op_matches(&op)) {
let matches = message_retry_requests
.iter()
.any(|(retry_req, _)| retry_req.pattern.op_matches(&op));
if matches {
info!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

realizing it'd be useful to include the retry request in this log

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't really log the specific retry request, because we only check whether any retry request matches the operation —
unless we want to log each retry request that matches inside the `.map()`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, though we could take advantage of the filter result and accumulate it into a vec of matches, so we include all request IDs that matched in the log. It'd also be cleaner to refactor everything inside the `.map(|Reverse(mut op)| { ... })` closure into its own function, since it's getting quite crowded.

operation = %op,
queue_label = %self.queue_metrics_label,
"Retrying OpQueue operation"
);
op.reset_attempts()
op.reset_attempts();
}
// update retry metrics
for (retry_req, req_metric) in message_retry_requests.iter_mut() {
if retry_req.pattern.op_matches(&op) {
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
req_metric.matched += 1;
}
}
Reverse(op)
})
.collect();

tracing::debug!("Sending retry request metrics back");
for (_, mut req_metric) in message_retry_requests {
req_metric.processed = queue_length;
let _ = self.retry_tx.send(req_metric).await;
}
queue.append(&mut reprioritized_queue);
}

Expand All @@ -112,7 +141,10 @@ impl OpQueue {

#[cfg(test)]
pub mod test {
use crate::settings::matching_list::MatchingList;

use super::*;

use hyperlane_core::{
HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneDomainTechnicalStack,
HyperlaneDomainType, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult,
Expand Down Expand Up @@ -317,15 +349,19 @@ pub mod test {
async fn test_multiple_op_queues_message_id() {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);
let (retry_response_tx, _rx) = mpsc::channel(100);

let mut op_queue_1 = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Arc::new(Mutex::new(broadcaster.subscribe())),
retry_response_tx.clone(),
);
let mut op_queue_2 = OpQueue::new(
metrics,
queue_metrics_label,
Arc::new(Mutex::new(broadcaster.subscribe())),
retry_response_tx,
);

// Add some operations to the queue with increasing `next_attempt_after` values
Expand Down Expand Up @@ -363,10 +399,16 @@ pub mod test {

// Retry by message ids
broadcaster
.send(MatchingList::with_message_id(op_ids[1]))
.send(MessageRetryRequest {
uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(),
pattern: MatchingList::with_message_id(op_ids[1]),
})
.unwrap();
broadcaster
.send(MatchingList::with_message_id(op_ids[2]))
.send(MessageRetryRequest {
uuid: "59400966-e7fa-4fb9-9372-9a671d4392c3".to_string(),
pattern: MatchingList::with_message_id(op_ids[2]),
})
.unwrap();

// Pop elements from queue 1
Expand Down Expand Up @@ -396,10 +438,14 @@ pub mod test {
async fn test_destination_domain() {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);

let (retry_response_tx, _rx) = mpsc::channel(100);

let mut op_queue = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Arc::new(Mutex::new(broadcaster.subscribe())),
retry_response_tx,
);

// Add some operations to the queue with increasing `next_attempt_after` values
Expand All @@ -424,9 +470,10 @@ pub mod test {

// Retry by domain
broadcaster
.send(MatchingList::with_destination_domain(
destination_domain_2.id(),
))
.send(MessageRetryRequest {
uuid: "a5b39473-7cc5-48a1-8bed-565454ba1037".to_string(),
pattern: MatchingList::with_destination_domain(destination_domain_2.id()),
})
.unwrap();

// Pop elements from queue
Expand Down
9 changes: 7 additions & 2 deletions rust/main/agents/relayer/src/msg/op_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use hyperlane_core::{
};

use crate::msg::pending_message::CONFIRM_DELAY;
use crate::settings::matching_list::MatchingList;
use crate::server::MessageRetryRequest;
use crate::server::MessageRetryResponse;

use super::op_queue::OpQueue;
use super::op_queue::OperationPriorityQueue;
Expand Down Expand Up @@ -105,7 +106,8 @@ impl SerialSubmitter {
pub fn new(
domain: HyperlaneDomain,
rx: mpsc::UnboundedReceiver<QueueOperation>,
retry_op_transmitter: Sender<MatchingList>,
retry_op_transmitter: Sender<MessageRetryRequest>,
retry_op_response_transmitter: mpsc::Sender<MessageRetryResponse>,
metrics: SerialSubmitterMetrics,
max_batch_size: u32,
task_monitor: TaskMonitor,
Expand All @@ -114,16 +116,19 @@ impl SerialSubmitter {
metrics.submitter_queue_length.clone(),
"prepare_queue".to_string(),
Arc::new(Mutex::new(retry_op_transmitter.subscribe())),
retry_op_response_transmitter.clone(),
);
let submit_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
"submit_queue".to_string(),
Arc::new(Mutex::new(retry_op_transmitter.subscribe())),
retry_op_response_transmitter.clone(),
);
let confirm_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
"confirm_queue".to_string(),
Arc::new(Mutex::new(retry_op_transmitter.subscribe())),
retry_op_response_transmitter,
);

Self {
Expand Down
8 changes: 6 additions & 2 deletions rust/main/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,17 +318,21 @@ impl BaseAgent for Relayer {
}));
tasks.push(console_server.instrument(info_span!("Tokio console server")));
}
let sender = BroadcastSender::<MatchingList>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let sender = BroadcastSender::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
let mut prep_queues = HashMap::with_capacity(self.destination_chains.len());

let (retry_op_response_tx, retry_op_response_rx) =
mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE);
for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) = mpsc::unbounded_channel::<QueueOperation>();
send_channels.insert(dest_domain.id(), send_channel);
let serial_submitter = SerialSubmitter::new(
dest_domain.clone(),
receive_channel,
sender.clone(),
retry_op_response_tx.clone(),
SerialSubmitterMetrics::new(&self.core.metrics, dest_domain),
// Default to submitting one message at a time if there is no batch config
self.core.settings.chains[dest_domain.name()]
Expand Down Expand Up @@ -385,7 +389,7 @@ impl BaseAgent for Relayer {
);
}
// run server
let custom_routes = relayer_server::Server::new()
let custom_routes = relayer_server::Server::new(Some(retry_op_response_rx))
.with_op_retry(sender.clone())
.with_message_queue(prep_queues)
.routes();
Expand Down
5 changes: 4 additions & 1 deletion rust/main/agents/relayer/src/server/list_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,20 @@ mod tests {
use axum::http::StatusCode;
use hyperlane_core::KnownHyperlaneDomain;
use std::{cmp::Reverse, net::SocketAddr, sync::Arc};
use tokio::sync::{self, Mutex};
use tokio::sync::{self, mpsc, Mutex};

const DUMMY_DOMAIN: KnownHyperlaneDomain = KnownHyperlaneDomain::Arbitrum;

fn setup_test_server() -> (SocketAddr, OperationPriorityQueue) {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);
let (retry_response_tx, _rx) = mpsc::channel(100);

let op_queue = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Arc::new(Mutex::new(broadcaster.subscribe())),
retry_response_tx,
);
let mut op_queues_map = HashMap::new();
op_queues_map.insert(DUMMY_DOMAIN as u32, op_queue.queue.clone());
Expand Down
Loading
Loading