Skip to content

Commit

Permalink
ensure missing lags do not affect states (#136)
Browse files Browse the repository at this point in the history
* ensure missing lags do not affect states

* more defaults
  • Loading branch information
mensfeld authored Sep 18, 2023
1 parent 43f6d25 commit d857b38
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Karafka Web changelog

## 0.7.3 (2023-09-18)
- [Improvement] Mitigate a case where a race-condition during upgrade would crash data.

## 0.7.2 (2023-09-18)
- [Improvement] Display hidden by accident errors for OSS metrics.
- [Improvement] Use a five second cache for non-production environments to improve dev experience.
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
karafka-web (0.7.2)
karafka-web (0.7.3)
erubi (~> 1.4)
karafka (>= 2.2.3, < 3.0.0)
karafka-core (>= 2.2.2, < 3.0.0)
Expand Down
6 changes: 3 additions & 3 deletions lib/karafka/web/processing/consumers/aggregators/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ def materialize_consumers_groups_current_state
.reject(&:negative?)

lags_stored = partitions_data
.map { |p_details| p_details.fetch(:lag_stored) }
.map { |p_details| p_details.fetch(:lag_stored, -1) }
.reject(&:negative?)

offsets_hi = partitions_data
.map { |p_details| p_details.fetch(:hi_offset) }
.map { |p_details| p_details.fetch(:hi_offset, -1) }
.reject(&:negative?)

# Last stable offsets freeze durations - we pick the max freeze to indicate
# the longest open transaction that potentially may be hanging
ls_offsets_fd = partitions_data
.map { |p_details| p_details.fetch(:ls_offset_fd) }
.map { |p_details| p_details.fetch(:ls_offset_fd, 0) }
.reject(&:negative?)

# If there is no lag that would not be negative, it means we did not mark
Expand Down
4 changes: 2 additions & 2 deletions lib/karafka/web/processing/consumers/aggregators/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ def refresh_current_stats
stats[:listeners] += report_process[:listeners] || 0
stats[:processes] += 1
stats[:rss] += report_process[:memory_usage]
stats[:lag] += lags.reject(&:negative?).sum
stats[:lag_stored] += lags_stored.reject(&:negative?).sum
stats[:lag] += lags.compact.reject(&:negative?).sum
stats[:lag_stored] += lags_stored.compact.reject(&:negative?).sum
utilization += report_stats[:utilization]
end

Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/web/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
module Karafka
module Web
# Current gem version
VERSION = '0.7.2'
VERSION = '0.7.3'
end
end

0 comments on commit d857b38

Please sign in to comment.