Deadlock in Produce() / TryProduce() when kgo.MaxBufferedBytes() is configured #777

Closed
pracucci opened this issue Jul 16, 2024 · 2 comments · Fixed by #787
Labels
bug, has pr

Comments

@pracucci
Contributor

pracucci commented Jul 16, 2024

In Mimir we found a deadlock in Produce() and TryProduce() when kgo.MaxBufferedBytes() is configured and the limit is hit.

The gist of the root cause is that there is no guarantee that all goroutines waiting on p.waitBuffer will be released: under some conditions, the number of messages sent to the p.waitBuffer channel is different from the number of goroutines waiting on it.

Details

Produce() and TryProduce() wait for p.waitBuffer when the buffer limit is reached:

drainBuffered := func(err error) {
    p.promiseRecord(promisedRec{ctx, promise, r}, err)
    <-p.waitBuffer
    cl.producer.blocked.Add(-1)
}
if !block || cl.cfg.manualFlushing {
    drainBuffered(ErrMaxBuffered)
    return
}
select {
case <-p.waitBuffer:

A message is published to p.waitBuffer each time a record completes, if the client detects that the record was over the bytes limit:

// Capture user size before potential modification by the promise.
userSize := pr.userSize()
nowBufBytes := p.bufferedBytes.Add(-userSize)
nowBufRecs := p.bufferedRecords.Add(-1)
wasOverMaxRecs := nowBufRecs >= cl.cfg.maxBufferedRecords
wasOverMaxBytes := cl.cfg.maxBufferedBytes > 0 && nowBufBytes+userSize > cl.cfg.maxBufferedBytes
// We call the promise before finishing the record; this allows users
// of Flush to know that all buffered records are completely done
// before Flush returns.
pr.promise(pr.Record, err)
if wasOverMaxRecs || wasOverMaxBytes {
    p.waitBuffer <- struct{}{}
} else if nowBufRecs == 0 && p.flushing.Load() > 0 {

In the case of the "max buffered records" limit (which does not suffer from this issue), every record has the same "weight": each one accounts for +1 in the p.bufferedRecords accumulator. In the case of the "max buffered bytes" limit, however, records have different "weights", because each record has its own userSize(). This means the computation of wasOverMaxBytes can come out wrong, depending on which record completes first.

Example

Let's assume we set kgo.MaxBufferedBytes() to 100 bytes and the following workflow happens in order:

  1. Produce() record A (100 bytes)
  2. Produce() record B (50 bytes), which waits for the buffer to free up
  3. Produce() record C (50 bytes), which waits for the buffer to free up
  4. Record A is produced; finishRecordPromise() gets called, detects it was over the limit, and so publishes 1 message to waitBuffer
  5. Record B is unblocked; finishRecordPromise() gets called and does not detect it was over the limit (only 50 bytes), so record C is never unblocked and waits indefinitely on waitBuffer

Reproduction test

I've written a test reproducing the deadlock:
https://github.com/twmb/franz-go/compare/pracucci:reproduce-produce-deadlock
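
To make the mismatch concrete, here is a minimal, self-contained sketch of the same accounting with a plain channel (hypothetical names, not the franz-go code itself). Running it ends in Go's "all goroutines are asleep - deadlock!" error, because completing record A releases only one of the two waiters:

// Toy model of the waitBuffer accounting described above. With a 100-byte
// limit, finishing the 100-byte record A sends exactly one token, so only
// one of the two blocked producers (B, C) is released; the other is stranded.
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

const maxBufferedBytes = 100

var (
    bufferedBytes atomic.Int64
    waitBuffer    = make(chan struct{})
)

func produce(size int64, wg *sync.WaitGroup) {
    defer wg.Done()
    if bufferedBytes.Add(size) > maxBufferedBytes {
        <-waitBuffer // wait for "some" record to free space
    }
    fmt.Println("produced a record of", size, "bytes")
}

func finish(size int64) {
    nowBufBytes := bufferedBytes.Add(-size)
    wasOverMaxBytes := nowBufBytes+size > maxBufferedBytes
    if wasOverMaxBytes {
        waitBuffer <- struct{}{} // wakes exactly one waiter
    }
}

func main() {
    var wg sync.WaitGroup
    bufferedBytes.Add(100) // record A (100 bytes) is already buffered
    wg.Add(2)
    go produce(50, &wg) // record B blocks
    go produce(50, &wg) // record C blocks
    time.Sleep(100 * time.Millisecond)
    finish(100) // A completes: over the limit, one token sent, releases only B
    finish(50)  // B completes: 50+50 is not over 100, so no token is sent
    wg.Wait()   // C never wakes up: deadlock
}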

@twmb
Owner

twmb commented Jul 17, 2024

Good find. Offhand, I think the fix is to convert this to a sync.Cond and have every waiter be notified and inspect the conditions, but I'll have to stare a bit at how to do this.
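
A rough sketch of that idea (hypothetical code, not the actual franz-go implementation): producers check under a lock whether admitting the record would exceed the limit and wait on a sync.Cond if it would; every completion broadcasts, so all blocked producers re-evaluate the condition and none can be stranded:

// Hypothetical sync.Cond-based limiter. A record is only counted as buffered
// once it fits; completions broadcast so that every waiter re-checks.
package main

import (
    "fmt"
    "sync"
)

type byteLimiter struct {
    mu       sync.Mutex
    cond     *sync.Cond
    buffered int64
    max      int64
}

func newByteLimiter(max int64) *byteLimiter {
    l := &byteLimiter{max: max}
    l.cond = sync.NewCond(&l.mu)
    return l
}

// acquire blocks until adding size bytes no longer exceeds the limit (an
// oversized record is still admitted once the buffer is empty), then counts
// the record as buffered.
func (l *byteLimiter) acquire(size int64) {
    l.mu.Lock()
    defer l.mu.Unlock()
    for l.buffered > 0 && l.buffered+size > l.max {
        l.cond.Wait() // woken by Broadcast; the loop re-checks the condition
    }
    l.buffered += size
}

// release uncounts a completed record and wakes every waiter.
func (l *byteLimiter) release(size int64) {
    l.mu.Lock()
    l.buffered -= size
    l.mu.Unlock()
    l.cond.Broadcast()
}

func main() {
    lim := newByteLimiter(100)
    lim.acquire(100) // record A
    var wg sync.WaitGroup
    for _, name := range []string{"B", "C"} {
        wg.Add(1)
        go func(name string) {
            defer wg.Done()
            lim.acquire(50) // blocks until enough bytes are freed
            fmt.Println("record", name, "admitted")
            lim.release(50)
        }(name)
    }
    lim.release(100) // completing A wakes both B and C; each re-checks and fits
    wg.Wait()
}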

@twmb twmb added the bug label Jul 17, 2024
twmb added a commit that referenced this issue Jul 22, 2024
Copying from the issue,
"""
1) Produce() record A (100 bytes)
2) Produce() record B (50 bytes), waiting for buffer to free
3) Produce() record C (50 bytes), waiting for buffer to free
4) Record A is produced, finishRecordPromise() gets called, detects it was over the limit so publish 1 message to waitBuffer
5) Record B is unlocked, finishRecordPromise() gets called, does not detect it was over the limit (only 50 bytes), so record C is never unblocked and will wait indefinitely on waitBuffer
"""

The fix requires adding a lock while producing. This reuses the existing
lock on the `producer` type. This can lead to a few more spurious
wakeups in other functions that use this same mutex, but that's fine.

The prior algorithm counted anything to produce immediately into the
buffered records and bytes; the fix for #777 could not really be
possible unless we avoid counting them. Specifically, we need to have a
goroutine looping with a sync.Cond that checks *IF* we add the record,
will we still be blocked? This allows us to wake up all blocked
goroutines always (unlike one at a time, the problem this issue points
out), and each goroutine can check under a lock if they still do not
fit.

This also fixes an unreported bug where, if a record WOULD be blocked
but fails early due to no topic / not in a transaction while in a
transactional client, the serial promise finishing goroutine would
deadlock.

Closes #777.
@twmb
Owner

twmb commented Jul 22, 2024

If you're able to see why #787 fails in CI, that may help accelerate a fix here. So far, the error is not reproducing locally for me.

@twmb twmb added the has pr label Jul 22, 2024
twmb added a commit that referenced this issue Jul 29, 2024
Copying from the issue,
"""
1) Produce() record A (100 bytes)
2) Produce() record B (50 bytes), waiting for buffer to free
3) Produce() record C (50 bytes), waiting for buffer to free
4) Record A is produced, finishRecordPromise() gets called, detects it was over the limit so publish 1 message to waitBuffer
5) Record B is unlocked, finishRecordPromise() gets called, does not detect it was over the limit (only 50 bytes), so record C is never unblocked and will wait indefinitely on waitBuffer
"""

The fix requires adding a lock while producing. This reuses the existing
lock on the `producer` type. This can lead to a few more spurious
wakeups in other functions that use this same mutex, but that's fine.

The prior algorithm counted anything to produce immediately into the
buffered records and bytes fields; the fix for #777 could not really be
possible unless we avoid counting the "buffered" aspect right away.
Specifically, we need to have a goroutine looping with a sync.Cond that
checks *IF* we add the record, will we still be blocked? This allows us
to wake up all blocked goroutines always (unlike one at a time, the
problem this issue points out), and each goroutine can check under a
lock if they still do not fit.

This also fixes an unreported bug where, if a record WOULD be blocked
but fails early due to no topic / not in a transaction while in a
transactional client, the serial promise finishing goroutine would
deadlock.

Closes #777.
@twmb twmb closed this as completed in #787 Jul 29, 2024