Skip to content

Commit

Permalink
add producer
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed May 2, 2024
1 parent 2f8d22a commit 950981f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 4 deletions.
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,3 @@ rdkafka ={ version = "0.36", features = [

tokio ={ version = "1", features = ["macros", "time", "rt-multi-thread"] }
tracing-subscriber = "0.3"

[[example]]
name = "consumer"
path = "examples/consumer.rs"
90 changes: 90 additions & 0 deletions examples/producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::thread;
use std::time::Duration;

use aws_config::Region;
use aws_msk_iam_sasl_signer_rs::generate_auth_token;
use rdkafka::client::OAuthToken;
use rdkafka::producer::{FutureProducer, FutureRecord, ProducerContext};
use rdkafka::{ClientConfig, ClientContext};
use tokio::runtime::Handle;
use tokio::time::timeout;
use tracing_subscriber;

const REGION: &str = "us-east-2";
const KAFKA_BROKER: &str = "your-broker-address";
const KAFKA_TOPIC: &str = "your-topic-name";

struct IamProducerContext {
region: Region,
rt: Handle,
}

impl IamProducerContext {
fn new(region: Region, rt: Handle) -> Self {
Self { region, rt }
}
}

impl ProducerContext for IamProducerContext {
type DeliveryOpaque = ();
fn delivery(
&self,
_delivery_result: &rdkafka::message::DeliveryResult<'_>,
_delivery_opaque: Self::DeliveryOpaque,
) {
}
}

impl ClientContext for IamProducerContext {
const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;

fn generate_oauth_token(
&self,
_oauthbearer_config: Option<&str>,
) -> Result<OAuthToken, Box<dyn std::error::Error>> {
let region = self.region.clone();
let rt = self.rt.clone();
let (token, expiration_time_ms) = {
let handle = thread::spawn(move || {
rt.block_on(async {
timeout(Duration::from_secs(10), generate_auth_token(region.clone())).await
})
});
handle.join().unwrap()??
};
Ok(OAuthToken {
token,
principal_name: "".to_string(),
lifetime_ms: expiration_time_ms,
})
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let mut config = ClientConfig::new();

config.set("bootstrap.servers", KAFKA_BROKER);
config.set("security.protocol", "SASL_SSL");
config.set("sasl.mechanism", "OAUTHBEARER");

let region = Region::from_static(REGION);
let context = IamProducerContext::new(region, Handle::current());

let producer: FutureProducer<IamProducerContext> = config.create_with_context(context).unwrap();

let test_record = FutureRecord {
topic: KAFKA_TOPIC,
key: Some("test_key"),
payload: Some("test_payload"),
partition: None,
timestamp: None,
headers: None,
};
let (partition, offset) = producer
.send(test_record, Duration::from_secs(5))
.await
.unwrap();
println!("Message send to partition {partition}, offset {offset}");
}

0 comments on commit 950981f

Please sign in to comment.