mirror of https://github.com/fede1024/rust-rdkafka
115 lines
3.5 KiB
Rust
115 lines
3.5 KiB
Rust
use std::convert::TryInto;
|
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|
|
|
use clap::{App, Arg};
|
|
use hdrhistogram::Histogram;
|
|
|
|
use rdkafka::config::ClientConfig;
|
|
use rdkafka::consumer::{Consumer, StreamConsumer};
|
|
use rdkafka::message::Message;
|
|
use rdkafka::producer::{FutureProducer, FutureRecord};
|
|
|
|
use crate::example_utils::setup_logger;
|
|
|
|
mod example_utils;
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
let matches = App::new("Roundtrip example")
|
|
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
|
|
.about("Measures latency between producer and consumer")
|
|
.arg(
|
|
Arg::with_name("brokers")
|
|
.short("b")
|
|
.long("brokers")
|
|
.help("Broker list in kafka format")
|
|
.takes_value(true)
|
|
.default_value("localhost:9092"),
|
|
)
|
|
.arg(
|
|
Arg::with_name("topic")
|
|
.long("topic")
|
|
.help("topic")
|
|
.takes_value(true)
|
|
.required(true),
|
|
)
|
|
.arg(
|
|
Arg::with_name("log-conf")
|
|
.long("log-conf")
|
|
.help("Configure the logging format (example: 'rdkafka=trace')")
|
|
.takes_value(true),
|
|
)
|
|
.get_matches();
|
|
|
|
setup_logger(true, matches.value_of("log-conf"));
|
|
|
|
let brokers = matches.value_of("brokers").unwrap();
|
|
let topic = matches.value_of("topic").unwrap().to_owned();
|
|
|
|
let producer: FutureProducer = ClientConfig::new()
|
|
.set("bootstrap.servers", brokers)
|
|
.set("message.timeout.ms", "5000")
|
|
.create()
|
|
.expect("Producer creation error");
|
|
|
|
let consumer: StreamConsumer = ClientConfig::new()
|
|
.set("bootstrap.servers", brokers)
|
|
.set("session.timeout.ms", "6000")
|
|
.set("enable.auto.commit", "false")
|
|
.set("group.id", "rust-rdkafka-roundtrip-example")
|
|
.create()
|
|
.expect("Consumer creation failed");
|
|
consumer.subscribe(&[&topic]).unwrap();
|
|
|
|
tokio::spawn(async move {
|
|
let mut i = 0_usize;
|
|
loop {
|
|
producer
|
|
.send_result(
|
|
FutureRecord::to(&topic)
|
|
.key(&i.to_string())
|
|
.payload("dummy")
|
|
.timestamp(now()),
|
|
)
|
|
.unwrap()
|
|
.await
|
|
.unwrap()
|
|
.unwrap();
|
|
i += 1;
|
|
}
|
|
});
|
|
|
|
let start = Instant::now();
|
|
let mut latencies = Histogram::<u64>::new(5).unwrap();
|
|
println!("Warming up for 10s...");
|
|
loop {
|
|
let message = consumer.recv().await.unwrap();
|
|
let then = message.timestamp().to_millis().unwrap();
|
|
if start.elapsed() < Duration::from_secs(10) {
|
|
// Warming up.
|
|
} else if start.elapsed() < Duration::from_secs(20) {
|
|
if latencies.len() == 0 {
|
|
println!("Recording for 10s...");
|
|
}
|
|
latencies += (now() - then) as u64;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
println!("measurements: {}", latencies.len());
|
|
println!("mean latency: {}ms", latencies.mean());
|
|
println!("p50 latency: {}ms", latencies.value_at_quantile(0.50));
|
|
println!("p90 latency: {}ms", latencies.value_at_quantile(0.90));
|
|
println!("p99 latency: {}ms", latencies.value_at_quantile(0.99));
|
|
}
|
|
|
|
fn now() -> i64 {
|
|
SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_millis()
|
|
.try_into()
|
|
.unwrap()
|
|
}
|