Skip to content

Commit

Permalink
Expose a close_queue and closed methods
Browse files Browse the repository at this point in the history
If you have a consumer wrapping this one (FFI cases), the outer consumer must close
the queue and serve the events via Poll. Otherwise it will hang forever
as prior to calling close there's a rebalance & rdkafka awaits a
response before continuing.
  • Loading branch information
scanterog committed Nov 1, 2023
1 parent 8da2790 commit 2f63754
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,27 @@ where
PartitionQueue::new(self.clone(), queue)
})
}

/// Close the queue used by a consumer.
/// Only exposed for advanced usage of this API and should not be used under normal circumstances.
pub fn close_queue(&self) -> KafkaResult<()> {
let err = unsafe {
RDKafkaError::from_ptr(rdsys::rd_kafka_consumer_close_queue(
self.client.native_ptr(),
self.queue.ptr(),
))
};
if err.is_error() {
Err(KafkaError::ConsumerQueueClose(err.code()))
} else {
Ok(())
}
}

/// Returns true if the consumer is closed, else false.
pub fn closed(&self) -> bool {
unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
}
}

impl<C> Consumer<C> for BaseConsumer<C>
Expand Down
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ pub enum KafkaError {
ClientCreation(String),
/// Consumer commit failed.
ConsumerCommit(RDKafkaErrorCode),
/// Consumer queue close failed.
ConsumerQueueClose(RDKafkaErrorCode),
/// Flushing failed
Flush(RDKafkaErrorCode),
/// Global error.
Expand Down Expand Up @@ -204,6 +206,9 @@ impl fmt::Debug for KafkaError {
KafkaError::ConsumerCommit(err) => {
write!(f, "KafkaError (Consumer commit error: {})", err)
}
KafkaError::ConsumerQueueClose(err) => {
write!(f, "KafkaError (Consumer queue close error: {})", err)
}
KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
KafkaError::GroupListFetch(err) => {
Expand Down Expand Up @@ -255,6 +260,7 @@ impl fmt::Display for KafkaError {
}
KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
KafkaError::ConsumerQueueClose(err) => write!(f, "Consumer queue close error: {}", err),
KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
KafkaError::Global(err) => write!(f, "Global error: {}", err),
KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
Expand Down Expand Up @@ -288,6 +294,7 @@ impl Error for KafkaError {
KafkaError::ClientConfig(..) => None,
KafkaError::ClientCreation(_) => None,
KafkaError::ConsumerCommit(err) => Some(err),
KafkaError::ConsumerQueueClose(err) => Some(err),
KafkaError::Flush(err) => Some(err),
KafkaError::Global(err) => Some(err),
KafkaError::GroupListFetch(err) => Some(err),
Expand Down Expand Up @@ -327,6 +334,7 @@ impl KafkaError {
KafkaError::ClientConfig(..) => None,
KafkaError::ClientCreation(_) => None,
KafkaError::ConsumerCommit(err) => Some(*err),
KafkaError::ConsumerQueueClose(err) => Some(*err),
KafkaError::Flush(err) => Some(*err),
KafkaError::Global(err) => Some(*err),
KafkaError::GroupListFetch(err) => Some(*err),
Expand Down

0 comments on commit 2f63754

Please sign in to comment.