Skip to content

Commit

Permalink
Various BaseProducer::flush fixes (#748)
Browse files Browse the repository at this point in the history
* Refactor `BaseProducer::poll` to not return early, but instead
continue processing events until the passed timeout.

Refactor `BaseProducer::flush` to spend most time in `librdkafka`,
and whatever is left in `BaseProducer::poll`.

* Simplify and rely on cast truncation.

* Fix logic error so that we always poll at least once even when
timeout is `Duration::ZERO`.

* Introduce Deadline type to simplify `BaseProducer::flush` and
`BaseProducer::poll`.

* Add `From<Timeout>` impl for `Deadline`

* Ensure we always call `poll_event` at least once, even if we have
`Timeout::After(Duration::ZERO)` for a non-blocking call.

* Allow Deadline to express `Timeout::Never` losslessly.

* Refactor poll loop to be more idiomatic.

* Centralize clamping logic to Deadline.

* Remove extraneous From impl.

* Simplify `BaseProducer::poll` to rely on `From` impl.

* Don't block forever in poll when flushing.

* Remove this clamp, in favor of relying on remaining_millis_i32.

* Ensure we always poll even if we get a timeout from flush.

* Update changelog reflecting behavior change in `BaseProducer::poll`.
  • Loading branch information
davidblewett committed Dec 12, 2024
1 parent bd70a6e commit c8e48e3
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 34 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
* Fix test dependency on docker compose.
* Backwards incompatible: `ClientContext::log` now accepts a single `LogRecord`
* Add `LogRecord::context` to contain the logging context for the given message
* Address wakeup races introduced by pivoting to the event API.
* Update `BaseProducer::poll` to not return early, and instead continue
looping until the passed timeout is reached.

## 0.36.2 (2024-01-16)

Expand Down
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,10 @@ impl<C: ClientContext> Client<C> {
&self.context
}

pub(crate) fn poll_event(
pub(crate) fn poll_event<T: Into<Timeout>>(
&self,
queue: &NativeQueue,
timeout: Timeout,
timeout: T,
) -> EventPollResult<NativeEvent> {
let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) };
if let Some(ev) = event {
Expand Down
61 changes: 30 additions & 31 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::producer::{
DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
};
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, NativePtr, Timeout};
use crate::util::{Deadline, IntoOpaque, NativePtr, Timeout};

pub use crate::message::DeliveryResult;

Expand Down Expand Up @@ -338,7 +338,6 @@ where
client: Client<C>,
queue: NativeQueue,
_partitioner: PhantomData<Part>,
min_poll_interval: Timeout,
}

impl<C, Part> BaseProducer<C, Part>
Expand All @@ -353,7 +352,6 @@ where
client,
queue,
_partitioner: PhantomData,
min_poll_interval: Timeout::After(Duration::from_millis(100)),
}
}

Expand All @@ -362,19 +360,25 @@ where
/// Regular calls to `poll` are required to process the events and execute
/// the message delivery callbacks.
pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
let event = self.client().poll_event(&self.queue, timeout.into());
if let EventPollResult::Event(ev) = event {
let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
match evtype {
rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
_ => {
let evname = unsafe {
let evname = rdsys::rd_kafka_event_name(ev.ptr());
CStr::from_ptr(evname).to_string_lossy()
};
warn!("Ignored event '{}' on base producer poll", evname);
let deadline: Deadline = timeout.into().into();
loop {
let event = self.client().poll_event(&self.queue, &deadline);
if let EventPollResult::Event(ev) = event {
let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
match evtype {
rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
_ => {
let evname = unsafe {
let evname = rdsys::rd_kafka_event_name(ev.ptr());
CStr::from_ptr(evname).to_string_lossy()
};
warn!("Ignored event '{}' on base producer poll", evname);
}
}
}
if deadline.elapsed() {
break;
}
}
}

Expand Down Expand Up @@ -494,26 +498,21 @@ where
// As this library uses the rdkafka Event API, flush will not call rd_kafka_poll() but instead wait for
// the librdkafka-handled message count to reach zero. Runs until value reaches zero or timeout.
fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
let mut timeout = timeout.into();
loop {
let op_timeout = std::cmp::min(timeout, self.min_poll_interval);
if self.in_flight_count() > 0 {
unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) };
self.poll(op_timeout);
let deadline: Deadline = timeout.into().into();
while self.in_flight_count() > 0 && !deadline.elapsed() {
let ret = unsafe {
rdsys::rd_kafka_flush(self.client().native_ptr(), deadline.remaining_millis_i32())
};
if let Deadline::Never = &deadline {
self.poll(Timeout::After(Duration::ZERO));
} else {
return Ok(());
self.poll(&deadline);
}

if op_timeout >= timeout {
let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) };
if ret.is_error() {
return Err(KafkaError::Flush(ret.into()));
} else {
return Ok(());
}
}
timeout -= op_timeout;
if ret.is_error() {
return Err(KafkaError::Flush(ret.into()));
};
}
Ok(())
}

fn purge(&self, flags: PurgeConfig) {
Expand Down
57 changes: 56 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Utility functions and types.
use std::cmp;
use std::ffi::CStr;
use std::fmt;
use std::future::Future;
Expand All @@ -12,7 +13,7 @@ use std::slice;
use std::sync::Arc;
#[cfg(feature = "naive-runtime")]
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

#[cfg(feature = "naive-runtime")]
use futures_channel::oneshot;
Expand All @@ -31,6 +32,40 @@ pub fn get_rdkafka_version() -> (i32, String) {
(version_number, c_str.to_string_lossy().into_owned())
}

/// A point in time after which an operation should stop waiting, or
/// `Never` for operations that are allowed to block indefinitely.
pub(crate) enum Deadline {
    /// Stop waiting once this instant has passed.
    At(Instant),
    /// No time bound; the operation may wait forever.
    Never,
}

impl Deadline {
    // librdkafka's flush API takes an `i32` millisecond timeout, so any
    // remaining duration handed to it must be clamped to `i32::MAX` ms.
    const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);

    /// Creates a deadline `duration` from now, or an unbounded deadline
    /// when `duration` is `None`.
    pub(crate) fn new(duration: Option<Duration>) -> Self {
        match duration {
            Some(d) => Self::At(Instant::now() + d),
            None => Self::Never,
        }
    }

    /// Returns the time left until the deadline, saturating to
    /// `Duration::ZERO` once the deadline has passed. `Never` reports
    /// `Duration::MAX`.
    pub(crate) fn remaining(&self) -> Duration {
        match self {
            // `saturating_duration_since` makes the already-elapsed case
            // return zero explicitly on every Rust version; plain
            // `*i - Instant::now()` panicked on overshoot before Rust 1.60.
            Deadline::At(i) => i.saturating_duration_since(Instant::now()),
            Deadline::Never => Duration::MAX,
        }
    }

    /// Remaining time as whole milliseconds clamped into `i32`, the unit
    /// librdkafka's flush API expects. The clamp to `MAX_FLUSH_DURATION`
    /// guarantees the `as i32` cast cannot truncate.
    pub(crate) fn remaining_millis_i32(&self) -> i32 {
        cmp::min(Deadline::MAX_FLUSH_DURATION, self.remaining()).as_millis() as i32
    }

    /// Whether the deadline has passed. `Never` never elapses.
    pub(crate) fn elapsed(&self) -> bool {
        self.remaining() <= Duration::ZERO
    }
}

/// Specifies a timeout for a Kafka operation.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
Expand Down Expand Up @@ -76,6 +111,26 @@ impl std::ops::SubAssign for Timeout {
}
}

impl From<Timeout> for Deadline {
    /// Converts a `Timeout` into the equivalent `Deadline`, anchoring a
    /// `Timeout::After` duration at the current instant.
    fn from(timeout: Timeout) -> Deadline {
        match timeout {
            Timeout::After(duration) => Deadline::new(Some(duration)),
            // Mirrors the original `else` arm: anything that is not
            // `After` becomes an unbounded deadline.
            _ => Deadline::new(None),
        }
    }
}

impl From<&Deadline> for Timeout {
    /// Converts the remaining time on a `Deadline` back into a `Timeout`,
    /// preserving `Never` losslessly.
    fn from(deadline: &Deadline) -> Timeout {
        match deadline {
            Deadline::Never => Timeout::Never,
            Deadline::At(_) => Timeout::After(deadline.remaining()),
        }
    }
}

impl From<Duration> for Timeout {
fn from(d: Duration) -> Timeout {
Timeout::After(d)
Expand Down

0 comments on commit c8e48e3

Please sign in to comment.