Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseProducer Poll and Flush no longer clearing queue after Move to Event-based API commit #676

Open
brentmjohnson opened this issue Apr 30, 2024 · 2 comments

Comments

@brentmjohnson
Copy link

Upgraded to 0.36.2, and identified that rdkafka::error::RDKafkaErrorCode::QueueFull is continually returned after each send, even when explicitly calling BaseProducer.poll().

Additionally, explicit calls to BaseProducer.flush() leaves un-published events in queue - these can be observed when re-using the BaseProducer after flushing - ending thread - starting new thread using the same thread-safe BaseProducer.

Related code:

loop {
    let producer_future = kafka_producer.lock().unwrap().send(
        BaseRecord::to(topic_name.as_ref().unwrap())
            .key(&())
            .payload(&interval_subscription),
    );
    match producer_future {
        Ok(_) => break,
        Err((KafkaError::MessageProduction(rdkafka::error::RDKafkaErrorCode::QueueFull), _)) => {
            let poll_amt = kafka_producer.lock().unwrap().poll(Timeout::Never);
            println!("poll: {:?}", poll_amt);
        },
        Err((e, _)) => {
            println!("Error {:?}", e);
            break;
        }
    }
}

Downgrading to 0.35.0 to avoid 19f32bf resolves this.

Have seen a few other issues that may be related, like #638

Let me know if more information would be helpful, but I believe this can be observed with some minor changes to the kafka-benchmark project: https://github.com/fede1024/kafka-benchmark/blob/c04d8cee98b0aa47e1580a27d4db5ba90e6318b6/src/producer/mod.rs#L67-L87

@davidblewett
Copy link
Collaborator

@brentmjohnson Can you test to see if #748 addresses this?

@davidblewett
Copy link
Collaborator

The changes in that PR would mean .poll(Timeout::Never) should only be done in a dedicated thread. This call will continue processing events for Duration::MAX.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants