Skip to content

Commit

Permalink
fix: check offsets across topics for lag
Browse files Browse the repository at this point in the history
  • Loading branch information
kalbhor committed Jan 17, 2024
1 parent b0e944c commit cda17e3
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 9 deletions.
1 change: 0 additions & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ builds:

archives:
- format: tar.gz
rlcp: true
files:
- config.toml.sample
- README.md
Expand Down
12 changes: 4 additions & 8 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,25 +349,21 @@ func waitTries(ctx context.Context, b time.Duration) {
}
}

// thresholdExceeded checks if the difference between the offsets is breaching the threshold
// thresholdExceeded checks if the difference between the sum of offsets in all topics is breaching the threshold
func thresholdExceeded(offsetsX, offsetsY kadm.ListedOffsets, max int64) bool {
var diff int64
for t, po := range offsetsX {
for p, x := range po {
y, ok := offsetsY.Lookup(t, p)
if !ok {
continue
}

// check if the difference is breaching threshold
if y.Offset < x.Offset {
if (x.Offset - y.Offset) >= max {
return true
}
}
diff += (x.Offset - y.Offset)
}
}

return false
return diff >= max
}

// isCurrentNode checks if group is active and assigned the topics
Expand Down

0 comments on commit cda17e3

Please sign in to comment.