Rebase from upstream c6d9a65a6df8c43fdf93b42bb276b6b7d2ba6c23 #729

Conversation

davidblewett
Collaborator

Removing #636.

joelwachsler and others added 30 commits September 26, 2024 14:27
I realize that there is a `stats_raw` trait method, but it would be better to not have to re-type out the statistics fields myself. This came about because I would like to act on many of the individual fields locally via some aggregations, but then output them back out (along with other fields in a larger status struct) as a JSON-encoded string, e.g.:

```rs
#[derive(Serialize)]
pub struct Status {
  pub sample_field: u64,
  pub sample_field_2: bool,
  pub statistics: Statistics,
}
```
The `Arc<BaseConsumer<C>>` to `MessageStream` isn't necessary anymore,
and the changes to `split_partition_queue` can be reverted as well, I
think.
This is required as multiple write acks are tied to a single event.
One poll call might not be enough to serve the delivery report
callbacks of the purged messages. The current flush implementation
calls poll repeatedly until the queue is empty or the timeout elapses.
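The flush strategy described above can be sketched as a small loop. This is an illustrative model, not the actual rust-rdkafka implementation: `queue_len` and `poll_once` are hypothetical stand-ins for the real librdkafka calls.

```rust
use std::time::{Duration, Instant};

// Sketch of flush: keep serving poll until the delivery queue drains
// or the deadline passes. One poll may serve only part of the pending
// delivery reports, so a single call is not assumed to be enough.
fn flush(deadline: Duration, queue_len: impl Fn() -> usize, mut poll_once: impl FnMut()) -> bool {
    let start = Instant::now();
    while queue_len() > 0 {
        if start.elapsed() >= deadline {
            return false; // timed out; some delivery reports still pending
        }
        poll_once(); // serve delivery-report callbacks
    }
    true // queue fully drained
}
```

Returning a boolean lets the caller distinguish a clean drain from a timeout with messages still in flight.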
If Timeout::Never is used, poll should eventually return a Message or
an Error rather than None when handling other events such as stats,
rebalances, etc.
rdkafka reports consumer errors via RD_KAFKA_EVENT_ERROR, but producer errors get
embedded in the ack returned via RD_KAFKA_EVENT_DR. Hence we need to return this event
in the consumer case in order to surface the error to the user.
Currently, if a group.id is not specified, we allow the consumer to be used
for fetching metadata and watermarks. This behaviour is kept.
If you have a consumer wrapping this one (FFI cases), the outer consumer must close
the queue and serve the events via poll. Otherwise it will hang forever,
because a rebalance occurs prior to calling close and rdkafka awaits a
response before continuing.
With the Event API we propagate generic client instance-level errors,
such as broker connection failures, authentication issues, etc.

However, fatal errors are also propagated via the Event API. These
indicate that the particular client instance (producer/consumer) has
become non-functional.
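The distinction between the two error kinds above can be modeled with a small classification helper. The enum and function here are our own illustrative names, not rust-rdkafka types: generic instance-level errors are transient, while fatal errors mean the client must be recreated.

```rust
// Illustrative model of the two error classes propagated via the Event API.
#[derive(Debug, PartialEq)]
enum ClientError {
    // e.g. broker connection failures, authentication issues
    Transient(&'static str),
    // the client instance is non-functional and must be replaced
    Fatal(&'static str),
}

// A caller would retry on transient errors but rebuild the client on fatal ones.
fn must_recreate_client(err: &ClientError) -> bool {
    matches!(err, ClientError::Fatal(_))
}
```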
davidblewett and others added 27 commits September 26, 2024 14:27
This does not affect the StreamConsumer or any other wrapper consumer.
It only incurs an extra poll call when there is a rebalance event.

When using bindings built upon the rust-rdkafka ffi, the caller is
responsible for initiating the rebalance calls (*assign).
If a high timeout is specified, the rebalance handler will only be
triggered once the timeout period has elapsed.

This fixes it by always returning on rebalance events, except when the
timeout is Timeout::Never. Poll calls with Timeout::Never are expected
to return a message.
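The dispatch rule described above can be modeled in a few lines. The types here are illustrative only, not the rust-rdkafka API: with a finite timeout, a rebalance event makes poll return early, while Timeout::Never keeps waiting because such calls are expected to yield a message.

```rust
#[derive(Debug, PartialEq)]
enum Event {
    Message(&'static str),
    Rebalance,
}

enum PollTimeout {
    Never,
    After(std::time::Duration),
}

/// Some(result) means poll returns now; None means keep waiting.
fn dispatch(event: Event, timeout: &PollTimeout) -> Option<Option<&'static str>> {
    match event {
        // messages always end the poll call
        Event::Message(m) => Some(Some(m)),
        Event::Rebalance => match timeout {
            // finite timeout: return early so the caller's rebalance
            // handling is not delayed until the deadline elapses
            PollTimeout::After(_) => Some(None),
            // Timeout::Never: keep polling until a message arrives
            PollTimeout::Never => None,
        },
    }
}
```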
In some error cases, the `Base{Consumer,Producer}` were eagerly copying strings
and `unwrap`ing UTF-8 validation, just to print an error message.

This will avoid the allocation in the common case, and be panic-safe in the presumably unreachable case of invalid utf-8.
Kafka can return strings with multiple \0 chars (seen on Windows x64),
and CStr::from_bytes_with_nul rejects such input, so the unwrap panics.
String::from_utf8_lossy() handles it correctly.
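The failure mode above is easy to reproduce with std alone: a buffer with an interior NUL before the terminating one is rejected by `CStr::from_bytes_with_nul`, so an `unwrap` there panics, while a lossy conversion of the leading bytes succeeds. The byte string used here is illustrative data, not actual librdkafka output.

```rust
use std::ffi::CStr;

// Convert the bytes up to the first NUL, replacing any invalid UTF-8.
fn lossy_until_nul(raw: &[u8]) -> String {
    let head = raw.split(|&b| b == 0).next().unwrap_or(&[]);
    String::from_utf8_lossy(head).into_owned()
}

fn main() {
    // Two NULs, as described in the commit message above.
    let raw: &[u8] = b"broker message\0\0";

    // Strict parsing fails on the interior NUL; unwrap() would panic here.
    assert!(CStr::from_bytes_with_nul(raw).is_err());

    // The lossy path recovers the readable prefix without panicking.
    assert_eq!(lossy_until_nul(raw), "broker message");
}
```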
Fixed several issues with the current test suite:

* Missing cmake dependency
* Rust version being too old for some dependencies
* Missing null pointer check in src/groups.rs
* Test timeout being too strict
* CI test jobs not working in parallel

This also fixes a few compiler warnings (unused imports, unused code,
etc.) and formatting issues.
Fix various clippy warnings in both source and tests
It is currently impossible to develop a custom consumer based on `BaseConsumer`
because its `queue` property, which is necessary to receive notifications about
new incoming messages, is private.

This defines a `set_nonempty_callback` method on `BaseConsumer`, similarly to
how it has already been done for `PartitionQueue`. That will allow setting the
`rdkafka_sys::rd_kafka_queue_cb_event_enable` callback from within a custom
consumer implementation.
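The notification pattern this commit exposes can be modeled with std types alone. This is a sketch of the idea, not the rdkafka API (which installs the callback via `rd_kafka_queue_cb_event_enable`): the callback fires on the empty-to-nonempty transition, letting a custom consumer wake itself instead of busy-polling.

```rust
use std::collections::VecDeque;

// Minimal model of a queue with a "became non-empty" callback.
struct NotifyQueue<T> {
    items: VecDeque<T>,
    nonempty_cb: Option<Box<dyn FnMut()>>,
}

impl<T> NotifyQueue<T> {
    fn new() -> Self {
        Self { items: VecDeque::new(), nonempty_cb: None }
    }

    // Analogous in spirit to the new set_nonempty_callback method.
    fn set_nonempty_callback(&mut self, cb: impl FnMut() + 'static) {
        self.nonempty_cb = Some(Box::new(cb));
    }

    fn push(&mut self, item: T) {
        let was_empty = self.items.is_empty();
        self.items.push_back(item);
        if was_empty {
            // fire only on the empty -> non-empty transition
            if let Some(cb) = self.nonempty_cb.as_mut() {
                cb();
            }
        }
    }

    fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}
```

Firing only on the transition (rather than on every push) keeps the wake-up cheap when messages arrive in bursts.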
The `OnDrop` struct is no longer used, so just remove it. It's easy to
add back if we need it in the future.
Update the MSRV of Rust
@davidblewett davidblewett deleted the davidblewett/update-from-upstream-c6d9a65a6df8c43fdf93b42bb276b6b7d2ba6c23 branch September 26, 2024 18:57
@davidblewett davidblewett restored the davidblewett/update-from-upstream-c6d9a65a6df8c43fdf93b42bb276b6b7d2ba6c23 branch September 26, 2024 18:59