Skip to content

Commit

Permalink
Merge pull request #736 from twmb/726
Browse files Browse the repository at this point in the history
kgo: avoid / wakeup lingering if we hit max bytes or max records
  • Loading branch information
twmb authored May 26, 2024
2 parents eed3183 + db24bbf commit ad5b742
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
20 changes: 20 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type producer struct {
// field on recBufs that is toggled on flush. If we did, then a new
// recBuf could be created and records sent to while we are flushing.
flushing atomicI32 // >0 if flushing, can Flush many times concurrently
blocked atomicI32 // >0 if over max recs or bytes

aborting atomicI32 // >0 if aborting, can abort many times concurrently

Expand Down Expand Up @@ -425,13 +426,20 @@ func (cl *Client) produce(
"over_max_records", overMaxRecs,
"over_max_bytes", overMaxBytes,
)
// Before we potentially unlinger, add that we are blocked.
// Lingering always checks blocked, so we will not start a
// linger while we are blocked. We THEN wakeup anything that
// is actively lingering.
cl.producer.blocked.Add(1)
cl.unlingerDueToMaxRecsBuffered()
// If the client ctx cancels or the produce ctx cancels, we
// need to un-count our buffering of this record. We also need
// to drain a slot from the waitBuffer chan, which could be
// sent to right when we are erroring.
drainBuffered := func(err error) {
p.promiseRecord(promisedRec{ctx, promise, r}, err)
<-p.waitBuffer
cl.producer.blocked.Add(-1)
}
if !block || cl.cfg.manualFlushing {
drainBuffered(ErrMaxBuffered)
Expand Down Expand Up @@ -963,6 +971,18 @@ func (cl *Client) waitUnknownTopic(
})
}

func (cl *Client) unlingerDueToMaxRecsBuffered() {
if cl.cfg.linger <= 0 {
return
}
for _, parts := range cl.producer.topics.load() {
for _, part := range parts.load().partitions {
part.records.unlingerAndManuallyDrain()
}
}
cl.cfg.logger.Log(LogLevelDebug, "unlingered all partitions due to hitting max buffered")
}

// Flush hangs waiting for all buffered records to be flushed, stopping all
// lingers if necessary.
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ func (recBuf *recBuf) tryStopLingerForDraining() bool {

// Begins a linger timer unless the producer is being flushed.
func (recBuf *recBuf) lockedMaybeStartLinger() bool {
if recBuf.cl.producer.flushing.Load() > 0 {
if recBuf.cl.producer.flushing.Load() > 0 || recBuf.cl.producer.blocked.Load() > 0 {
return false
}
recBuf.lingering = time.AfterFunc(recBuf.cl.cfg.linger, recBuf.sink.maybeDrain)
Expand Down

0 comments on commit ad5b742

Please sign in to comment.