Use rdkafka event API instead of the callback API #617

Merged · Nov 8, 2023

Commits (24)
19f32bf  Move to Event-based API (scanterog, Aug 11, 2023)
b527a3e  Adapt the StreamConsumer to poll the underlying BaseConsumer (scanterog, Oct 17, 2023)
b897ec9  Pass arc by value rather than reference and fix generic type. (davidblewett, Oct 18, 2023)
ebe7e9c  Refactor to use references and lifetimes rather than Arc. (davidblewett, Oct 20, 2023)
39dec28  Work on supporting StreamConsumer via lifetimes instead of Arc. (davidblewett, Oct 22, 2023)
941cd32  Use Arc for events in BorrowMessage (scanterog, Oct 24, 2023)
438af77  Adapt producer Flush to the Event API semantics (scanterog, Oct 24, 2023)
0a36b3d  Explain why the TPL need to be manuallyDrop on the consumer events ha… (scanterog, Oct 25, 2023)
0b885a5  Add comment for no-op method used on RDKafkaMessage impl of the Kafka… (scanterog, Oct 25, 2023)
32b0d24  Update doc comment for BorrowedMessage::from_dr_event (scanterog, Oct 25, 2023)
64d2e32  Replace poll with flush on baseProducer drop (scanterog, Oct 25, 2023)
f3173d5  StreamConsumer Stream impl fixes for the event API (scanterog, Oct 26, 2023)
6c8c5f0  Consumer needs to read from earliest otherwise consumer will never re… (scanterog, Oct 26, 2023)
54893ab  Poll should not return None if timeout has not been reached (scanterog, Oct 26, 2023)
c7f83a8  Cargo clippy (scanterog, Oct 26, 2023)
74ff52a  Propagate errors for the consumer (scanterog, Oct 27, 2023)
bb2aee0  Adapt commit_transaction to the event api (scanterog, Oct 27, 2023)
3b98f95  Adapt consumer close to the event api (scanterog, Oct 30, 2023)
2af3671  Allow creating a consumer without group.id (scanterog, Oct 30, 2023)
4fb2266  Do not panic on transient errors on test_consume_partition_order (scanterog, Oct 31, 2023)
34fc335  Expose a close_queue and closed methods (scanterog, Oct 30, 2023)
7202e7b  Use closed and close_queue methods on drop (scanterog, Nov 6, 2023)
3b13940  Propagate fatal errors (scanterog, Nov 6, 2023)
978c964  Fix op timeout computation logic on poll_queue (scanterog, Nov 7, 2023)
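
Taken together, these commits move rust-rdkafka from librdkafka's per-callback hooks (log, stats, error, OAuth token refresh) to its event API, in which the client polls an event queue and dispatches on the returned event's type. The sketch below illustrates that polling pattern with the raw rdkafka-sys bindings; it is not code from this PR, and the helper name `drain_events`, the handle `rk`, and the 100 ms timeout are illustrative assumptions. It also assumes the relevant event types were enabled with `rd_kafka_conf_set_events()` at client creation.

```rust
use rdkafka_sys as rdsys;
use rdkafka_sys::types::RDKafka;

/// Poll the client's main event queue and dispatch on event type.
/// Assumes `rk` is a live librdkafka client handle whose config enabled
/// these event types via rd_kafka_conf_set_events().
unsafe fn drain_events(rk: *mut RDKafka) {
    let queue = rdsys::rd_kafka_queue_get_main(rk);
    loop {
        // Wait up to 100 ms for the next event; NULL signals a timeout.
        let ev = rdsys::rd_kafka_queue_poll(queue, 100);
        if ev.is_null() {
            break;
        }
        match rdsys::rd_kafka_event_type(ev) {
            rdsys::RD_KAFKA_EVENT_LOG => { /* read with rd_kafka_event_log(ev, ...) */ }
            rdsys::RD_KAFKA_EVENT_STATS => { /* read with rd_kafka_event_stats(ev) */ }
            rdsys::RD_KAFKA_EVENT_ERROR => { /* read with rd_kafka_event_error(ev) */ }
            _ => {}
        }
        // The caller owns polled events and must destroy each one.
        rdsys::rd_kafka_event_destroy(ev);
    }
    rdsys::rd_kafka_queue_destroy(queue);
}
```

This is essentially the shape of `Client::poll_event` in the diff below, except that the library wraps the queue and event pointers in RAII types (`NativeQueue`, `NativeEvent`) and hands unhandled events back to the caller.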
src/admin.rs (2 changes: 1 addition & 1 deletion)
@@ -403,7 +403,7 @@ fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> J
         .expect("Failed to start polling thread")
 }
 
-type NativeEvent = NativePtr<RDKafkaEvent>;
+pub(crate) type NativeEvent = NativePtr<RDKafkaEvent>;
 
 unsafe impl KafkaDrop for RDKafkaEvent {
     const TYPE: &'static str = "event";
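
The hunk above only widens the visibility of `NativeEvent` so that `client.rs` can reuse it (note the new `use crate::admin::NativeEvent;` import below). For orientation, here is a simplified sketch of the RAII pattern behind `NativePtr<T>`: the `KafkaDrop` impl shown in the diff names the librdkafka destructor for the wrapped type, and dropping the wrapper invokes it. This is an assumed reconstruction of rust-rdkafka internals, not the verbatim source:

```rust
use std::ptr::NonNull;

// Assumed shape of rust-rdkafka's internal KafkaDrop/NativePtr pair.
unsafe trait KafkaDrop {
    const TYPE: &'static str; // human-readable type name, used for tracing
    const DROP: unsafe extern "C" fn(*mut Self); // librdkafka destructor
}

struct NativePtr<T: KafkaDrop> {
    ptr: NonNull<T>,
}

impl<T: KafkaDrop> Drop for NativePtr<T> {
    fn drop(&mut self) {
        // Hand the raw pointer back to the matching librdkafka destructor,
        // so every wrapped handle is destroyed exactly once.
        unsafe { T::DROP(self.ptr.as_ptr()) }
    }
}
```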
src/client.rs (244 changes: 130 additions & 114 deletions)
@@ -11,19 +11,19 @@
 //! [`consumer`]: crate::consumer
 //! [`producer`]: crate::producer
 
-use std::convert::TryFrom;
 use std::error::Error;
 use std::ffi::{CStr, CString};
 use std::mem::ManuallyDrop;
-use std::os::raw::{c_char, c_void};
+use std::os::raw::c_char;
 use std::ptr;
-use std::slice;
 use std::string::ToString;
 use std::sync::Arc;
 
+use libc::c_void;
 use rdkafka_sys as rdsys;
 use rdkafka_sys::types::*;
 
+use crate::admin::NativeEvent;
 use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
 use crate::consumer::RebalanceProtocol;
 use crate::error::{IsError, KafkaError, KafkaResult};
@@ -239,21 +239,6 @@ impl<C: ClientContext> Client<C> {
                 Arc::as_ptr(&context) as *mut c_void,
             )
         };
-        unsafe { rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>)) };
-        unsafe {
-            rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>))
-        };
-        unsafe {
-            rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>))
-        };
-        if C::ENABLE_REFRESH_OAUTH_TOKEN {
-            unsafe {
-                rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb(
-                    native_config.ptr(),
-                    Some(native_oauth_refresh_cb::<C>),
-                )
-            };
-        }
 
         let client_ptr = unsafe {
             let native_config = ManuallyDrop::new(native_config);
@@ -293,6 +278,128 @@ impl<C: ClientContext> Client<C> {
         &self.context
     }
 
+    pub(crate) fn poll_event(&self, queue: &NativeQueue, timeout: Timeout) -> Option<NativeEvent> {
+        let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) };
+        if let Some(ev) = event {
+            let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
+            match evtype {
+                rdsys::RD_KAFKA_EVENT_LOG => self.handle_log_event(ev.ptr()),
+                rdsys::RD_KAFKA_EVENT_STATS => self.handle_stats_event(ev.ptr()),
+                rdsys::RD_KAFKA_EVENT_ERROR => {
+                    // rdkafka reports consumer errors via RD_KAFKA_EVENT_ERROR, but producer errors get
+                    // embedded in the ack returned via RD_KAFKA_EVENT_DR. Hence we need to return this event
+                    // for the consumer case in order to return the error to the user.
+                    self.handle_error_event(ev.ptr());
+                    return Some(ev);
+                }
+                rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH => {
+                    if C::ENABLE_REFRESH_OAUTH_TOKEN {
+                        self.handle_oauth_refresh_event(ev.ptr());
+                    }
+                }
+                _ => {
+                    return Some(ev);
+                }
+            }
+        }
+        None
+    }
+
+    fn handle_log_event(&self, event: *mut RDKafkaEvent) {
+        let mut fac: *const c_char = std::ptr::null();
+        let mut str_: *const c_char = std::ptr::null();
+        let mut level: i32 = 0;
+        let result = unsafe { rdsys::rd_kafka_event_log(event, &mut fac, &mut str_, &mut level) };
+        if result == 0 {
+            let fac = unsafe { CStr::from_ptr(fac).to_string_lossy() };
+            let log_message = unsafe { CStr::from_ptr(str_).to_string_lossy() };
+            self.context().log(
+                RDKafkaLogLevel::from_int(level),
+                fac.trim(),
+                log_message.trim(),
+            );
+        }
+    }
+
+    fn handle_stats_event(&self, event: *mut RDKafkaEvent) {
+        let json = unsafe { CStr::from_ptr(rdsys::rd_kafka_event_stats(event)) };
+        self.context().stats_raw(json.to_bytes());
+    }
+
+    fn handle_error_event(&self, event: *mut RDKafkaEvent) {
+        let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event) };
+        let error = KafkaError::Global(rdkafka_err.into());
+        let reason =
+            unsafe { CStr::from_ptr(rdsys::rd_kafka_event_error_string(event)).to_string_lossy() };
+        self.context().error(error, reason.trim());
+    }
+
+    fn handle_oauth_refresh_event(&self, event: *mut RDKafkaEvent) {
+        let oauthbearer_config = unsafe { rdsys::rd_kafka_event_config_string(event) };
+        let res: Result<_, Box<dyn Error>> = (|| {
+            let oauthbearer_config = match oauthbearer_config.is_null() {
+                true => None,
+                false => unsafe { Some(util::cstr_to_owned(oauthbearer_config)) },
+            };
+            let token_info = self
+                .context()
+                .generate_oauth_token(oauthbearer_config.as_deref())?;
+            let token = CString::new(token_info.token)?;
+            let principal_name = CString::new(token_info.principal_name)?;
+            Ok((token, principal_name, token_info.lifetime_ms))
+        })();
+        match res {
+            Ok((token, principal_name, lifetime_ms)) => {
+                let mut err_buf = ErrBuf::new();
+                let code = unsafe {
+                    rdkafka_sys::rd_kafka_oauthbearer_set_token(
+                        self.native_ptr(),
+                        token.as_ptr(),
+                        lifetime_ms,
+                        principal_name.as_ptr(),
+                        ptr::null_mut(),
+                        0,
+                        err_buf.as_mut_ptr(),
+                        err_buf.capacity(),
+                    )
+                };
+                if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
+                    debug!("successfully set refreshed OAuth token");
+                } else {
+                    debug!(
+                        "failed to set refreshed OAuth token (code {:?}): {}",
+                        code, err_buf
+                    );
+                    unsafe {
+                        rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(
+                            self.native_ptr(),
+                            err_buf.as_mut_ptr(),
+                        )
+                    };
+                }
+            }
+            Err(e) => {
+                debug!("failed to refresh OAuth token: {}", e);
+                let message = match CString::new(e.to_string()) {
+                    Ok(message) => message,
+                    Err(e) => {
+                        error!("error message generated while refreshing OAuth token has embedded null character: {}", e);
+                        CString::new(
+                            "error while refreshing OAuth token has embedded null character",
+                        )
+                        .expect("known to be a valid CString")
+                    }
+                };
+                unsafe {
+                    rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(
+                        self.native_ptr(),
+                        message.as_ptr(),
+                    )
+                };
+            }
+        }
+    }
+
     /// Returns the metadata information for the specified topic, or for all topics in the cluster
     /// if no topic is specified.
     pub fn fetch_metadata<T: Into<Timeout>>(
@@ -442,6 +549,11 @@ impl<C: ClientContext> Client<C> {
     pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
         unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_consumer(self.native_ptr())) }
     }
+
+    /// Returns a NativeQueue for the main librdkafka event queue from the current client.
+    pub(crate) fn main_queue(&self) -> NativeQueue {
+        unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_main(self.native_ptr())).unwrap() }
+    }
 }
 
 pub(crate) type NativeTopic = NativePtr<RDKafkaTopic>;
@@ -471,48 +583,6 @@ impl NativeQueue {
     }
 }
 
-pub(crate) unsafe extern "C" fn native_log_cb<C: ClientContext>(
-    client: *const RDKafka,
-    level: i32,
-    fac: *const c_char,
-    buf: *const c_char,
-) {
-    let fac = CStr::from_ptr(fac).to_string_lossy();
-    let log_message = CStr::from_ptr(buf).to_string_lossy();
-
-    let context = &mut *(rdsys::rd_kafka_opaque(client) as *mut C);
-    context.log(
-        RDKafkaLogLevel::from_int(level),
-        fac.trim(),
-        log_message.trim(),
-    );
-}
-
-pub(crate) unsafe extern "C" fn native_stats_cb<C: ClientContext>(
-    _conf: *mut RDKafka,
-    json: *mut c_char,
-    json_len: usize,
-    opaque: *mut c_void,
-) -> i32 {
-    let context = &mut *(opaque as *mut C);
-    context.stats_raw(slice::from_raw_parts(json as *mut u8, json_len));
-    0 // librdkafka will free the json buffer
-}
-
-pub(crate) unsafe extern "C" fn native_error_cb<C: ClientContext>(
-    _client: *mut RDKafka,
-    err: i32,
-    reason: *const c_char,
-    opaque: *mut c_void,
-) {
-    let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t");
-    let error = KafkaError::Global(err.into());
-    let reason = CStr::from_ptr(reason).to_string_lossy();
-
-    let context = &mut *(opaque as *mut C);
-    context.error(error, reason.trim());
-}
-
 /// A generated OAuth token and its associated metadata.
 ///
 /// When using the `OAUTHBEARER` SASL authentication method, this type is
@@ -529,60 +599,6 @@ pub struct OAuthToken {
     pub lifetime_ms: i64,
 }
 
-pub(crate) unsafe extern "C" fn native_oauth_refresh_cb<C: ClientContext>(
-    client: *mut RDKafka,
-    oauthbearer_config: *const c_char,
-    opaque: *mut c_void,
-) {
-    let res: Result<_, Box<dyn Error>> = (|| {
-        let context = &mut *(opaque as *mut C);
-        let oauthbearer_config = match oauthbearer_config.is_null() {
-            true => None,
-            false => Some(util::cstr_to_owned(oauthbearer_config)),
-        };
-        let token_info = context.generate_oauth_token(oauthbearer_config.as_deref())?;
-        let token = CString::new(token_info.token)?;
-        let principal_name = CString::new(token_info.principal_name)?;
-        Ok((token, principal_name, token_info.lifetime_ms))
-    })();
-    match res {
-        Ok((token, principal_name, lifetime_ms)) => {
-            let mut err_buf = ErrBuf::new();
-            let code = rdkafka_sys::rd_kafka_oauthbearer_set_token(
-                client,
-                token.as_ptr(),
-                lifetime_ms,
-                principal_name.as_ptr(),
-                ptr::null_mut(),
-                0,
-                err_buf.as_mut_ptr(),
-                err_buf.capacity(),
-            );
-            if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
-                debug!("successfully set refreshed OAuth token");
-            } else {
-                debug!(
-                    "failed to set refreshed OAuth token (code {:?}): {}",
-                    code, err_buf
-                );
-                rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, err_buf.as_mut_ptr());
-            }
-        }
-        Err(e) => {
-            debug!("failed to refresh OAuth token: {}", e);
-            let message = match CString::new(e.to_string()) {
-                Ok(message) => message,
-                Err(e) => {
-                    error!("error message generated while refreshing OAuth token has embedded null character: {}", e);
-                    CString::new("error while refreshing OAuth token has embedded null character")
-                        .expect("known to be a valid CString")
-                }
-            };
-            rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, message.as_ptr());
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     // Just call everything to test there no panics by default, behavior
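
As the comment in `poll_event` above notes, producer failures are not surfaced as RD_KAFKA_EVENT_ERROR; they ride along in the delivery-report (DR) event for the acknowledged batch. Below is a hedged sketch of walking a DR event's messages with the raw bindings; `handle_dr_event` is an illustrative name, not code from this PR:

```rust
use rdkafka_sys as rdsys;
use rdkafka_sys::types::{RDKafkaEvent, RDKafkaRespErr};

/// Iterate the messages acknowledged by an RD_KAFKA_EVENT_DR event and
/// report per-message delivery errors. Illustrative sketch only.
unsafe fn handle_dr_event(ev: *mut RDKafkaEvent) {
    loop {
        // Each call yields the next message in the delivery report,
        // or NULL once the report is exhausted.
        let msg = rdsys::rd_kafka_event_message_next(ev);
        if msg.is_null() {
            break;
        }
        if (*msg).err != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
            eprintln!("delivery failed: {:?}", (*msg).err);
        }
    }
}
```

This corresponds to the `BorrowedMessage::from_dr_event` path touched by the commits above, which turns each acknowledged message into a delivery result for the producer context.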