Skip to content

Commit

Permalink
Feature: store heartbeat metric in RaftMetrics and RaftDataMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveLauC authored and drmingdrmer committed Jul 22, 2024
1 parent 0325f7d commit d857a85
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 18 deletions.
37 changes: 29 additions & 8 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::error::RPCError;
use crate::error::Timeout;
use crate::log_id::LogIdOptionExt;
use crate::log_id::RaftLogId;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
Expand Down Expand Up @@ -223,7 +224,7 @@ where
let res = self.do_main(rx_shutdown).instrument(span).await;

// Flush buffered metrics
self.report_metrics(None);
self.report_metrics(None, None);

// Safe unwrap: res is Result<Infallible, _>
let err = res.unwrap_err();
Expand Down Expand Up @@ -257,7 +258,7 @@ where
self.run_engine_commands().await?;

// Initialize metrics.
self.report_metrics(None);
self.report_metrics(None, None);

self.runtime_loop(rx_shutdown).await
}
Expand Down Expand Up @@ -514,18 +515,36 @@ where

#[tracing::instrument(level = "debug", skip_all)]
pub fn flush_metrics(&mut self) {
let leader_metrics = if let Some(leader) = self.engine.leader.as_ref() {
let prog = &leader.progress;
Some(prog.iter().map(|(id, p)| (*id, *p.borrow())).collect())
let (replication, heartbeat) = if let Some(leader) = self.engine.leader.as_ref() {
let replication_prog = &leader.progress;
let replication = Some(replication_prog.iter().map(|(id, p)| (*id, *p.borrow())).collect());

let clock_prog = &leader.clock_progress;
let heartbeat = Some(
clock_prog
.iter()
.map(|(id, opt_t)| {
let millis_since_last_ack = opt_t.map(|t| t.elapsed().as_millis() as u64);

(*id, millis_since_last_ack)
})
.collect(),
);

(replication, heartbeat)
} else {
None
(None, None)
};
self.report_metrics(leader_metrics);
self.report_metrics(replication, heartbeat);
}

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn report_metrics(&mut self, replication: Option<ReplicationMetrics<C>>) {
pub(crate) fn report_metrics(
&mut self,
replication: Option<ReplicationMetrics<C>>,
heartbeat: Option<HeartbeatMetrics<C>>,
) {
let last_quorum_acked = self.last_quorum_acked_time();
let millis_since_quorum_ack = last_quorum_acked.map(|t| t.elapsed().as_millis() as u64);

Expand All @@ -551,6 +570,7 @@ where
current_leader,
millis_since_quorum_ack,
membership_config: membership_config.clone(),
heartbeat: heartbeat.clone(),

// --- replication ---
replication: replication.clone(),
Expand All @@ -563,6 +583,7 @@ where
purged: st.io_purged().copied(),
millis_since_quorum_ack,
replication,
heartbeat,
};

let server_metrics = RaftServerMetrics {
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/display_ext.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
//! Implement [`std::fmt::Display`] for types such as `Option<T>` and slice `&[T]`.
pub(crate) mod display_btreemap_opt_value;
pub(crate) mod display_instant;
pub(crate) mod display_option;
pub(crate) mod display_result;
pub(crate) mod display_slice;

pub(crate) use display_btreemap_opt_value::DisplayBTreeMapOptValue;
#[allow(unused_imports)]
pub(crate) use display_btreemap_opt_value::DisplayBtreeMapOptValueExt;
#[allow(unused_imports)]
pub(crate) use display_instant::DisplayInstant;
pub(crate) use display_instant::DisplayInstantExt;
Expand Down
81 changes: 81 additions & 0 deletions openraft/src/display_ext/display_btreemap_opt_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::collections::BTreeMap;
use std::fmt;

use super::DisplayOption;

/// Implement `Display` for `BTreeMap<K, Option<V>>` if `K` and `V` are `Display`.
///
/// It formats a key-value pair in a string like "{key}:{opt_value}", and
/// concatenates the key-value pairs with comma.
///
/// For how to format the `opt_value`, see [`DisplayOption`].
pub(crate) struct DisplayBTreeMapOptValue<'a, K: fmt::Display, V: fmt::Display>(pub &'a BTreeMap<K, Option<V>>);

impl<'a, K: fmt::Display, V: fmt::Display> fmt::Display for DisplayBTreeMapOptValue<'a, K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let len = self.0.len();
for (idx, (key, value)) in self.0.iter().enumerate() {
write!(f, "{}:{}", key, DisplayOption(value))?;
if idx + 1 != len {
write!(f, ",")?;
}
}

Ok(())
}
}

#[allow(unused)]
pub(crate) trait DisplayBtreeMapOptValueExt<'a, K: fmt::Display, V: fmt::Display> {
fn display(&'a self) -> DisplayBTreeMapOptValue<'a, K, V>;
}

impl<K, V> DisplayBtreeMapOptValueExt<'_, K, V> for BTreeMap<K, Option<V>>
where
K: fmt::Display,
V: fmt::Display,
{
fn display(&self) -> DisplayBTreeMapOptValue<K, V> {
DisplayBTreeMapOptValue(self)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_display_btreemap_opt_value() {
let map = (1..=3).map(|num| (num, Some(num))).collect::<BTreeMap<_, _>>();
let display = DisplayBTreeMapOptValue(&map);

assert_eq!(display.to_string(), "1:1,2:2,3:3");
}

#[test]
fn test_display_empty_map() {
let map = BTreeMap::<i32, Option<i32>>::new();
let display = DisplayBTreeMapOptValue(&map);

assert_eq!(display.to_string(), "");
}

#[test]
fn test_display_btreemap_opt_value_with_1_item() {
let map = (1..=1).map(|num| (num, Some(num))).collect::<BTreeMap<_, _>>();
let display = DisplayBTreeMapOptValue(&map);

assert_eq!(display.to_string(), "1:1");
}

#[test]
fn test_display_btreemap_opt_value_with_none() {
let mut map = BTreeMap::new();
map.insert(1, Some(1));
map.insert(2, None);
map.insert(3, Some(3));
let display = DisplayBTreeMapOptValue(&map);

assert_eq!(display.to_string(), "1:1,2:None,3:3");
}
}
3 changes: 3 additions & 0 deletions openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::NodeIdOf;

pub(crate) type ReplicationMetrics<C> = BTreeMap<NodeIdOf<C>, Option<LogIdOf<C>>>;
/// Heartbeat metrics, a mapping between a node's ID and milliseconds since the
/// last acknowledged heartbeat or replication to this node.
pub(crate) type HeartbeatMetrics<C> = BTreeMap<NodeIdOf<C>, Option<u64>>;
37 changes: 27 additions & 10 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::fmt;
use std::sync::Arc;

use crate::core::ServerState;
use crate::display_ext::DisplayBTreeMapOptValue;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::Fatal;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::ReplicationMetrics;
use crate::LogId;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -72,6 +74,15 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
/// The current membership config of the cluster.
pub membership_config: Arc<StoredMembership<C>>,

/// Heartbeat metrics. It is Some() only when this node is leader.
///
/// This field records a mapping between a node's ID and milliseconds since
/// the last acknowledged heartbeat or replication to this node.
///
/// This duration can be used by applications to guess if a follwer/learner
/// node is offline, longer duration suggests higher possibility of that.
pub heartbeat: Option<HeartbeatMetrics<C>>,

// ---
// --- replication ---
// ---
Expand Down Expand Up @@ -101,14 +112,12 @@ where C: RaftTypeConfig
write!(f, ", ")?;
write!(
f,
"membership:{}, snapshot:{}, purged:{}, replication:{{{}}}",
"membership:{}, snapshot:{}, purged:{}, replication:{{{}}}, heartbeat:{{{}}}",
self.membership_config,
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.replication
.as_ref()
.map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::<Vec<_>>().join(",") })
.unwrap_or_default(),
DisplayOption(&self.replication.as_ref().map(DisplayBTreeMapOptValue)),
DisplayOption(&self.heartbeat.as_ref().map(DisplayBTreeMapOptValue)),
)?;

write!(f, "}}")?;
Expand Down Expand Up @@ -136,6 +145,7 @@ where C: RaftTypeConfig
millis_since_quorum_ack: None,
membership_config: Arc::new(StoredMembership::default()),
replication: None,
heartbeat: None,
}
}
}
Expand Down Expand Up @@ -165,6 +175,15 @@ pub struct RaftDataMetrics<C: RaftTypeConfig> {
pub millis_since_quorum_ack: Option<u64>,

pub replication: Option<ReplicationMetrics<C>>,

/// Heartbeat metrics. It is Some() only when this node is leader.
///
/// This field records a mapping between a node's ID and milliseconds since
/// the last acknowledged heartbeat or replication to this node.
///
/// This duration can be used by applications to guess if a follwer/learner
/// node is offline, longer duration suggests higher possibility of that.
pub heartbeat: Option<HeartbeatMetrics<C>>,
}

impl<C> fmt::Display for RaftDataMetrics<C>
Expand All @@ -175,16 +194,14 @@ where C: RaftTypeConfig

write!(
f,
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}",
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}, heartbeat:{{{}}}",
DisplayOption(&self.last_log),
DisplayOption(&self.last_applied),
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.millis_since_quorum_ack.display(),
self.replication
.as_ref()
.map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::<Vec<_>>().join(",") })
.unwrap_or_default(),
DisplayOption(&self.replication.as_ref().map(DisplayBTreeMapOptValue)),
DisplayOption(&self.heartbeat.as_ref().map(DisplayBTreeMapOptValue)),
)?;

write!(f, "}}")?;
Expand Down
1 change: 1 addition & 0 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ where C: RaftTypeConfig {
current_leader: None,
millis_since_quorum_ack: None,
membership_config: Arc::new(StoredMembership::new(None, Membership::new(vec![btreeset! {}], None))),
heartbeat: None,

snapshot: None,
replication: None,
Expand Down
73 changes: 73 additions & 0 deletions tests/tests/metrics/t10_server_metrics_and_data_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::type_config::TypeConfigExt;
use openraft::Config;
use openraft_memstore::TypeConfig;
#[allow(unused_imports)]
use pretty_assertions::assert_eq;
#[allow(unused_imports)]
Expand Down Expand Up @@ -64,6 +66,77 @@ async fn server_metrics_and_data_metrics() -> Result<()> {
Ok(())
}

/// Test if heartbeat metrics work
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn heartbeat_metrics() -> Result<()> {
// Setup test dependencies.
let config = Arc::new(
Config {
enable_heartbeat: false,
heartbeat_interval: 50,
enable_elect: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

let leader = router.get_raft_handle(&0)?;

tracing::info!(log_index, "--- trigger heartbeat; heartbeat metrics refreshes");
let refreshed_node1;
let refreshed_node2;
{
leader.trigger().heartbeat().await?;
let metrics = leader
.wait(timeout())
.metrics(
|metrics| {
let heartbeat = metrics
.heartbeat
.as_ref()
.expect("expect heartbeat to be Some as metrics come from the leader node");
let node1 = heartbeat.get(&1).unwrap().unwrap();
let node2 = heartbeat.get(&2).unwrap().unwrap();

(node1 < 100) && (node2 < 100)
},
"millis_since_quorum_ack refreshed",
)
.await?;

let heartbeat = metrics
.heartbeat
.as_ref()
.expect("expect heartbeat to be Some as metrics come from the leader node");
refreshed_node1 = heartbeat.get(&1).unwrap().unwrap();
refreshed_node2 = heartbeat.get(&2).unwrap().unwrap();
}

tracing::info!(log_index, "--- sleep 500 ms, the interval should extend");
{
TypeConfig::sleep(Duration::from_millis(500)).await;

let metrics = leader.metrics();
let metrics_ref = metrics.borrow();
let heartbeat = metrics_ref
.heartbeat
.as_ref()
.expect("expect heartbeat to be Some as metrics come from the leader node");

let greater_node1 = heartbeat.get(&1).unwrap().unwrap();
let greater_node2 = heartbeat.get(&2).unwrap().unwrap();

assert!(greater_node1 > refreshed_node1);
assert!(greater_node2 > refreshed_node2);
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(500))
}

0 comments on commit d857a85

Please sign in to comment.