diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index cd76684b..1c915899 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -861,12 +861,6 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { fetchDone := make(chan struct{}) defer func() { <-fetchDone }() - // If cooperative consuming, we may have to resume fetches. See the - // comment on adjustCooperativeFetchOffsets. - if g.cooperative.Load() { - added = g.adjustCooperativeFetchOffsets(added, lost) - } - // Before we fetch offsets, we wait for the user's onAssign callback to // be done. This ensures a few things: // @@ -883,6 +877,18 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { // necessarily run onRevoke before returning (because of a fatal // error). s.assign(g, added) + + // If cooperative consuming, we may have to resume fetches. See the + // comment on adjustCooperativeFetchOffsets. + // + // We do this AFTER the user's callback. If we add more partitions + // to `added` that are from a previously canceled fetch, we do NOT + // want to pass those fetch-resumed partitions to the user callback + // again. See #705. + if g.cooperative.Load() { + added = g.adjustCooperativeFetchOffsets(added, lost) + } + <-s.assignDone if len(added) > 0 {