Add support for passing -X key=value arguments through into librdkafka

This _should_ allow this to be configured to use a Kafka authentication scheme
like user/pass or SASL
This commit is contained in:
R Tyler Croy 2019-11-30 11:38:46 -08:00
parent afc0f348e7
commit 50e998b488
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
5 changed files with 70 additions and 20 deletions

2
Cargo.lock generated
View File

@ -508,7 +508,7 @@ checksum = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f"
[[package]]
name = "kafkakitty"
version = "0.1.1"
version = "0.2.0"
dependencies = [
"clap",
"crossbeam",

View File

@ -1,6 +1,6 @@
[package]
name = "kafkakitty"
version = "0.1.1"
version = "0.2.0"
authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
edition = "2018"

View File

@ -22,7 +22,7 @@ interface can be found.
[source,bash]
----
Kafkakitty 😿 0.1.0
Kafkakitty 😿 0.2.0
A cute little Kafka consumer
USAGE:
@ -36,10 +36,19 @@ FLAGS:
OPTIONS:
-b, --brokers <brokers> Broker list in kafka format [default: localhost:9092]
-G, --group-id <group-id> Consumer group ID for Kafka [default: kafkakitty]
-X <settings>... Set a librdkafka configuration property
-t, --topics <topics>... List of topics to follow
----
=== Configuration
For the various additional properties which can be set through the `-X`
command-line argument, the
link:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md[librdkafka
configuration] documents the possible options.
[NOTE]
====
Kafkakitty does *not* come with any built-in security, which means that all

View File

@ -30,18 +30,16 @@ pub struct KittyMessage {
*
*/
pub fn consume(tx: Sender<KittyMessage>,
brokers: &str,
group_id: &str,
topics: &[&str]) {
settings: Vec<(String, String)>,
topics: &[&str]) {
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.create()
.expect("Consumer creation failed");
let mut config = ClientConfig::new();
for (key, value) in settings {
config.set(key.as_str(), value.as_str());
}
let consumer: StreamConsumer = config.create().expect("Consumer createion failed");
consumer.subscribe(&topics.to_vec())
.expect("Can't subscribe to specified topics");

View File

@ -15,7 +15,7 @@ mod websocket;
use clap::{App, Arg};
use log::info;
use log::warn;
use std::thread;
use std::time::Duration;
@ -48,12 +48,58 @@ fn main() {
.long("no-open")
.help("Disable opening a browser window automatically")
.takes_value(false))
.arg(Arg::with_name("settings")
.short("X")
.help("Set a librdkafka configuration property")
.takes_value(true)
.multiple(true))
.get_matches();
let should_open = ! matches.is_present("no-open");
let (sender, receiver) = crossbeam::channel::unbounded::<kafka::KittyMessage>();
let mut settings: Vec<(String, String)> = vec![
(String::from("group.id"), String::from("fo"))
];
// Default settings for librdkafka
for (key, value) in [
("enable.partition.eof", "false"),
("session.timeout.ms", "6000"),
("enable.auto.commit", "true"),
].iter() {
settings.push( (key.to_string(), value.to_string()) );
}
settings.push((
String::from("group.id"),
String::from(matches.value_of("group-id").unwrap())
));
settings.push((
String::from("bootstrap.servers"),
String::from(matches.value_of("brokers").unwrap())
));
/*
* Process the arbitrary settings passed through via the -X arguments
*
* These must be processed last since they should overwrite defaults if they exist
*/
if matches.is_present("settings") {
let x_args = matches.values_of("settings").unwrap().collect::<Vec<&str>>();
for arg in x_args {
let parts = arg.split("=").collect::<Vec<&str>>();
if parts.len() == 2 {
settings.push((parts[0].to_string(), parts[1].to_string()));
}
else {
warn!("Could not understand the setting `{}`, skipping", arg);
}
}
}
let (sender, receiver) = crossbeam::channel::unbounded::<kafka::KittyMessage>();
/*
* Kafkakitty requires at minimum three threads:
* - The websocket server thread
@ -71,11 +117,8 @@ fn main() {
thread::spawn(move || {
let topics = matches.values_of("topics").unwrap().collect::<Vec<&str>>();
let brokers = matches.value_of("brokers").unwrap();
let group_id = matches.value_of("group-id").unwrap();
info!("Setting up consumer for {}", brokers);
kafka::consume(sender, brokers, group_id, &topics);
kafka::consume(sender, settings, &topics);
});
if should_open {