Skip to content

Commit

Permalink
Merge pull request #672 from sudo-sturbia/mark-commit-offsets
Browse files Browse the repository at this point in the history
pkg/kgo: allow marking using an offset
  • Loading branch information
twmb authored May 26, 2024
2 parents 2bc29e6 + 929d564 commit 8807955
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2433,6 +2433,44 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
}
}

// MarkCommitOffsets marks offsets to be available for autocommitting. This
// function is only useful if you use the AutoCommitMarks config option, see
// the documentation on that option for more details. This function does not
// allow rewinds.
func (cl *Client) MarkCommitOffsets(unmarked map[string]map[int32]EpochOffset) {
g := cl.consumer.g
if g == nil || !cl.cfg.autocommitMarks {
return
}

// protect g.uncommitted map
g.mu.Lock()
defer g.mu.Unlock()

if g.uncommitted == nil {
g.uncommitted = make(uncommitted)
}

for topic, partitions := range unmarked {
curPartitions := g.uncommitted[topic]
if curPartitions == nil {
curPartitions = make(map[int32]uncommit)
g.uncommitted[topic] = curPartitions
}

for partition, newHead := range partitions {
current := curPartitions[partition]
if current.head.Less(newHead) {
curPartitions[partition] = uncommit{
dirty: current.dirty,
committed: current.committed,
head: newHead,
}
}
}
}
}

// CommitUncommittedOffsets issues a synchronous offset commit for any
// partition that has been consumed from that has uncommitted offsets.
// Retryable errors are retried up to the configured retry limit, and any
Expand Down

0 comments on commit 8807955

Please sign in to comment.