feat(metrics): fragment level streaming metrics (part 1) (#12634)
fuyufjh authored Oct 7, 2023
1 parent a46292e commit 04f33a1
Showing 14 changed files with 117 additions and 73 deletions.
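The pattern of this commit: every executor already knows its fragment via `ActorContext`, so each per-actor metric gains a `fragment_id` label, and every `with_label_values` call site grows by one value. The label names are declared where the metric vectors are registered (in `StreamingMetrics`, not part of this diff); below is a minimal sketch of such a registration, assuming the `prometheus` crate and an illustrative metric name.

```rust
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};

// Sketch only: how a metric vector gains the new `fragment_id` label.
// The metric name and help text are illustrative, not the actual
// StreamingMetrics definitions.
fn register_example(registry: &Registry) -> IntCounterVec {
    register_int_counter_vec_with_registry!(
        "stream_agg_lookup_miss_count",
        "Lookup misses of the aggregation state cache",
        // The declared label set must match every call site exactly,
        // which is why the call sites in this diff each gain one more value.
        &["table_id", "actor_id", "fragment_id"],
        registry
    )
    .unwrap()
}
```

With the label in place, dashboards can roll metrics up across the parallel actors of one fragment, e.g. `sum by (fragment_id) (rate(stream_agg_lookup_miss_count[1m]))` in PromQL.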
2 changes: 2 additions & 0 deletions src/stream/src/executor/actor.rs
@@ -37,9 +37,11 @@ pub struct ActorContext {
pub id: ActorId,
pub fragment_id: u32,

+ // TODO(eric): these seem to be useless now?
last_mem_val: Arc<AtomicUsize>,
cur_mem_val: Arc<AtomicUsize>,
total_mem_val: Arc<TrAdder<i64>>,
+
pub streaming_metrics: Arc<StreamingMetrics>,
pub error_suppressor: Arc<Mutex<ErrorSuppressor>>,
}
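`ActorContext` carries the actor's `fragment_id: u32`, so each executor derives the label value locally. Because `with_label_values` takes `&[&str]`, the executors below stringify the ids once per method call rather than per row; a hedged sketch of that call-site pattern (the `Ctx` struct and the metric are illustrative stand-ins):

```rust
use prometheus::IntCounterVec;

// Illustrative stand-in for the ActorContext fields used below.
struct Ctx {
    id: u32,
    fragment_id: u32,
}

// Convert the ids to Strings once, outside the per-row loop, so the hot
// path only borrows existing Strings instead of allocating per record.
fn count_rows(ctx: &Ctx, counter: &IntCounterVec, num_rows: usize) {
    let actor_id_str = ctx.id.to_string();
    let fragment_id_str = ctx.fragment_id.to_string();
    for _ in 0..num_rows {
        counter
            .with_label_values(&[&actor_id_str, &fragment_id_str])
            .inc();
    }
}
```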
8 changes: 5 additions & 3 deletions src/stream/src/executor/aggregation/distinct.rs
@@ -69,6 +69,7 @@ impl<S: StateStore> ColumnDeduplicater<S> {
.map(|_| BitmapBuilder::zeroed(column.len()))
.collect_vec();
let actor_id_str = ctx.id.to_string();
+ let fragment_id_str = ctx.fragment_id.to_string();
let table_id_str = dedup_table.table_id().to_string();
for (datum_idx, (op, datum)) in ops.iter().zip_eq_fast(column.iter()).enumerate() {
// skip if this item is hidden to all agg calls (this is likely to happen)
@@ -85,7 +86,7 @@ impl<S: StateStore> ColumnDeduplicater<S> {
self.metrics_info
.metrics
.agg_distinct_total_cache_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
// TODO(yuhao): avoid this `contains`.
// https://github.com/risingwavelabs/risingwave/issues/9233
@@ -95,7 +96,7 @@ impl<S: StateStore> ColumnDeduplicater<S> {
self.metrics_info
.metrics
.agg_distinct_cache_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
// load from table into the cache
let counts = if let Some(counts_row) =
@@ -190,11 +191,12 @@ impl<S: StateStore> ColumnDeduplicater<S> {
// WARN: if you want to change to batching the write to table. please remember to change
// `self.cache.evict()` too.
let actor_id_str = ctx.id.to_string();
+ let fragment_id_str = ctx.fragment_id.to_string();
let table_id_str = dedup_table.table_id().to_string();
self.metrics_info
.metrics
.agg_distinct_cached_entry_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(self.cache.len() as i64);
self.cache.evict();
}
2 changes: 1 addition & 1 deletion src/stream/src/executor/exchange/input.rs
@@ -177,7 +177,7 @@ impl RemoteInput {
let msg_res = Message::from_protobuf(&msg);
metrics
.actor_sampled_deserialize_duration_ns
- .with_label_values(&[&down_actor_id])
+ .with_label_values(&[&down_actor_id, &down_fragment_id])
.inc_by(start_time.elapsed().as_nanos() as u64);
msg_res
} else {
27 changes: 15 additions & 12 deletions src/stream/src/executor/hash_agg.rs
@@ -405,15 +405,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {

// Update the metrics.
let actor_id_str = this.actor_ctx.id.to_string();
+ let fragment_id_str = this.actor_ctx.fragment_id.to_string();
let table_id_str = this.intermediate_state_table.table_id().to_string();
- let metric_dirty_count = this
-     .metrics
-     .agg_dirty_group_count
-     .with_label_values(&[&table_id_str, &actor_id_str]);
+ let metric_dirty_count = this.metrics.agg_dirty_group_count.with_label_values(&[
+     &table_id_str,
+     &actor_id_str,
+     &fragment_id_str,
+ ]);
let metric_dirty_heap_size = this
.metrics
.agg_dirty_group_heap_size
- .with_label_values(&[&table_id_str, &actor_id_str]);
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]);
let new_group_size = agg_group.estimated_size();
if let Some(old_group_size) = old_group_size {
match new_group_size.cmp(&old_group_size) {
@@ -448,29 +450,30 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
) {
// Update metrics.
let actor_id_str = this.actor_ctx.id.to_string();
+ let fragment_id_str = this.actor_ctx.fragment_id.to_string();
let table_id_str = this.intermediate_state_table.table_id().to_string();
this.metrics
.agg_lookup_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(vars.stats.lookup_miss_count);
vars.stats.lookup_miss_count = 0;
this.metrics
.agg_total_lookup_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(vars.stats.total_lookup_count);
vars.stats.total_lookup_count = 0;
this.metrics
.agg_cached_keys
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(vars.agg_group_cache.len() as i64);
this.metrics
.agg_chunk_lookup_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(vars.stats.chunk_lookup_miss_count);
vars.stats.chunk_lookup_miss_count = 0;
this.metrics
.agg_chunk_total_lookup_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(vars.stats.chunk_total_lookup_count);
vars.stats.chunk_total_lookup_count = 0;

@@ -552,11 +555,11 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
vars.dirty_groups_heap_size.set(0);
this.metrics
.agg_dirty_group_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(0);
this.metrics
.agg_dirty_group_heap_size
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(0);

// Yield the remaining rows in chunk builder.
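Note the third variant of the same cost-saving idea above: `metric_dirty_count` binds the labeled child once, and the (elided) code that follows bumps the returned handle directly instead of re-resolving the labels each time. A sketch, assuming the `prometheus` crate's gauge vectors and illustrative label values:

```rust
use prometheus::{IntGauge, IntGaugeVec};

// `with_label_values` hashes the label set and returns a concrete child
// gauge; holding on to it avoids repeating that lookup on every update.
fn dirty_group_gauge(gauges: &IntGaugeVec) -> IntGauge {
    gauges.with_label_values(&["42", "7", "3"]) // table_id, actor_id, fragment_id
}
```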
15 changes: 9 additions & 6 deletions src/stream/src/executor/hash_join.rs
@@ -639,6 +639,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
pk_contained_in_jk_l,
metrics.clone(),
ctx.id,
+ ctx.fragment_id,
"left",
),
join_key_indices: join_key_indices_l,
@@ -666,6 +667,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
pk_contained_in_jk_r,
metrics.clone(),
ctx.id,
+ ctx.fragment_id,
"right",
),
join_key_indices: join_key_indices_r,
@@ -712,6 +714,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
// The first barrier message should be propagated.
yield Message::Barrier(barrier);
let actor_id_str = self.ctx.id.to_string();
+ let fragment_id_str = self.ctx.fragment_id.to_string();
let mut start_time = Instant::now();

while let Some(msg) = aligned_stream
@@ -721,7 +724,7 @@
{
self.metrics
.join_actor_input_waiting_duration_ns
- .with_label_values(&[&actor_id_str])
+ .with_label_values(&[&actor_id_str, &fragment_id_str])
.inc_by(start_time.elapsed().as_nanos() as u64);
match msg? {
AlignedMessage::WatermarkLeft(watermark) => {
@@ -761,7 +764,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
left_time += left_start_time.elapsed();
self.metrics
.join_match_duration_ns
- .with_label_values(&[&actor_id_str, "left"])
+ .with_label_values(&[&actor_id_str, &fragment_id_str, "left"])
.inc_by(left_time.as_nanos() as u64);
}
AlignedMessage::Right(chunk) => {
@@ -787,7 +790,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
right_time += right_start_time.elapsed();
self.metrics
.join_match_duration_ns
- .with_label_values(&[&actor_id_str, "right"])
+ .with_label_values(&[&actor_id_str, &fragment_id_str, "right"])
.inc_by(right_time.as_nanos() as u64);
}
AlignedMessage::Barrier(barrier) => {
Expand Down Expand Up @@ -817,17 +820,17 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
// chunk would never be selected.
// self.metrics
// .join_cached_rows
- // .with_label_values(&[&actor_id_str, side])
+ // .with_label_values(&[&actor_id_str, &fragment_id_str, side])
// .set(ht.cached_rows() as i64);
self.metrics
.join_cached_entries
- .with_label_values(&[&actor_id_str, side])
+ .with_label_values(&[&actor_id_str, &fragment_id_str, side])
.set(ht.entry_count() as i64);
}

self.metrics
.join_match_duration_ns
- .with_label_values(&[&actor_id_str, "barrier"])
+ .with_label_values(&[&actor_id_str, &fragment_id_str, "barrier"])
.inc_by(barrier_start_time.elapsed().as_nanos() as u64);
yield Message::Barrier(barrier);
}
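The duration metrics above follow a nanoseconds-as-counter convention: elapsed time from an `Instant` is added to a `_ns` counter, so `rate()` over the series approximates the share of wall-clock time spent on that path. A minimal sketch of the helper shape (a hypothetical function; the executor inlines this logic):

```rust
use std::time::Instant;

use prometheus::IntCounterVec;

// Accumulate elapsed nanoseconds into a labeled counter, mirroring how
// join_match_duration_ns is updated above. For the join executor the
// labels would be [actor_id, fragment_id, side].
fn time_section<F: FnOnce()>(counter: &IntCounterVec, labels: &[&str], work: F) {
    let start = Instant::now();
    work();
    counter
        .with_label_values(labels)
        .inc_by(start.elapsed().as_nanos() as u64);
}
```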
7 changes: 4 additions & 3 deletions src/stream/src/executor/lookup/impl_.rs
@@ -371,10 +371,11 @@ impl<S: StateStore> LookupExecutor<S> {
.into_owned_row();
let table_id_str = self.arrangement.storage_table.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
+ let fragment_id_str = self.ctx.fragment_id.to_string();
self.ctx
.streaming_metrics
.lookup_total_query_cache_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
if let Some(result) = self.lookup_cache.lookup(&lookup_row) {
return Ok(result.iter().cloned().collect_vec());
@@ -384,7 +385,7 @@ impl<S: StateStore> LookupExecutor<S> {
self.ctx
.streaming_metrics
.lookup_cache_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();

tracing::trace!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);
@@ -433,7 +434,7 @@ impl<S: StateStore> LookupExecutor<S> {
self.ctx
.streaming_metrics
.lookup_cached_entry_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(self.lookup_cache.len() as i64);

Ok(all_rows.into_inner())
10 changes: 9 additions & 1 deletion src/stream/src/executor/managed_state/join/mod.rs
@@ -40,7 +40,7 @@ use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorResult;
use crate::executor::monitor::StreamingMetrics;
- use crate::task::{ActorId, AtomicU64Ref};
+ use crate::task::{ActorId, AtomicU64Ref, FragmentId};

type DegreeType = u64;

@@ -161,6 +161,7 @@ pub struct JoinHashMapMetrics {
metrics: Arc<StreamingMetrics>,
/// Basic information
actor_id: String,
+ fragment_id: String,
join_table_id: String,
degree_table_id: String,
side: &'static str,
@@ -175,13 +176,15 @@ impl JoinHashMapMetrics {
pub fn new(
metrics: Arc<StreamingMetrics>,
actor_id: ActorId,
+ fragment_id: FragmentId,
side: &'static str,
join_table_id: u32,
degree_table_id: u32,
) -> Self {
Self {
metrics,
actor_id: actor_id.to_string(),
+ fragment_id: fragment_id.to_string(),
join_table_id: join_table_id.to_string(),
degree_table_id: degree_table_id.to_string(),
side,
@@ -199,6 +202,7 @@ impl JoinHashMapMetrics {
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
+ &self.fragment_id,
])
.inc_by(self.lookup_miss_count as u64);
self.metrics
@@ -208,6 +212,7 @@ impl JoinHashMapMetrics {
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
+ &self.fragment_id,
])
.inc_by(self.total_lookup_count as u64);
self.metrics
@@ -217,6 +222,7 @@ impl JoinHashMapMetrics {
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
+ &self.fragment_id,
])
.inc_by(self.insert_cache_miss_count as u64);
self.total_lookup_count = 0;
@@ -284,6 +290,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
pk_contained_in_jk: bool,
metrics: Arc<StreamingMetrics>,
actor_id: ActorId,
+ fragment_id: FragmentId,
side: &'static str,
) -> Self {
let alloc = StatsAlloc::new(Global).shared();
@@ -335,6 +342,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
metrics: JoinHashMapMetrics::new(
metrics,
actor_id,
+ fragment_id,
side,
join_table_id,
degree_table_id,
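`JoinHashMapMetrics` shows the buffering design used throughout the patch: hot-path events bump plain integer fields, and `flush()` folds them into the labeled Prometheus counters (now keyed by `fragment_id` as well) once per barrier, as in the hunks above. A minimal sketch of that pattern, with illustrative names rather than the actual definitions:

```rust
use prometheus::IntCounterVec;

// Buffered-counter sketch: cheap integer bumps on the hot path, one
// labeled inc_by per flush.
struct BufferedJoinMetrics {
    lookup_miss_counter: IntCounterVec,
    join_table_id: String,
    actor_id: String,
    fragment_id: String,
    lookup_miss_count: usize,
}

impl BufferedJoinMetrics {
    fn record_lookup_miss(&mut self) {
        self.lookup_miss_count += 1; // no label hashing on the hot path
    }

    fn flush(&mut self) {
        self.lookup_miss_counter
            .with_label_values(&[&self.join_table_id, &self.actor_id, &self.fragment_id])
            .inc_by(self.lookup_miss_count as u64);
        self.lookup_miss_count = 0;
    }
}
```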
2 changes: 1 addition & 1 deletion src/stream/src/executor/merge.rs
@@ -134,7 +134,7 @@ impl MergeExecutor {
Message::Chunk(chunk) => {
self.metrics
.actor_in_record_cnt
- .with_label_values(&[&actor_id_str])
+ .with_label_values(&[&actor_id_str, &fragment_id_str])
.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
(Diffs for the remaining 6 of the 14 changed files are not loaded in this view.)
