Skip to content

Commit

Permalink
refactor(metrics): introduce UintGauge to keep consistency (#19896)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Dec 23, 2024
1 parent 3831db7 commit c5eadcc
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 38 deletions.
31 changes: 31 additions & 0 deletions src/common/metrics/src/gauge_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::core::{AtomicU64, GenericGauge};
use prometheus::IntGauge;

/// The integer version of [`prometheus::Gauge`]. Provides better performance if metric values are
/// all unsigned integers.
pub type UintGauge = GenericGauge<AtomicU64>;

#[easy_ext::ext(IntGaugeExt)]
impl IntGauge {
/// Increment the gauge, and return a guard that will decrement the gauge when dropped.
Expand All @@ -39,3 +44,29 @@ impl IntGauge {
Guard::create(self)
}
}

#[easy_ext::ext(UintGaugeExt)]
impl UintGauge {
/// Increment the gauge, and return a guard that will decrement the gauge when dropped.
#[must_use]
pub fn inc_guard(&self) -> impl Drop + '_ {
struct Guard<'a> {
gauge: &'a UintGauge,
}

impl<'a> Guard<'a> {
fn create(gauge: &'a UintGauge) -> Self {
gauge.inc();
Self { gauge }
}
}

impl Drop for Guard<'_> {
fn drop(&mut self) {
self.gauge.dec();
}
}

Guard::create(self)
}
}
3 changes: 3 additions & 0 deletions src/common/metrics/src/guarded_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ mod tait {
}
pub use tait::*;

use crate::UintGauge;

pub type LabelGuardedHistogramVec<const N: usize> = LabelGuardedMetricVec<VecBuilderOfHistogram, N>;
pub type LabelGuardedIntCounterVec<const N: usize> =
LabelGuardedMetricVec<VecBuilderOfCounter<AtomicU64>, N>;
Expand All @@ -155,6 +157,7 @@ pub type LabelGuardedGaugeVec<const N: usize> =
pub type LabelGuardedHistogram<const N: usize> = LabelGuardedMetric<Histogram, N>;
pub type LabelGuardedIntCounter<const N: usize> = LabelGuardedMetric<IntCounter, N>;
pub type LabelGuardedIntGauge<const N: usize> = LabelGuardedMetric<IntGauge, N>;
pub type LabelGuardedUintGauge<const N: usize> = LabelGuardedMetric<UintGauge, N>;
pub type LabelGuardedGauge<const N: usize> = LabelGuardedMetric<Gauge, N>;

pub type LabelGuardedLocalHistogram<const N: usize> = LabelGuardedMetric<LocalHistogram, N>;
Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ use anyhow::{anyhow, Context};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use prometheus::core::{AtomicI64, GenericGauge};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::error::{ConnectorError, ConnectorResult};
use crate::source::base::SplitEnumerator;
Expand Down Expand Up @@ -60,7 +59,7 @@ pub struct KafkaSplitEnumerator {
stop_offset: KafkaEnumeratorOffset,

sync_call_timeout: Duration,
high_watermark_metrics: HashMap<i32, LabelGuardedMetric<GenericGauge<AtomicI64>, 2>>,
high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge<2>>,
}

impl KafkaSplitEnumerator {}
Expand Down
9 changes: 3 additions & 6 deletions src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ use anyhow::Context;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::try_stream;
use prometheus::core::{AtomicI64, GenericGauge};
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

use crate::error::ConnectorResult as Result;
Expand Down Expand Up @@ -247,10 +246,8 @@ impl KafkaSplitReader {
)
});

let mut latest_message_id_metrics: HashMap<
String,
LabelGuardedMetric<GenericGauge<AtomicI64>, 3>,
> = HashMap::new();
let mut latest_message_id_metrics: HashMap<String, LabelGuardedIntGauge<3>> =
HashMap::new();

#[for_await]
'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
use prometheus::core::{AtomicU64, GenericCounter};
use prometheus::{
exponential_buckets, histogram_opts, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_counter_with_registry,
Expand Down Expand Up @@ -96,8 +96,8 @@ pub struct CursorMetrics {
pub subscription_cursor_query_duration: HistogramVec,
pub subscription_cursor_declare_duration: HistogramVec,
pub subscription_cursor_fetch_duration: HistogramVec,
subsription_cursor_nums: GenericGauge<AtomicI64>,
invalid_subsription_cursor_nums: GenericGauge<AtomicI64>,
subsription_cursor_nums: IntGauge,
invalid_subsription_cursor_nums: IntGauge,
subscription_cursor_last_fetch_duration: HistogramVec,
_cursor_metrics_collector: Option<Arc<CursorMetricsCollector>>,
}
Expand Down Expand Up @@ -199,8 +199,8 @@ struct CursorMetricsCollector {
impl CursorMetricsCollector {
fn new(
session_map: SessionMapRef,
subsription_cursor_nums: GenericGauge<AtomicI64>,
invalid_subsription_cursor_nums: GenericGauge<AtomicI64>,
subsription_cursor_nums: IntGauge,
invalid_subsription_cursor_nums: IntGauge,
subscription_cursor_last_fetch_duration: HistogramVec,
) -> Self {
const COLLECT_INTERVAL_SECONDS: u64 = 60;
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/scheduler/distributed/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@

use std::sync::LazyLock;

use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
use prometheus::core::{AtomicU64, GenericCounter};
use prometheus::{
exponential_buckets, histogram_opts, register_histogram_with_registry,
register_int_counter_with_registry, register_int_gauge_with_registry, Histogram, Registry,
register_int_counter_with_registry, register_int_gauge_with_registry, Histogram, IntGauge,
Registry,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

#[derive(Clone)]
pub struct DistributedQueryMetrics {
pub running_query_num: GenericGauge<AtomicI64>,
pub running_query_num: IntGauge,
pub rejected_query_counter: GenericCounter<AtomicU64>,
pub completed_query_counter: GenericCounter<AtomicU64>,
pub query_latency: Histogram,
Expand Down
17 changes: 7 additions & 10 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use await_tree::InstrumentAwait;
use futures::FutureExt;
use itertools::Itertools;
use parking_lot::RwLock;
use prometheus::core::{AtomicU64, GenericGauge};
use prometheus::{Histogram, IntGauge};
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::UintGauge;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
use tokio::spawn;
Expand Down Expand Up @@ -62,14 +62,11 @@ pub struct BufferTracker {
flush_threshold: usize,
min_batch_flush_size: usize,
global_buffer: Arc<MemoryLimiter>,
global_upload_task_size: GenericGauge<AtomicU64>,
global_upload_task_size: UintGauge,
}

impl BufferTracker {
pub fn from_storage_opts(
config: &StorageOpts,
global_upload_task_size: GenericGauge<AtomicU64>,
) -> Self {
pub fn from_storage_opts(config: &StorageOpts, global_upload_task_size: UintGauge) -> Self {
let capacity = config.shared_buffer_capacity_mb * (1 << 20);
let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
let shared_buffer_min_batch_flush_size =
Expand All @@ -93,15 +90,15 @@ impl BufferTracker {
Self::new(
usize::MAX,
flush_threshold,
GenericGauge::new("test", "test").unwrap(),
UintGauge::new("test", "test").unwrap(),
min_batch_flush_size,
)
}

fn new(
capacity: usize,
flush_threshold: usize,
global_upload_task_size: GenericGauge<AtomicU64>,
global_upload_task_size: UintGauge,
min_batch_flush_size: usize,
) -> Self {
assert!(capacity >= flush_threshold);
Expand All @@ -116,7 +113,7 @@ impl BufferTracker {
pub fn for_test() -> Self {
Self::from_storage_opts(
&StorageOpts::default(),
GenericGauge::new("test", "test").unwrap(),
UintGauge::new("test", "test").unwrap(),
)
}

Expand All @@ -128,7 +125,7 @@ impl BufferTracker {
&self.global_buffer
}

pub fn global_upload_task_size(&self) -> &GenericGauge<AtomicU64> {
pub fn global_upload_task_size(&self) -> &UintGauge {
&self.global_upload_task_size
}

Expand Down
10 changes: 5 additions & 5 deletions src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ use std::task::{ready, Context, Poll};
use futures::FutureExt;
use itertools::Itertools;
use more_asserts::assert_gt;
use prometheus::core::{AtomicU64, GenericGauge};
use prometheus::{HistogramTimer, IntGauge};
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::UintGauge;
use risingwave_common::must_match;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, VnodeWatermark, WatermarkDirection,
Expand Down Expand Up @@ -112,14 +112,14 @@ mod uploader_imm {
use std::fmt::Formatter;
use std::ops::Deref;

use prometheus::core::{AtomicU64, GenericGauge};
use risingwave_common::metrics::UintGauge;

use crate::hummock::event_handler::uploader::UploaderContext;
use crate::mem_table::ImmutableMemtable;

pub(super) struct UploaderImm {
inner: ImmutableMemtable,
size_guard: GenericGauge<AtomicU64>,
size_guard: UintGauge,
}

impl UploaderImm {
Expand All @@ -137,7 +137,7 @@ mod uploader_imm {
pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
Self {
inner: imm,
size_guard: GenericGauge::new("test", "test").unwrap(),
size_guard: UintGauge::new("test", "test").unwrap(),
}
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ struct UploadingTask {
join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
task_info: UploadTaskInfo,
spawn_upload_task: SpawnUploadTask,
task_size_guard: GenericGauge<AtomicU64>,
task_size_guard: UintGauge,
task_count_guard: IntGauge,
}

Expand Down
12 changes: 6 additions & 6 deletions src/storage/src/monitor/hummock_state_store_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::{Arc, OnceLock};

use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter, GenericGauge};
use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter};
use prometheus::{
exponential_buckets, histogram_opts, proto, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_counter_vec_with_registry,
Expand All @@ -23,7 +23,7 @@ use prometheus::{
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::{
RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
RelabeledHistogramVec, RelabeledMetricVec,
RelabeledHistogramVec, RelabeledMetricVec, UintGauge,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
Expand Down Expand Up @@ -73,9 +73,9 @@ pub struct HummockStateStoreMetrics {
pub spill_task_size_from_sealed: GenericCounter<AtomicU64>,

// uploading task
pub uploader_uploading_task_size: GenericGauge<AtomicU64>,
pub uploader_uploading_task_size: UintGauge,
pub uploader_uploading_task_count: IntGauge,
pub uploader_imm_size: GenericGauge<AtomicU64>,
pub uploader_imm_size: UintGauge,
pub uploader_upload_task_latency: Histogram,
pub uploader_syncing_epoch_count: IntGauge,
pub uploader_wait_poll_latency: Histogram,
Expand Down Expand Up @@ -312,7 +312,7 @@ impl HummockStateStoreMetrics {
)
.unwrap();

let uploader_uploading_task_size = GenericGauge::new(
let uploader_uploading_task_size = UintGauge::new(
"state_store_uploader_uploading_task_size",
"Total size of uploader uploading tasks",
)
Expand All @@ -328,7 +328,7 @@ impl HummockStateStoreMetrics {
)
.unwrap();

let uploader_imm_size = GenericGauge::new(
let uploader_imm_size = UintGauge::new(
"state_store_uploader_imm_size",
"Total size of imms tracked by uploader",
)
Expand Down

0 comments on commit c5eadcc

Please sign in to comment.