Adjust the Kafka configuration to allow raw librdkafka options to pass through

This should make it much easier to manage underlying configuration or support
changes, since Hotdog will just punt to librdkafka!
This commit is contained in:
R Tyler Croy 2020-04-19 19:22:21 -07:00
parent 52c9987cd4
commit 95bb84e0d3
4 changed files with 12 additions and 9 deletions

View File

@ -6,7 +6,8 @@ global:
port: 1514
tls:
kafka:
brokers: '127.0.0.1:9092'
conf:
bootstrap.servers: '127.0.0.1:9092'
topic: 'default-topic'
metrics:
statsd: 'localhost:8125'

View File

@ -11,7 +11,8 @@ global:
#port: 1514
#tls:
kafka:
brokers: '127.0.0.1:9092'
conf:
bootstrap.servers: '127.0.0.1:9092'
# Default topic to log messages to that are not otherwise mapped
topic: 'test'
metrics:

View File

@ -138,14 +138,14 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(reader: BufR
let mut lines = reader.lines();
let lines_count = metrics.counter("lines");
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &settings.global.kafka.brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
let hb = Handlebars::new();
let mut rd_conf = ClientConfig::new();
for (key, value) in settings.global.kafka.conf.iter() {
rd_conf.set(key, value);
}
let producer: FutureProducer = rd_conf.create().expect("Failed to create Kafka producer!");
while let Some(line) = lines.next().await {
let line = line?;
debug!("log: {}", line);

View File

@ -7,6 +7,7 @@ use async_std::path::Path;
use log::*;
use regex;
use serde_json::Value;
use std::collections::HashMap;
pub fn load(file: &str) -> Settings {
let conf = load_configuration(file);
@ -117,7 +118,7 @@ pub struct Listen {
#[derive(Debug, Deserialize)]
pub struct Kafka {
pub brokers: String,
pub conf: HashMap<String, String>,
pub topic: String,
}