Compare commits

...

14 Commits

Author SHA1 Message Date
R Tyler Croy f6d4b0aebe
Merge pull request #28 from reiseburo/alloc-reduction
Identify and address some performance hot spots
2020-05-21 14:00:54 -07:00
R Tyler Croy 36ce1888b2 📎 2020-05-21 13:45:00 -07:00
R Tyler Croy b592d6aa18 cargo fmt 2020-05-21 13:45:00 -07:00
R Tyler Croy 741d63d983 Pre-compile the Handlebars templates for each connection rather than each log line
Since I wasn't able to figure out a good way to pre-compile all the Handlebars
templates at 🌭 boot time, this is the next best thing. Once a connection
comes in, pre-compile the templates to speed up handling each log line

Fixes #3
2020-05-21 13:45:00 -07:00
R Tyler Croy 09f75ec0ee Revert "Work in progress registering templates for Handlebars up front"
This reverts commit 380d8b2340c33d160373bb9c4a87d94ba61044ea.

Thinking of going a different route with this, where hotdog compiles templates
on a per-connection basis, thereby working around the lifetimes challenge
2020-05-21 13:45:00 -07:00
R Tyler Croy f28ef5f6eb Work in progress registering templates for Handlebars up front
This doesn't compile because the handling of lifetimes is still incorrect.
2020-05-21 13:45:00 -07:00
R Tyler Croy 4cf7e1807f Refactor the syslog parsing code out into its own module 2020-05-21 13:45:00 -07:00
R Tyler Croy 9f83708b7d Performance improvement by avoiding rendering the entire output buffer as Handlebars
Instead, this commit changes the perform_merge function to render the JSON to be
merged with the output buffer, which is typically going to be smaller than our
log message, and then merges that result.

The underlying assumption here is that Handlebars is going to take longer on
longer strings, and is costing more time than an extra serialization with serde.
2020-05-21 13:45:00 -07:00
R Tyler Croy 5ed6360dc9 Revert "Implement render_value to handle rendering the merged Values"
This reverts commit c2b1bbff96556edbe960481b05550e392abb9bbd.

After working hard on the original commit, I am now thinking the more prudent
approach would be to render the to_merge JSON _first_ and then deserialize and
merge the two values
2020-05-21 13:45:00 -07:00
R Tyler Croy aa05859d1f Implement render_value to handle rendering the merged Values
This will hopefully make it easier to render the merged values, hoping that this
approach reduces time spent rendering messages
2020-05-21 13:45:00 -07:00
R Tyler Croy 6016a1bc0c
Merge pull request #27 from reiseburo/dipschtick
Upgrade dipstick
2020-05-19 15:51:26 -07:00
R Tyler Croy 0faabcef05 Upgrade to dipstick 0.8.0 SUCH REJOICING 2020-05-19 15:45:28 -07:00
R Tyler Croy a098ba0bfc Fix comment in the example hotdog.yml 2020-05-16 08:54:08 -07:00
R Tyler Croy ff656efcf8 Bump version for a release 2020-05-14 21:58:34 -07:00
11 changed files with 431 additions and 119 deletions

80
Cargo.lock generated
View File

@ -336,9 +336,9 @@ dependencies = [
[[package]]
name = "dipstick"
version = "0.7.13"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52d7ff907d35a3b505f17bbb7cd0afb61da5c0e7c35b0fbe90fb7d3f59e402ff"
checksum = "585cd86f9fe6282234f10e7435d1c0df9203909731ca9ebaabf5f7e80d5f28e4"
dependencies = [
"atomic_refcell",
"crossbeam-channel 0.3.9",
@ -496,6 +496,17 @@ dependencies = [
"typenum",
]
[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "handlebars"
version = "3.0.1"
@ -521,7 +532,7 @@ dependencies = [
[[package]]
name = "hotdog"
version = "0.1.9"
version = "0.2.1"
dependencies = [
"async-std",
"async-tls",
@ -544,6 +555,7 @@ dependencies = [
"serde_regex",
"syslog_loose",
"syslog_rfc5424",
"uuid",
]
[[package]]
@ -1020,6 +1032,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
[[package]]
name = "ppv-lite86"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
[[package]]
name = "pretty_env_logger"
version = "0.3.1"
@ -1076,6 +1094,47 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom",
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core",
]
[[package]]
name = "rdkafka"
version = "0.23.1"
@ -1438,6 +1497,15 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "uuid"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
dependencies = [
"rand",
]
[[package]]
name = "vcpkg"
version = "0.2.8"
@ -1456,6 +1524,12 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasm-bindgen"
version = "0.2.62"

View File

@ -1,6 +1,6 @@
[package]
name = "hotdog"
version = "0.1.9"
version = "0.2.1"
authors = ["R. Tyler Croy <rtyler+hotdog@brokenco.de>"]
edition = "2018"
@ -52,10 +52,13 @@ regex = "~1.3.6"
jmespath = { git = "https://github.com/jmespath/jmespath.rs" }
# Needed to report metrics of hotdog's performance
dipstick = "~0.7.12"
dipstick = "~0.8.0"
# Used for string replacements and other template based transformations
handlebars = "~3.0.1"
# Needed for time management
chrono = "~0.4.11"
# Needed to tag rules and actions with their own unique identifiers
uuid = { version = "~0.8.1", features = ["v4"] }

View File

@ -453,6 +453,17 @@ For TLS connections, you can use the `openssl` `s_client` command:
echo '<13>1 2020-04-18T15:16:09.956153-07:00 coconut tyler - - [timeQuality tzKnown="1" isSynced="1" syncAccuracy="505061"] hello world' | openssl s_client -connect localhost:6514
----
=== Profiling
Profiling `hotdog` is best done on a Linux host with the `perf` tool, e.g.
[source,bash]
----
RUST_LOG=info perf record --call-graph dwarf -- ./target/debug/hotdog -c ./hotdog.yml
perf report -ng
----
== Similar Projects
`hotdog` was originally motivated by challenges with

View File

@ -39,10 +39,10 @@ rules:
# We don't want any other rules to try to consume these messages
- type: stop
# Match all JSON looking content
# Match JSON content which has a meta.topic value, e.g.
# {"meta":{"topic" : "foo"}}
- jmespath: 'meta.topic'
field: msg
# But don't do anything with it ^_^
actions:
- type: merge
json:

View File

@ -40,7 +40,7 @@ pub struct Kafka {
* ::new() and the .connect() function
*/
producer: Option<FutureProducer<DefaultClientContext>>,
metrics: Option<Arc<LockingOutput>>,
metrics: Option<Arc<StatsdScope>>,
rx: Receiver<KafkaMessage>,
tx: Sender<KafkaMessage>,
}
@ -56,7 +56,7 @@ impl Kafka {
}
}
pub fn with_metrics(&mut self, metrics: Arc<LockingOutput>) {
pub fn with_metrics(&mut self, metrics: Arc<StatsdScope>) {
self.metrics = Some(metrics);
}
@ -166,7 +166,8 @@ impl Kafka {
Ok(_) => {
timer.stop(handle);
m.counter("kafka.submitted").count(1);
m.counter(&format!("kafka.submitted.{}", &kmsg.topic)).count(1);
m.counter(&format!("kafka.submitted.{}", &kmsg.topic))
.count(1);
}
Err((err, _)) => {
match err {

View File

@ -12,11 +12,8 @@ extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_regex;
#[cfg(test)]
#[macro_use]
extern crate serde_json;
extern crate syslog_rfc5424;
extern crate syslog_loose;
extern crate syslog_rfc5424;
use async_std::{
io::BufReader,
@ -35,6 +32,7 @@ use std::collections::HashMap;
mod kafka;
mod merge;
mod parse;
mod rules;
mod serve_tls;
mod settings;
@ -49,7 +47,7 @@ type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>
*/
pub struct ConnectionState {
settings: Arc<Settings>,
metrics: Arc<LockingOutput>,
metrics: Arc<StatsdScope>,
sender: Sender<KafkaMessage>,
}
@ -60,7 +58,7 @@ pub struct ConnectionState {
struct RuleState<'a> {
variables: &'a HashMap<String, String>,
hb: &'a handlebars::Handlebars<'a>,
metrics: Arc<LockingOutput>,
metrics: Arc<StatsdScope>,
}
fn main() -> Result<()> {
@ -136,7 +134,7 @@ fn main() -> Result<()> {
async fn accept_loop(
addr: impl ToSocketAddrs,
settings: Arc<Settings>,
metrics: Arc<LockingOutput>,
metrics: Arc<StatsdScope>,
) -> Result<()> {
let mut kafka = Kafka::new(settings.global.kafka.buffer);
@ -178,41 +176,46 @@ async fn accept_loop(
Ok(())
}
#[derive(Debug)]
enum SyslogErrors {
UnknownFormat,
}
#[derive(Debug)]
struct SyslogMessage {
msg: String,
/**
 * Construct the unique Handlebars template id for the action at `index` within
 * the given rule, e.g. "<rule-uuid>-0"
 *
 * The rule's uuid is generated at configuration-load time, so ids are stable
 * for the lifetime of the process but not across restarts.
 */
fn template_id_for(rule: &Rule, index: usize) -> String {
    format!("{}-{}", rule.uuid, index)
}
/**
* Attempt to parse a given line either as RFC 5424 or RFC 3164
* precompile_templates will register templates for all the Merge and Replace actions from the
* settings
*
* Will usually return a true, unless some setting parse failure occurred which is a critical
* failure for the daemon
*/
fn parse_line(line: String) -> std::result::Result<SyslogMessage, SyslogErrors> {
match syslog_rfc5424::parse_message(&line) {
Ok(msg) => {
return Ok(SyslogMessage {
msg: msg.msg,
})
},
Err(_) => {
let parsed = syslog_loose::parse_message(&line);
fn precompile_templates(hb: &mut Handlebars, settings: Arc<Settings>) -> bool {
for rule in settings.rules.iter() {
for index in 0..rule.actions.len() {
match &rule.actions[index] {
Action::Merge { json: _, json_str } => {
let template_id = template_id_for(rule, index);
/*
* Since syslog_loose doesn't give a Result, the only way to tell if the message wasn't
* parsed properly is if some fields are None'd out.
*/
if parsed.timestamp != None {
return Ok(SyslogMessage{
msg: parsed.msg.to_string(),
})
if let Some(template) = json_str {
if let Err(e) = hb.register_template_string(&template_id, &template) {
error!("Failed to register template! {}\n{}", e, template);
return false;
}
} else {
error!("Could not look up the json_str for a Merge action");
return false;
}
}
Action::Replace { template } => {
let template_id = format!("{}-{}", rule.uuid, index);
if let Err(e) = hb.register_template_string(&template_id, &template) {
error!("Failed to register template! {}\n{}", e, template);
return false;
}
}
_ => {}
}
Err(SyslogErrors::UnknownFormat)
},
}
}
true
}
/**
@ -226,13 +229,18 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
let mut lines = reader.lines();
let lines_count = state.metrics.counter("lines");
let hb = Handlebars::new();
let mut hb = Handlebars::new();
if !precompile_templates(&mut hb, state.settings.clone()) {
error!("Failing to precompile templates is a fatal error, not going to parse logs since the configuration is broken");
// TODO fix the Err types
return Ok(());
}
while let Some(line) = lines.next().await {
let line = line?;
debug!("log: {}", line);
let parsed = parse_line(line);
let parsed = parse::parse_line(line);
if let Err(_e) = &parsed {
state.metrics.counter("error.log_parse").count(1);
@ -247,7 +255,6 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
let mut continue_rules = true;
debug!("parsed as: {}", msg.msg);
for rule in state.settings.rules.iter() {
/*
* If we have been told to stop processing rules, then it's time to bail on this log
@ -321,7 +328,9 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
/*
* Process the actions one the rule has matched
*/
for action in rule.actions.iter() {
for index in 0..rule.actions.len() {
let action = &rule.actions[index];
match action {
Action::Forward { topic } => {
/*
@ -366,20 +375,30 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
}
break;
}
Action::Merge { json } => {
Action::Merge { json, json_str: _ } => {
debug!("merging JSON content: {}", json);
if let Ok(buffer) = perform_merge(&msg.msg, json, &rule_state) {
if let Ok(buffer) =
perform_merge(&msg.msg, &template_id_for(&rule, index), &rule_state)
{
output = buffer;
} else {
continue_rules = false;
}
}
Action::Replace { template } => {
debug!("replacing content with template: {}", template);
if let Ok(rendered) = hb.render_template(template, &hash) {
let template_id = template_id_for(&rule, index);
debug!(
"replacing content with template: {} ({})",
template, template_id
);
if let Ok(rendered) = hb.render(&template_id, &hash) {
output = rendered;
}
}
Action::Stop => {
continue_rules = false;
}
@ -396,31 +415,39 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
*/
fn perform_merge(
buffer: &str,
to_merge: &serde_json::Value,
template_id: &str,
state: &RuleState,
) -> std::result::Result<String, String> {
/*
* If the administrator configured the merge incorrectly, just pass the buffer along un-merged
*/
if !to_merge.is_object() {
error!("Merge requested was not a JSON object: {}", to_merge);
state.metrics.counter("error.merge_of_invalid_json").count(1);
return Ok(buffer.to_string());
}
if let Ok(mut msg_json) = serde_json::from_str(&buffer) {
if let Ok(rendered) = state.hb.render(template_id, &state.variables) {
let to_merge: serde_json::Value = serde_json::from_str(&rendered)
.expect("Failed to deserialize our rendered to_merge_str");
if let Ok(mut msg_json) = serde_json::from_str::<serde_json::Value>(buffer) {
merge::merge(&mut msg_json, to_merge);
if let Ok(output) = serde_json::to_string(&msg_json) {
if let Ok(rendered) = state.hb.render_template(&output, &state.variables) {
return Ok(rendered);
/*
* If the administrator configured the merge incorrectly, just pass the buffer along un-merged
*/
if !to_merge.is_object() {
error!("Merge requested was not a JSON object: {}", to_merge);
state
.metrics
.counter("error.merge_of_invalid_json")
.count(1);
return Ok(buffer.to_string());
}
merge::merge(&mut msg_json, &to_merge);
if let Ok(output) = serde_json::to_string(&msg_json) {
return Ok(output);
}
Ok(output)
} else {
Err("Failed to render".to_string())
}
Err("Failed to merge and serialize".to_string())
} else {
error!("Failed to parse as JSON, stopping actions: {}", buffer);
state.metrics.counter("error.merge_target_not_json").count(1);
state
.metrics
.counter("error.merge_target_not_json")
.count(1);
Err("Not JSON".to_string())
}
}
@ -432,9 +459,10 @@ mod tests {
/**
* Generating a test RuleState for consistent states in test
*/
fn rule_state<'a>(hb : &'a handlebars::Handlebars<'a>,
hash : &'a HashMap<String, String>) -> RuleState<'a> {
fn rule_state<'a>(
hb: &'a handlebars::Handlebars<'a>,
hash: &'a HashMap<String, String>,
) -> RuleState<'a> {
let metrics = Arc::new(
Statsd::send_to("example.com:8125")
.expect("Failed to create Statsd recorder")
@ -445,18 +473,20 @@ mod tests {
RuleState {
hb: &hb,
variables: &hash,
metrics: metrics,
metrics,
}
}
#[test]
fn merge_with_empty() {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, "{}");
let hash = HashMap::<String, String>::new();
let state = rule_state(&hb, &hash);
let to_merge = json!({});
let output = perform_merge("{}", &to_merge, &state);
let output = perform_merge("{}", template_id, &state);
assert_eq!(output, Ok("{}".to_string()));
}
@ -465,12 +495,14 @@ mod tests {
*/
#[test]
fn merge_with_non_object() -> std::result::Result<(), String> {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, "[1]");
let hash = HashMap::<String, String>::new();
let state = rule_state(&hb, &hash);
let to_merge = json!([1]);
let output = perform_merge("{}", &to_merge, &state)?;
let output = perform_merge("{}", template_id, &state)?;
assert_eq!(output, "{}".to_string());
Ok(())
}
@ -480,12 +512,15 @@ mod tests {
*/
#[test]
fn merge_without_json_buffer() {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, "{}");
let hash = HashMap::<String, String>::new();
let state = rule_state(&hb, &hash);
let to_merge = json!({});
let output = perform_merge("invalid", &to_merge, &state);
let to_merge = r#"{}"#;
let output = perform_merge("invalid", template_id, &state);
let expected = Err("Not JSON".to_string());
assert_eq!(output, expected);
}
@ -495,12 +530,14 @@ mod tests {
*/
#[test]
fn merge_with_json_buffer() {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, r#"{"hello":1}"#);
let hash = HashMap::<String, String>::new();
let state = rule_state(&hb, &hash);
let to_merge = json!({"hello" : 1});
let output = perform_merge("{}", &to_merge, &state);
let output = perform_merge("{}", template_id, &state);
assert_eq!(output, Ok("{\"hello\":1}".to_string()));
}
@ -509,43 +546,39 @@ mod tests {
*/
#[test]
fn merge_with_json_buffer_and_vars() {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, r#"{"hello":"{{name}}"}"#);
let mut hash = HashMap::<String, String>::new();
hash.insert("name".to_string(), "world".to_string());
let state = rule_state(&hb, &hash);
let to_merge = json!({"hello" : "{{name}}"});
let output = perform_merge("{}", &to_merge, &state);
let output = perform_merge("{}", template_id, &state);
assert_eq!(output, Ok("{\"hello\":\"world\"}".to_string()));
}
#[test]
fn test_parsing_invalid() {
let buffer = "blah".to_string();
let parsed = parse_line(buffer);
if let Ok(msg) = &parsed {
println!("msg: {}", msg.msg);
}
assert!(parsed.is_err());
fn test_precompile_templates_merge() {
let mut hb = Handlebars::new();
let settings = Arc::new(load("test/configs/single-rule-with-merge.yml"));
// Assuming that we're going to register the template with this id
let template_id = format!("{}-{}", settings.rules[0].uuid, 0);
let result = precompile_templates(&mut hb, settings.clone());
assert!(result);
assert!(hb.has_template(&template_id));
}
#[test]
fn test_5424() {
let buffer = r#"<13>1 2020-04-18T15:16:09.956153-07:00 coconut tyler - - [timeQuality tzKnown="1" isSynced="1" syncAccuracy="505061"] hi"#.to_string();
let parsed = parse_line(buffer);
assert!(parsed.is_ok());
if let Ok(msg) = parsed {
assert_eq!("hi", msg.msg);
}
}
fn test_precompile_templates_replace() {
let mut hb = Handlebars::new();
let settings = Arc::new(load("test/configs/single-rule-with-replace.yml"));
// Assuming that we're going to register the template with this id
let template_id = format!("{}-{}", settings.rules[0].uuid, 0);
#[test]
fn test_3164() {
let buffer = r#"<190>May 13 21:45:18 coconut hotdog: hi"#.to_string();
let parsed = parse_line(buffer);
assert!(parsed.is_ok());
if let Ok(msg) = parsed {
assert_eq!("hi", msg.msg);
}
let result = precompile_templates(&mut hb, settings.clone());
assert!(result);
assert!(hb.has_template(&template_id));
}
}

74
src/parse.rs Normal file
View File

@ -0,0 +1,74 @@
/**
 * Enum of syslog parse related errors
 *
 * Returned by `parse_line` when a log line cannot be parsed by any of the
 * supported syslog formats.
 */
#[derive(Debug)]
pub enum SyslogErrors {
    // The line matched neither RFC 5424 nor the looser RFC 3164 parsing
    UnknownFormat,
}
/**
 * SyslogMessage is just a wrapper struct to allow us to deserialize RFC 5424 and RFC 3164 syslog
 * messages into some format that can be passed throughout hotdog
 */
#[derive(Debug)]
pub struct SyslogMessage {
    // The free-form message portion of the syslog line (headers stripped)
    pub msg: String,
}
/**
 * Attempt to parse a given line either as RFC 5424 or RFC 3164
 *
 * RFC 5424 is tried first since its parser returns a proper Result; if that
 * fails we fall back to the more forgiving syslog_loose parser.
 *
 * Returns the extracted message payload on success, or
 * `SyslogErrors::UnknownFormat` when neither parser could make sense of the
 * line.
 */
pub fn parse_line(line: String) -> std::result::Result<SyslogMessage, SyslogErrors> {
    // Strict RFC 5424 first
    if let Ok(msg) = syslog_rfc5424::parse_message(&line) {
        return Ok(SyslogMessage { msg: msg.msg });
    }

    let parsed = syslog_loose::parse_message(&line);

    /*
     * Since syslog_loose doesn't give a Result, the only way to tell if the message wasn't
     * parsed properly is if some fields are None'd out.
     */
    if parsed.timestamp.is_some() {
        return Ok(SyslogMessage {
            msg: parsed.msg.to_string(),
        });
    }

    Err(SyslogErrors::UnknownFormat)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A line that is not syslog at all must be rejected
    #[test]
    fn test_parsing_invalid() {
        let parsed = parse_line(String::from("blah"));
        match &parsed {
            Ok(msg) => println!("msg: {}", msg.msg),
            Err(_) => (),
        }
        assert!(parsed.is_err());
    }

    /// A well-formed RFC 5424 line parses and yields just the message payload
    #[test]
    fn test_5424() {
        let buffer = r#"<13>1 2020-04-18T15:16:09.956153-07:00 coconut tyler - - [timeQuality tzKnown="1" isSynced="1" syncAccuracy="505061"] hi"#;
        let parsed = parse_line(buffer.to_string());
        assert!(parsed.is_ok());
        assert_eq!("hi", parsed.unwrap().msg);
    }

    /// A classic RFC 3164 line is handled via the syslog_loose fallback
    #[test]
    fn test_3164() {
        let buffer = r#"<190>May 13 21:45:18 coconut hotdog: hi"#;
        let parsed = parse_line(buffer.to_string());
        assert!(parsed.is_ok());
        assert_eq!("hi", parsed.unwrap().msg);
    }
}

View File

@ -95,7 +95,7 @@ fn load_tls_config(settings: &Settings) -> io::Result<ServerConfig> {
pub async fn accept_loop(
addr: impl ToSocketAddrs,
settings: Arc<Settings>,
metrics: Arc<LockingOutput>,
metrics: Arc<StatsdScope>,
) -> Result<()> {
let config = load_tls_config(&settings)?;

View File

@ -7,11 +7,15 @@ use log::*;
use serde_json::Value;
use std::collections::HashMap;
use std::time::Duration;
use uuid::Uuid;
pub fn load(file: &str) -> Settings {
let conf = load_configuration(file);
conf.try_into()
.expect("Failed to parse the configuration file")
let mut settings: Settings = conf
.try_into()
.expect("Failed to parse the configuration file");
settings.populate_caches();
settings
}
fn load_configuration(file: &str) -> config::Config {
@ -57,14 +61,33 @@ pub enum Field {
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Action {
Forward { topic: String },
Merge { json: Value },
Replace { template: String },
Forward {
topic: String,
},
Merge {
json: Value,
#[serde(default = "default_none")]
json_str: Option<String>,
},
Replace {
template: String,
},
Stop,
}
impl Action {
    /**
     * Pre-compute any cached/derived fields for this action
     *
     * For Merge actions this serializes the configured `json` value once into
     * `json_str`, so the JSON does not need to be re-serialized per log line.
     * Other action variants have nothing to cache.
     */
    fn populate_caches(&mut self) {
        if let Action::Merge { json, json_str } = self {
            *json_str =
                Some(serde_json::to_string(json).expect("Failed to serialize Merge action"));
        }
    }
}
#[derive(Debug, Deserialize)]
pub struct Rule {
#[serde(skip_serializing, skip_deserializing, default = "default_uuid")]
pub uuid: Uuid,
pub field: Field,
pub actions: Vec<Action>,
#[serde(with = "serde_regex", default = "default_none")]
@ -73,6 +96,14 @@ pub struct Rule {
pub jmespath: String,
}
impl Rule {
    /**
     * Ask every action belonging to this rule to populate its cached fields
     */
    fn populate_caches(&mut self) {
        for action in self.actions.iter_mut() {
            action.populate_caches();
        }
    }
}
#[derive(Debug, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum TlsType {
@ -130,6 +161,17 @@ pub struct Settings {
pub rules: Vec<Rule>,
}
impl Settings {
    /**
     * Populate any configuration caches which we want to use
     *
     * Delegates to each configured rule so derived fields (e.g. serialized
     * Merge JSON) are computed once at load time.
     */
    fn populate_caches(&mut self) {
        for rule in self.rules.iter_mut() {
            rule.populate_caches();
        }
    }
}
/*
* Default functions
*/
@ -156,6 +198,10 @@ fn default_none<T>() -> Option<T> {
None
}
/**
 * Generate a fresh random (v4) UUID
 *
 * Used as the serde default for `Rule.uuid`, giving every deserialized rule a
 * unique identifier.
 */
fn default_uuid() -> Uuid {
    Uuid::new_v4()
}
#[cfg(test)]
mod tests {
use super::*;
@ -165,6 +211,20 @@ mod tests {
load("hotdog.yml");
}
#[test]
fn test_load_example_and_populate_caches() {
let settings = load("test/configs/single-rule-with-merge.yml");
assert_eq!(settings.rules.len(), 1);
match &settings.rules[0].actions[0] {
Action::Merge { json: _, json_str } => {
assert!(json_str.is_some());
}
_ => {
assert!(false);
}
}
}
#[test]
fn test_default_tls() {
assert_eq!(TlsType::None, TlsType::default());
@ -179,4 +239,9 @@ mod tests {
fn test_kafka_buffer_default() {
assert_eq!(1024, kafka_buffer_default());
}
#[test]
fn test_default_uuid() {
assert_eq!(false, default_uuid().is_nil());
}
}

View File

@ -0,0 +1,27 @@
# A simple test configuration for verifying some Merge action behavior
---
global:
listen:
address: '127.0.0.1'
port: 514
tls:
kafka:
conf:
bootstrap.servers: '127.0.0.1:9092'
# Default topic to log messages to that are not otherwise mapped
topic: 'test'
metrics:
statsd: 'localhost:8125'
rules:
# Match JSON content which has a meta.topic value, e.g.
# {"meta":{"topic" : "foo"}}
- jmespath: 'meta.topic'
field: msg
actions:
- type: merge
json:
meta:
hotdog:
version: '{{version}}'
timestamp: '{{iso8601}}'

View File

@ -0,0 +1,24 @@
# A simple test configuration for verifying some Replace action behavior
---
global:
listen:
address: '127.0.0.1'
port: 514
tls:
kafka:
conf:
bootstrap.servers: '127.0.0.1:9092'
# Default topic to log messages to that are not otherwise mapped
topic: 'test'
metrics:
statsd: 'localhost:8125'
rules:
- regex: '^hello\s+(?P<name>\w+)?'
field: msg
actions:
- type: replace
template: |
This is the total message: {{msg}}
And the name is: {{name}}