From d857b38599253a9438c28ccb7efaf08653d90872 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Mon, 18 Sep 2023 21:46:18 +0200 Subject: [PATCH] ensure missing lags do not affect states (#136) * ensure missing lags do not affect states * more defaults --- CHANGELOG.md | 3 +++ Gemfile.lock | 2 +- lib/karafka/web/processing/consumers/aggregators/metrics.rb | 6 +++--- lib/karafka/web/processing/consumers/aggregators/state.rb | 4 ++-- lib/karafka/web/version.rb | 2 +- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6341ef2f..440eaee2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Karafka Web changelog +## 0.7.3 (2023-09-18) +- [Improvement] Mitigate a case where a race-condition during upgrade would crash data. + ## 0.7.2 (2023-09-18) - [Improvement] Display hidden by accident errors for OSS metrics. - [Improvement] Use a five second cache for non-production environments to improve dev experience. diff --git a/Gemfile.lock b/Gemfile.lock index 7f8b46d0..1d457f10 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - karafka-web (0.7.2) + karafka-web (0.7.3) erubi (~> 1.4) karafka (>= 2.2.3, < 3.0.0) karafka-core (>= 2.2.2, < 3.0.0) diff --git a/lib/karafka/web/processing/consumers/aggregators/metrics.rb b/lib/karafka/web/processing/consumers/aggregators/metrics.rb index 0d0a3545..3e58ac3a 100644 --- a/lib/karafka/web/processing/consumers/aggregators/metrics.rb +++ b/lib/karafka/web/processing/consumers/aggregators/metrics.rb @@ -106,17 +106,17 @@ def materialize_consumers_groups_current_state .reject(&:negative?) lags_stored = partitions_data - .map { |p_details| p_details.fetch(:lag_stored) } + .map { |p_details| p_details.fetch(:lag_stored, -1) } .reject(&:negative?) offsets_hi = partitions_data - .map { |p_details| p_details.fetch(:hi_offset) } + .map { |p_details| p_details.fetch(:hi_offset, -1) } .reject(&:negative?) # Last stable offsets freeze durations - we pick the max freeze to indicate # the longest open transaction that potentially may be hanging ls_offsets_fd = partitions_data - .map { |p_details| p_details.fetch(:ls_offset_fd) } + .map { |p_details| p_details.fetch(:ls_offset_fd, 0) } .reject(&:negative?) # If there is no lag that would not be negative, it means we did not mark diff --git a/lib/karafka/web/processing/consumers/aggregators/state.rb b/lib/karafka/web/processing/consumers/aggregators/state.rb index 54bc9661..1b542e0c 100644 --- a/lib/karafka/web/processing/consumers/aggregators/state.rb +++ b/lib/karafka/web/processing/consumers/aggregators/state.rb @@ -152,8 +152,8 @@ def refresh_current_stats stats[:listeners] += report_process[:listeners] || 0 stats[:processes] += 1 stats[:rss] += report_process[:memory_usage] - stats[:lag] += lags.reject(&:negative?).sum - stats[:lag_stored] += lags_stored.reject(&:negative?).sum + stats[:lag] += lags.compact.reject(&:negative?).sum + stats[:lag_stored] += lags_stored.compact.reject(&:negative?).sum utilization += report_stats[:utilization] end diff --git a/lib/karafka/web/version.rb b/lib/karafka/web/version.rb index 1efcd1bc..6cc80e24 100644 --- a/lib/karafka/web/version.rb +++ b/lib/karafka/web/version.rb @@ -3,6 +3,6 @@ module Karafka module Web # Current gem version - VERSION = '0.7.2' + VERSION = '0.7.3' end end