// rust-rdkafka/examples/simple_producer.rs
//
// 92 lines
// 3.0 KiB
// Rust

use std::time::Duration;
use clap::{App, Arg};
use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;
use crate::example_utils::setup_logger;
mod example_utils;
async fn produce(brokers: &str, topic_name: &str) {
let producer: &FutureProducer = &ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
// This loop is non blocking: all messages will be sent one after the other, without waiting
// for the results.
let futures = (0..5)
.map(|i| async move {
// The send operation on the topic returns a future, which will be
// completed once the result or failure from Kafka is received.
let delivery_status = producer
.send(
FutureRecord::to(topic_name)
.payload(&format!("Message {}", i))
.key(&format!("Key {}", i))
.headers(OwnedHeaders::new().insert(Header {
key: "header_key",
value: Some("header_value"),
})),
Duration::from_secs(0),
)
.await;
// This will be executed when the result is received.
info!("Delivery status for message {} received", i);
delivery_status
})
.collect::<Vec<_>>();
// This loop will wait until all delivery statuses have been received.
for future in futures {
info!("Future completed. Result: {:?}", future.await);
}
}
/// Entry point: parses the command line, initialises logging, logs the
/// librdkafka version and then runs [`produce`] against the requested brokers
/// and topic.
#[tokio::main]
async fn main() {
    // Optional broker list; falls back to a local broker when omitted.
    let brokers_arg = Arg::with_name("brokers")
        .short("b")
        .long("brokers")
        .help("Broker list in kafka format")
        .takes_value(true)
        .default_value("localhost:9092");

    // Optional logging configuration string.
    let log_conf_arg = Arg::with_name("log-conf")
        .long("log-conf")
        .help("Configure the logging format (example: 'rdkafka=trace')")
        .takes_value(true);

    // Mandatory destination topic.
    let topic_arg = Arg::with_name("topic")
        .short("t")
        .long("topic")
        .help("Destination topic")
        .takes_value(true)
        .required(true);

    let cli = App::new("producer example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Simple command line producer")
        .arg(brokers_arg)
        .arg(log_conf_arg)
        .arg(topic_arg)
        .get_matches();

    setup_logger(true, cli.value_of("log-conf"));

    let (version_n, version_s) = get_rdkafka_version();
    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

    // Neither lookup can fail: "topic" is required and "brokers" has a default.
    let destination = cli.value_of("topic").unwrap();
    let broker_list = cli.value_of("brokers").unwrap();

    produce(broker_list, destination).await;
}