Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: store heartbeat metric in RaftMetrics and RaftDataMetrics #1177

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::error::RPCError;
use crate::error::Timeout;
use crate::log_id::LogIdOptionExt;
use crate::log_id::RaftLogId;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
Expand Down Expand Up @@ -223,7 +224,7 @@ where
let res = self.do_main(rx_shutdown).instrument(span).await;

// Flush buffered metrics
self.report_metrics(None);
self.report_metrics(None, None);

// Safe unwrap: res is Result<Infallible, _>
let err = res.unwrap_err();
Expand Down Expand Up @@ -257,7 +258,7 @@ where
self.run_engine_commands().await?;

// Initialize metrics.
self.report_metrics(None);
self.report_metrics(None, None);

self.runtime_loop(rx_shutdown).await
}
Expand Down Expand Up @@ -514,18 +515,36 @@ where

#[tracing::instrument(level = "debug", skip_all)]
pub fn flush_metrics(&mut self) {
let leader_metrics = if let Some(leader) = self.engine.leader.as_ref() {
let prog = &leader.progress;
Some(prog.iter().map(|(id, p)| (*id, *p.borrow())).collect())
let (replication, heartbeat) = if let Some(leader) = self.engine.leader.as_ref() {
let replication_prog = &leader.progress;
let replication = Some(replication_prog.iter().map(|(id, p)| (*id, *p.borrow())).collect());

let clock_prog = &leader.clock_progress;
let heartbeat = Some(
clock_prog
.iter()
.map(|(id, opt_t)| {
let millis_since_last_ack = opt_t.map(|t| t.elapsed().as_millis() as u64);

(*id, millis_since_last_ack)
})
.collect(),
);

(replication, heartbeat)
} else {
None
(None, None)
};
self.report_metrics(leader_metrics);
self.report_metrics(replication, heartbeat);
}

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn report_metrics(&mut self, replication: Option<ReplicationMetrics<C>>) {
pub(crate) fn report_metrics(
&mut self,
replication: Option<ReplicationMetrics<C>>,
heartbeat: Option<HeartbeatMetrics<C>>,
) {
let last_quorum_acked = self.last_quorum_acked_time();
let millis_since_quorum_ack = last_quorum_acked.map(|t| t.elapsed().as_millis() as u64);

Expand All @@ -551,6 +570,7 @@ where
current_leader,
millis_since_quorum_ack,
membership_config: membership_config.clone(),
heartbeat: heartbeat.clone(),

// --- replication ---
replication: replication.clone(),
Expand All @@ -563,6 +583,7 @@ where
purged: st.io_purged().copied(),
millis_since_quorum_ack,
replication,
heartbeat,
};

let server_metrics = RaftServerMetrics {
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/display_ext.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
//! Implement [`std::fmt::Display`] for types such as `Option<T>` and slice `&[T]`.

pub(crate) mod display_btreemap_opt_value;
pub(crate) mod display_instant;
pub(crate) mod display_option;
pub(crate) mod display_result;
pub(crate) mod display_slice;

pub(crate) use display_btreemap_opt_value::DisplayBTreeMapOptValue;
#[allow(unused_imports)]
pub(crate) use display_btreemap_opt_value::DisplayBtreeMapOptValueExt;
#[allow(unused_imports)]
pub(crate) use display_instant::DisplayInstant;
pub(crate) use display_instant::DisplayInstantExt;
Expand Down
81 changes: 81 additions & 0 deletions openraft/src/display_ext/display_btreemap_opt_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::collections::BTreeMap;
use std::fmt;

use super::DisplayOption;

/// Implement `Display` for `BTreeMap<K, Option<V>>` if `K` and `V` are `Display`.
///
/// It formats a key-value pair in a string like "{key}:{opt_value}", and
/// concatenates the key-value pairs with comma.
///
/// For how to format the `opt_value`, see [`DisplayOption`].
pub(crate) struct DisplayBTreeMapOptValue<'a, K: fmt::Display, V: fmt::Display>(pub &'a BTreeMap<K, Option<V>>);

impl<'a, K: fmt::Display, V: fmt::Display> fmt::Display for DisplayBTreeMapOptValue<'a, K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let len = self.0.len();
for (idx, (key, value)) in self.0.iter().enumerate() {
write!(f, "{}:{}", key, DisplayOption(value))?;
if idx + 1 != len {
write!(f, ",")?;
}
}

Ok(())
}
}

#[allow(unused)]
pub(crate) trait DisplayBtreeMapOptValueExt<'a, K: fmt::Display, V: fmt::Display> {
fn display(&'a self) -> DisplayBTreeMapOptValue<'a, K, V>;
}

impl<K, V> DisplayBtreeMapOptValueExt<'_, K, V> for BTreeMap<K, Option<V>>
where
K: fmt::Display,
V: fmt::Display,
{
fn display(&self) -> DisplayBTreeMapOptValue<K, V> {
DisplayBTreeMapOptValue(self)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_display_btreemap_opt_value() {
let map = (1..=3).map(|num| (num, Some(num))).collect::<BTreeMap<_, _>>();
let display = DisplayBTreeMapOptValue(&map);

assert_eq!(display.to_string(), "1:1,2:2,3:3");
}

#[test]
fn test_display_empty_map() {
let map = BTreeMap::<i32, Option<i32>>::new();
let display = DisplayBTreeMapOptValue(&map);

assert_eq!(display.to_string(), "");
}

#[test]
fn test_display_btreemap_opt_value_with_1_item() {
let map = (1..=1).map(|num| (num, Some(num))).collect::<BTreeMap<_, _>>();
let display = DisplayBTreeMapOptValue(&map);

assert_eq!(display.to_string(), "1:1");
}

#[test]
fn test_display_btreemap_opt_value_with_none() {
let mut map = BTreeMap::new();
map.insert(1, Some(1));
map.insert(2, None);
map.insert(3, Some(3));
let display = DisplayBTreeMapOptValue(&map);

assert_eq!(display.to_string(), "1:1,2:None,3:3");
}
}
3 changes: 3 additions & 0 deletions openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::NodeIdOf;

pub(crate) type ReplicationMetrics<C> = BTreeMap<NodeIdOf<C>, Option<LogIdOf<C>>>;
/// Heartbeat metrics, a mapping between a node's ID and milliseconds since the
/// last acknowledged heartbeat or replication to this node.
pub(crate) type HeartbeatMetrics<C> = BTreeMap<NodeIdOf<C>, Option<u64>>;
37 changes: 27 additions & 10 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::fmt;
use std::sync::Arc;

use crate::core::ServerState;
use crate::display_ext::DisplayBTreeMapOptValue;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::Fatal;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::ReplicationMetrics;
use crate::LogId;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -72,6 +74,15 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
/// The current membership config of the cluster.
pub membership_config: Arc<StoredMembership<C>>,

/// Heartbeat metrics. It is Some() only when this node is leader.
///
/// This field records a mapping between a node's ID and milliseconds since
/// the last acknowledged heartbeat or replication to this node.
///
/// This duration can be used by applications to guess if a follwer/learner
/// node is offline, longer duration suggests higher possibility of that.
pub heartbeat: Option<HeartbeatMetrics<C>>,

// ---
// --- replication ---
// ---
Expand Down Expand Up @@ -101,14 +112,12 @@ where C: RaftTypeConfig
write!(f, ", ")?;
write!(
f,
"membership:{}, snapshot:{}, purged:{}, replication:{{{}}}",
"membership:{}, snapshot:{}, purged:{}, replication:{{{}}}, heartbeat:{{{}}}",
self.membership_config,
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.replication
.as_ref()
.map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::<Vec<_>>().join(",") })
.unwrap_or_default(),
DisplayOption(&self.replication.as_ref().map(DisplayBTreeMapOptValue)),
DisplayOption(&self.heartbeat.as_ref().map(DisplayBTreeMapOptValue)),
)?;

write!(f, "}}")?;
Expand Down Expand Up @@ -136,6 +145,7 @@ where C: RaftTypeConfig
millis_since_quorum_ack: None,
membership_config: Arc::new(StoredMembership::default()),
replication: None,
heartbeat: None,
}
}
}
Expand Down Expand Up @@ -165,6 +175,15 @@ pub struct RaftDataMetrics<C: RaftTypeConfig> {
pub millis_since_quorum_ack: Option<u64>,

pub replication: Option<ReplicationMetrics<C>>,

/// Heartbeat metrics. It is Some() only when this node is leader.
///
/// This field records a mapping between a node's ID and milliseconds since
/// the last acknowledged heartbeat or replication to this node.
///
/// This duration can be used by applications to guess if a follwer/learner
/// node is offline, longer duration suggests higher possibility of that.
pub heartbeat: Option<HeartbeatMetrics<C>>,
}

impl<C> fmt::Display for RaftDataMetrics<C>
Expand All @@ -175,16 +194,14 @@ where C: RaftTypeConfig

write!(
f,
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}",
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}, heartbeat:{{{}}}",
DisplayOption(&self.last_log),
DisplayOption(&self.last_applied),
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.millis_since_quorum_ack.display(),
self.replication
.as_ref()
.map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::<Vec<_>>().join(",") })
.unwrap_or_default(),
DisplayOption(&self.replication.as_ref().map(DisplayBTreeMapOptValue)),
DisplayOption(&self.heartbeat.as_ref().map(DisplayBTreeMapOptValue)),
)?;

write!(f, "}}")?;
Expand Down
1 change: 1 addition & 0 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ where C: RaftTypeConfig {
current_leader: None,
millis_since_quorum_ack: None,
membership_config: Arc::new(StoredMembership::new(None, Membership::new(vec![btreeset! {}], None))),
heartbeat: None,

snapshot: None,
replication: None,
Expand Down
73 changes: 73 additions & 0 deletions tests/tests/metrics/t10_server_metrics_and_data_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::type_config::TypeConfigExt;
use openraft::Config;
use openraft_memstore::TypeConfig;
#[allow(unused_imports)]
use pretty_assertions::assert_eq;
#[allow(unused_imports)]
Expand Down Expand Up @@ -64,6 +66,77 @@ async fn server_metrics_and_data_metrics() -> Result<()> {
Ok(())
}

/// Test if heartbeat metrics work
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn heartbeat_metrics() -> Result<()> {
// Setup test dependencies.
let config = Arc::new(
Config {
enable_heartbeat: false,
heartbeat_interval: 50,
enable_elect: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

let leader = router.get_raft_handle(&0)?;

tracing::info!(log_index, "--- trigger heartbeat; heartbeat metrics refreshes");
let refreshed_node1;
let refreshed_node2;
{
leader.trigger().heartbeat().await?;
let metrics = leader
.wait(timeout())
.metrics(
|metrics| {
let heartbeat = metrics
.heartbeat
.as_ref()
.expect("expect heartbeat to be Some as metrics come from the leader node");
let node1 = heartbeat.get(&1).unwrap().unwrap();
let node2 = heartbeat.get(&2).unwrap().unwrap();

(node1 < 100) && (node2 < 100)
},
"millis_since_quorum_ack refreshed",
)
.await?;

let heartbeat = metrics
.heartbeat
.as_ref()
.expect("expect heartbeat to be Some as metrics come from the leader node");
refreshed_node1 = heartbeat.get(&1).unwrap().unwrap();
refreshed_node2 = heartbeat.get(&2).unwrap().unwrap();
}

tracing::info!(log_index, "--- sleep 500 ms, the interval should extend");
{
TypeConfig::sleep(Duration::from_millis(500)).await;

let metrics = leader.metrics();
let metrics_ref = metrics.borrow();
let heartbeat = metrics_ref
.heartbeat
.as_ref()
.expect("expect heartbeat to be Some as metrics come from the leader node");

let greater_node1 = heartbeat.get(&1).unwrap().unwrap();
let greater_node2 = heartbeat.get(&2).unwrap().unwrap();

assert!(greater_node1 > refreshed_node1);
assert!(greater_node2 > refreshed_node2);
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(500))
}
Loading