Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various BaseProducer::flush fixes #748

Merged
merged 15 commits into from
Dec 12, 2024
Merged

Conversation

davidblewett
Copy link
Collaborator

Refactor BaseProducer::poll to not return early, but instead continue processing events until the passed timeout.

Refactor BaseProducer::flush to spend most time in librdkafka, and whatever is left in BaseProducer::poll.

continue processing events until the passed timeout.

Refactor `BaseProducer::flush` to spend most time in `librdkafka`,
and whatever is left in `BaseProducer::poll`.
src/util.rs Outdated

impl From<Deadline> for Timeout {
fn from(d: Deadline) -> Timeout {
if let Deadline::Never = d {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could use impl From<&Deadline> for Timeout here to not duplicate the logic?

src/util.rs Outdated Show resolved Hide resolved
@davidblewett davidblewett force-pushed the davidblewett/producer-flush branch from 91bf1a6 to 2672739 Compare December 12, 2024 17:03
@davidblewett davidblewett force-pushed the davidblewett/producer-flush branch from e77b3ac to ad3e065 Compare December 12, 2024 18:44
@davidblewett davidblewett merged commit 87b1d90 into master Dec 12, 2024
5 of 9 checks passed
@davidblewett davidblewett deleted the davidblewett/producer-flush branch December 12, 2024 18:58
davidblewett added a commit that referenced this pull request Dec 12, 2024
* Refactor `BaseProducer::poll` to not return early, but instead
continue processing events until the passed timeout.

Refactor `BaseProducer::flush` to spend most time in `librdkafka`,
and whatever is left in `BaseProducer::poll`.

* Simplify and rely on cast truncation.

* Fix logic error so that we always poll at least once even when
timeout is `Duration::ZERO`.

* Introduce Deadline type to simplify `BaseProducer::flush` and
`BaseProducer::poll`.

* Add `From<Timeout>` impl for `Deadline`

* Ensure we always call `poll_event` at least once, even if we have
`Timeout::After<Duration::ZERO>` for a non-blocking call.

* Allow Deadline to express `Timeout::Never` losslessly.

* Refactor poll loop to be more idiomatic.

* Centralize clamping logic to Deadline.

* Remove extraneous From impl.

* Simplify `BaseProducer::poll` to rely on `From` impl.

* Don't block forever in poll when flushing.

* Remove this clamp, in favor of relying on remaining_millis_i32.

* Ensure we always poll even if we get a timeout from flush.

* Update changelog reflecting behavior change in `BaseProducer::poll`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants