Compare commits
14 Commits
199c0f7fe9
...
f6d4b0aebe
Author | SHA1 | Date |
---|---|---|
R Tyler Croy | f6d4b0aebe | |
R Tyler Croy | 36ce1888b2 | |
R Tyler Croy | b592d6aa18 | |
R Tyler Croy | 741d63d983 | |
R Tyler Croy | 09f75ec0ee | |
R Tyler Croy | f28ef5f6eb | |
R Tyler Croy | 4cf7e1807f | |
R Tyler Croy | 9f83708b7d | |
R Tyler Croy | 5ed6360dc9 | |
R Tyler Croy | aa05859d1f | |
R Tyler Croy | 6016a1bc0c | |
R Tyler Croy | 0faabcef05 | |
R Tyler Croy | a098ba0bfc | |
R Tyler Croy | ff656efcf8 |
|
@ -336,9 +336,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "dipstick"
|
||||
version = "0.7.13"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "52d7ff907d35a3b505f17bbb7cd0afb61da5c0e7c35b0fbe90fb7d3f59e402ff"
|
||||
checksum = "585cd86f9fe6282234f10e7435d1c0df9203909731ca9ebaabf5f7e80d5f28e4"
|
||||
dependencies = [
|
||||
"atomic_refcell",
|
||||
"crossbeam-channel 0.3.9",
|
||||
|
@ -496,6 +496,17 @@ dependencies = [
|
|||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.1.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"wasi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "handlebars"
|
||||
version = "3.0.1"
|
||||
|
@ -521,7 +532,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "hotdog"
|
||||
version = "0.1.9"
|
||||
version = "0.2.1"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"async-tls",
|
||||
|
@ -544,6 +555,7 @@ dependencies = [
|
|||
"serde_regex",
|
||||
"syslog_loose",
|
||||
"syslog_rfc5424",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1020,6 +1032,12 @@ version = "0.3.17"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
|
||||
|
||||
[[package]]
|
||||
name = "pretty_env_logger"
|
||||
version = "0.3.1"
|
||||
|
@ -1076,6 +1094,47 @@ dependencies = [
|
|||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"libc",
|
||||
"rand_chacha",
|
||||
"rand_core",
|
||||
"rand_hc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_hc"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
|
||||
dependencies = [
|
||||
"rand_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rdkafka"
|
||||
version = "0.23.1"
|
||||
|
@ -1438,6 +1497,15 @@ version = "0.7.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
|
||||
dependencies = [
|
||||
"rand",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.8"
|
||||
|
@ -1456,6 +1524,12 @@ version = "0.9.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.9.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen"
|
||||
version = "0.2.62"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "hotdog"
|
||||
version = "0.1.9"
|
||||
version = "0.2.1"
|
||||
authors = ["R. Tyler Croy <rtyler+hotdog@brokenco.de>"]
|
||||
edition = "2018"
|
||||
|
||||
|
@ -52,10 +52,13 @@ regex = "~1.3.6"
|
|||
jmespath = { git = "https://github.com/jmespath/jmespath.rs" }
|
||||
|
||||
# Needed to report metrics of hotdog's performance
|
||||
dipstick = "~0.7.12"
|
||||
dipstick = "~0.8.0"
|
||||
|
||||
# Used for string replacements and other template based transformations
|
||||
handlebars = "~3.0.1"
|
||||
|
||||
# Needed for time management
|
||||
chrono = "~0.4.11"
|
||||
|
||||
# Needed to tag rules and actions with their own unique identifiers
|
||||
uuid = { version = "~0.8.1", features = ["v4"] }
|
||||
|
|
11
README.adoc
11
README.adoc
|
@ -453,6 +453,17 @@ For TLS connections, you can use the `openssl` `s_client` command:
|
|||
echo '<13>1 2020-04-18T15:16:09.956153-07:00 coconut tyler - - [timeQuality tzKnown="1" isSynced="1" syncAccuracy="505061"] hello world' | openssl s_client -connect localhost:6514
|
||||
----
|
||||
|
||||
|
||||
=== Profiling
|
||||
|
||||
Profiling `hotdog` is best done on a Linux host with the `perf` tool, e.g.
|
||||
|
||||
[source,bash]
|
||||
----
|
||||
RUST_LOG=info perf record --call-graph dwarf -- ./target/debug/hotdog -c ./hotdog.yml
|
||||
perf report -ng
|
||||
----
|
||||
|
||||
== Similar Projects
|
||||
|
||||
`hotdog` was originally motivated by challenges with
|
||||
|
|
|
@ -39,10 +39,10 @@ rules:
|
|||
# We don't want any other rules to try to consume these messages
|
||||
- type: stop
|
||||
|
||||
# Match all JSON looking content
|
||||
# Match JSON content which has a meta.topic value, e.g.
|
||||
# {"meta":{"topic" : "foo"}}
|
||||
- jmespath: 'meta.topic'
|
||||
field: msg
|
||||
# But don't do anything with it ^_^
|
||||
actions:
|
||||
- type: merge
|
||||
json:
|
||||
|
|
|
@ -40,7 +40,7 @@ pub struct Kafka {
|
|||
* ::new() and the .connect() function
|
||||
*/
|
||||
producer: Option<FutureProducer<DefaultClientContext>>,
|
||||
metrics: Option<Arc<LockingOutput>>,
|
||||
metrics: Option<Arc<StatsdScope>>,
|
||||
rx: Receiver<KafkaMessage>,
|
||||
tx: Sender<KafkaMessage>,
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ impl Kafka {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn with_metrics(&mut self, metrics: Arc<LockingOutput>) {
|
||||
pub fn with_metrics(&mut self, metrics: Arc<StatsdScope>) {
|
||||
self.metrics = Some(metrics);
|
||||
}
|
||||
|
||||
|
@ -166,7 +166,8 @@ impl Kafka {
|
|||
Ok(_) => {
|
||||
timer.stop(handle);
|
||||
m.counter("kafka.submitted").count(1);
|
||||
m.counter(&format!("kafka.submitted.{}", &kmsg.topic)).count(1);
|
||||
m.counter(&format!("kafka.submitted.{}", &kmsg.topic))
|
||||
.count(1);
|
||||
}
|
||||
Err((err, _)) => {
|
||||
match err {
|
||||
|
|
239
src/main.rs
239
src/main.rs
|
@ -12,11 +12,8 @@ extern crate serde;
|
|||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
extern crate serde_regex;
|
||||
#[cfg(test)]
|
||||
#[macro_use]
|
||||
extern crate serde_json;
|
||||
extern crate syslog_rfc5424;
|
||||
extern crate syslog_loose;
|
||||
extern crate syslog_rfc5424;
|
||||
|
||||
use async_std::{
|
||||
io::BufReader,
|
||||
|
@ -35,6 +32,7 @@ use std::collections::HashMap;
|
|||
|
||||
mod kafka;
|
||||
mod merge;
|
||||
mod parse;
|
||||
mod rules;
|
||||
mod serve_tls;
|
||||
mod settings;
|
||||
|
@ -49,7 +47,7 @@ type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>
|
|||
*/
|
||||
pub struct ConnectionState {
|
||||
settings: Arc<Settings>,
|
||||
metrics: Arc<LockingOutput>,
|
||||
metrics: Arc<StatsdScope>,
|
||||
sender: Sender<KafkaMessage>,
|
||||
}
|
||||
|
||||
|
@ -60,7 +58,7 @@ pub struct ConnectionState {
|
|||
struct RuleState<'a> {
|
||||
variables: &'a HashMap<String, String>,
|
||||
hb: &'a handlebars::Handlebars<'a>,
|
||||
metrics: Arc<LockingOutput>,
|
||||
metrics: Arc<StatsdScope>,
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
|
@ -136,7 +134,7 @@ fn main() -> Result<()> {
|
|||
async fn accept_loop(
|
||||
addr: impl ToSocketAddrs,
|
||||
settings: Arc<Settings>,
|
||||
metrics: Arc<LockingOutput>,
|
||||
metrics: Arc<StatsdScope>,
|
||||
) -> Result<()> {
|
||||
let mut kafka = Kafka::new(settings.global.kafka.buffer);
|
||||
|
||||
|
@ -178,41 +176,46 @@ async fn accept_loop(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum SyslogErrors {
|
||||
UnknownFormat,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SyslogMessage {
|
||||
msg: String,
|
||||
fn template_id_for(rule: &Rule, index: usize) -> String {
|
||||
format!("{}-{}", rule.uuid, index)
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to parse a given line either as RFC 5424 or RFC 3164
|
||||
* precompile_templates will register templates for all the Merge and Replace actions from the
|
||||
* settings
|
||||
*
|
||||
* Will usually return a true, unless some setting parse failure occurred which is a critical
|
||||
* failure for the daemon
|
||||
*/
|
||||
fn parse_line(line: String) -> std::result::Result<SyslogMessage, SyslogErrors> {
|
||||
match syslog_rfc5424::parse_message(&line) {
|
||||
Ok(msg) => {
|
||||
return Ok(SyslogMessage {
|
||||
msg: msg.msg,
|
||||
})
|
||||
},
|
||||
Err(_) => {
|
||||
let parsed = syslog_loose::parse_message(&line);
|
||||
fn precompile_templates(hb: &mut Handlebars, settings: Arc<Settings>) -> bool {
|
||||
for rule in settings.rules.iter() {
|
||||
for index in 0..rule.actions.len() {
|
||||
match &rule.actions[index] {
|
||||
Action::Merge { json: _, json_str } => {
|
||||
let template_id = template_id_for(rule, index);
|
||||
|
||||
/*
|
||||
* Since syslog_loose doesn't give a Result, the only way to tell if themessage wasn't
|
||||
* parsed properly is if some fields are None'd out.
|
||||
*/
|
||||
if parsed.timestamp != None {
|
||||
return Ok(SyslogMessage{
|
||||
msg: parsed.msg.to_string(),
|
||||
})
|
||||
if let Some(template) = json_str {
|
||||
if let Err(e) = hb.register_template_string(&template_id, &template) {
|
||||
error!("Failed to register template! {}\n{}", e, template);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
error!("Could not look up the json_str for a Merge action");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Action::Replace { template } => {
|
||||
let template_id = format!("{}-{}", rule.uuid, index);
|
||||
if let Err(e) = hb.register_template_string(&template_id, &template) {
|
||||
error!("Failed to register template! {}\n{}", e, template);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Err(SyslogErrors::UnknownFormat)
|
||||
},
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -226,13 +229,18 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
let mut lines = reader.lines();
|
||||
let lines_count = state.metrics.counter("lines");
|
||||
|
||||
let hb = Handlebars::new();
|
||||
let mut hb = Handlebars::new();
|
||||
if !precompile_templates(&mut hb, state.settings.clone()) {
|
||||
error!("Failing to precompile templates is a fatal error, not going to parse logs since the configuration is broken");
|
||||
// TODO fix the Err types
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
while let Some(line) = lines.next().await {
|
||||
let line = line?;
|
||||
debug!("log: {}", line);
|
||||
|
||||
let parsed = parse_line(line);
|
||||
let parsed = parse::parse_line(line);
|
||||
|
||||
if let Err(_e) = &parsed {
|
||||
state.metrics.counter("error.log_parse").count(1);
|
||||
|
@ -247,7 +255,6 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
let mut continue_rules = true;
|
||||
debug!("parsed as: {}", msg.msg);
|
||||
|
||||
|
||||
for rule in state.settings.rules.iter() {
|
||||
/*
|
||||
* If we have been told to stop processing rules, then it's time to bail on this log
|
||||
|
@ -321,7 +328,9 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
/*
|
||||
* Process the actions one the rule has matched
|
||||
*/
|
||||
for action in rule.actions.iter() {
|
||||
for index in 0..rule.actions.len() {
|
||||
let action = &rule.actions[index];
|
||||
|
||||
match action {
|
||||
Action::Forward { topic } => {
|
||||
/*
|
||||
|
@ -366,20 +375,30 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
}
|
||||
break;
|
||||
}
|
||||
Action::Merge { json } => {
|
||||
|
||||
Action::Merge { json, json_str: _ } => {
|
||||
debug!("merging JSON content: {}", json);
|
||||
if let Ok(buffer) = perform_merge(&msg.msg, json, &rule_state) {
|
||||
if let Ok(buffer) =
|
||||
perform_merge(&msg.msg, &template_id_for(&rule, index), &rule_state)
|
||||
{
|
||||
output = buffer;
|
||||
} else {
|
||||
continue_rules = false;
|
||||
}
|
||||
}
|
||||
|
||||
Action::Replace { template } => {
|
||||
debug!("replacing content with template: {}", template);
|
||||
if let Ok(rendered) = hb.render_template(template, &hash) {
|
||||
let template_id = template_id_for(&rule, index);
|
||||
|
||||
debug!(
|
||||
"replacing content with template: {} ({})",
|
||||
template, template_id
|
||||
);
|
||||
if let Ok(rendered) = hb.render(&template_id, &hash) {
|
||||
output = rendered;
|
||||
}
|
||||
}
|
||||
|
||||
Action::Stop => {
|
||||
continue_rules = false;
|
||||
}
|
||||
|
@ -396,31 +415,39 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
*/
|
||||
fn perform_merge(
|
||||
buffer: &str,
|
||||
to_merge: &serde_json::Value,
|
||||
template_id: &str,
|
||||
state: &RuleState,
|
||||
) -> std::result::Result<String, String> {
|
||||
/*
|
||||
* If the administrator configured the merge incorrectly, just pass the buffer along un-merged
|
||||
*/
|
||||
if !to_merge.is_object() {
|
||||
error!("Merge requested was not a JSON object: {}", to_merge);
|
||||
state.metrics.counter("error.merge_of_invalid_json").count(1);
|
||||
return Ok(buffer.to_string());
|
||||
}
|
||||
if let Ok(mut msg_json) = serde_json::from_str(&buffer) {
|
||||
if let Ok(rendered) = state.hb.render(template_id, &state.variables) {
|
||||
let to_merge: serde_json::Value = serde_json::from_str(&rendered)
|
||||
.expect("Failed to deserialize our rendered to_merge_str");
|
||||
|
||||
if let Ok(mut msg_json) = serde_json::from_str::<serde_json::Value>(buffer) {
|
||||
merge::merge(&mut msg_json, to_merge);
|
||||
if let Ok(output) = serde_json::to_string(&msg_json) {
|
||||
if let Ok(rendered) = state.hb.render_template(&output, &state.variables) {
|
||||
return Ok(rendered);
|
||||
/*
|
||||
* If the administrator configured the merge incorrectly, just pass the buffer along un-merged
|
||||
*/
|
||||
if !to_merge.is_object() {
|
||||
error!("Merge requested was not a JSON object: {}", to_merge);
|
||||
state
|
||||
.metrics
|
||||
.counter("error.merge_of_invalid_json")
|
||||
.count(1);
|
||||
return Ok(buffer.to_string());
|
||||
}
|
||||
|
||||
merge::merge(&mut msg_json, &to_merge);
|
||||
|
||||
if let Ok(output) = serde_json::to_string(&msg_json) {
|
||||
return Ok(output);
|
||||
}
|
||||
Ok(output)
|
||||
} else {
|
||||
Err("Failed to render".to_string())
|
||||
}
|
||||
Err("Failed to merge and serialize".to_string())
|
||||
} else {
|
||||
error!("Failed to parse as JSON, stopping actions: {}", buffer);
|
||||
state.metrics.counter("error.merge_target_not_json").count(1);
|
||||
state
|
||||
.metrics
|
||||
.counter("error.merge_target_not_json")
|
||||
.count(1);
|
||||
Err("Not JSON".to_string())
|
||||
}
|
||||
}
|
||||
|
@ -432,9 +459,10 @@ mod tests {
|
|||
/**
|
||||
* Generating a test RuleState for consistent states in test
|
||||
*/
|
||||
fn rule_state<'a>(hb : &'a handlebars::Handlebars<'a>,
|
||||
hash : &'a HashMap<String, String>) -> RuleState<'a> {
|
||||
|
||||
fn rule_state<'a>(
|
||||
hb: &'a handlebars::Handlebars<'a>,
|
||||
hash: &'a HashMap<String, String>,
|
||||
) -> RuleState<'a> {
|
||||
let metrics = Arc::new(
|
||||
Statsd::send_to("example.com:8125")
|
||||
.expect("Failed to create Statsd recorder")
|
||||
|
@ -445,18 +473,20 @@ mod tests {
|
|||
RuleState {
|
||||
hb: &hb,
|
||||
variables: &hash,
|
||||
metrics: metrics,
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_with_empty() {
|
||||
let hb = Handlebars::new();
|
||||
let mut hb = Handlebars::new();
|
||||
let template_id = "1";
|
||||
hb.register_template_string(&template_id, "{}");
|
||||
|
||||
let hash = HashMap::<String, String>::new();
|
||||
let state = rule_state(&hb, &hash);
|
||||
|
||||
let to_merge = json!({});
|
||||
let output = perform_merge("{}", &to_merge, &state);
|
||||
let output = perform_merge("{}", template_id, &state);
|
||||
assert_eq!(output, Ok("{}".to_string()));
|
||||
}
|
||||
|
||||
|
@ -465,12 +495,14 @@ mod tests {
|
|||
*/
|
||||
#[test]
|
||||
fn merge_with_non_object() -> std::result::Result<(), String> {
|
||||
let hb = Handlebars::new();
|
||||
let mut hb = Handlebars::new();
|
||||
let template_id = "1";
|
||||
hb.register_template_string(&template_id, "[1]");
|
||||
|
||||
let hash = HashMap::<String, String>::new();
|
||||
let state = rule_state(&hb, &hash);
|
||||
|
||||
let to_merge = json!([1]);
|
||||
let output = perform_merge("{}", &to_merge, &state)?;
|
||||
let output = perform_merge("{}", template_id, &state)?;
|
||||
assert_eq!(output, "{}".to_string());
|
||||
Ok(())
|
||||
}
|
||||
|
@ -480,12 +512,15 @@ mod tests {
|
|||
*/
|
||||
#[test]
|
||||
fn merge_without_json_buffer() {
|
||||
let hb = Handlebars::new();
|
||||
let mut hb = Handlebars::new();
|
||||
let template_id = "1";
|
||||
hb.register_template_string(&template_id, "{}");
|
||||
|
||||
let hash = HashMap::<String, String>::new();
|
||||
let state = rule_state(&hb, &hash);
|
||||
|
||||
let to_merge = json!({});
|
||||
let output = perform_merge("invalid", &to_merge, &state);
|
||||
let to_merge = r#"{}"#;
|
||||
let output = perform_merge("invalid", template_id, &state);
|
||||
let expected = Err("Not JSON".to_string());
|
||||
assert_eq!(output, expected);
|
||||
}
|
||||
|
@ -495,12 +530,14 @@ mod tests {
|
|||
*/
|
||||
#[test]
|
||||
fn merge_with_json_buffer() {
|
||||
let hb = Handlebars::new();
|
||||
let mut hb = Handlebars::new();
|
||||
let template_id = "1";
|
||||
hb.register_template_string(&template_id, r#"{"hello":1}"#);
|
||||
|
||||
let hash = HashMap::<String, String>::new();
|
||||
let state = rule_state(&hb, &hash);
|
||||
|
||||
let to_merge = json!({"hello" : 1});
|
||||
let output = perform_merge("{}", &to_merge, &state);
|
||||
let output = perform_merge("{}", template_id, &state);
|
||||
assert_eq!(output, Ok("{\"hello\":1}".to_string()));
|
||||
}
|
||||
|
||||
|
@ -509,43 +546,39 @@ mod tests {
|
|||
*/
|
||||
#[test]
|
||||
fn merge_with_json_buffer_and_vars() {
|
||||
let hb = Handlebars::new();
|
||||
let mut hb = Handlebars::new();
|
||||
let template_id = "1";
|
||||
hb.register_template_string(&template_id, r#"{"hello":"{{name}}"}"#);
|
||||
|
||||
let mut hash = HashMap::<String, String>::new();
|
||||
hash.insert("name".to_string(), "world".to_string());
|
||||
let state = rule_state(&hb, &hash);
|
||||
|
||||
let to_merge = json!({"hello" : "{{name}}"});
|
||||
let output = perform_merge("{}", &to_merge, &state);
|
||||
let output = perform_merge("{}", template_id, &state);
|
||||
assert_eq!(output, Ok("{\"hello\":\"world\"}".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parsing_invalid() {
|
||||
let buffer = "blah".to_string();
|
||||
let parsed = parse_line(buffer);
|
||||
if let Ok(msg) = &parsed {
|
||||
println!("msg: {}", msg.msg);
|
||||
}
|
||||
assert!(parsed.is_err());
|
||||
fn test_precompile_templates_merge() {
|
||||
let mut hb = Handlebars::new();
|
||||
let settings = Arc::new(load("test/configs/single-rule-with-merge.yml"));
|
||||
// Assuming that we're going to register the template with this id
|
||||
let template_id = format!("{}-{}", settings.rules[0].uuid, 0);
|
||||
|
||||
let result = precompile_templates(&mut hb, settings.clone());
|
||||
assert!(result);
|
||||
assert!(hb.has_template(&template_id));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_5424() {
|
||||
let buffer = r#"<13>1 2020-04-18T15:16:09.956153-07:00 coconut tyler - - [timeQuality tzKnown="1" isSynced="1" syncAccuracy="505061"] hi"#.to_string();
|
||||
let parsed = parse_line(buffer);
|
||||
assert!(parsed.is_ok());
|
||||
if let Ok(msg) = parsed {
|
||||
assert_eq!("hi", msg.msg);
|
||||
}
|
||||
}
|
||||
fn test_precompile_templates_replace() {
|
||||
let mut hb = Handlebars::new();
|
||||
let settings = Arc::new(load("test/configs/single-rule-with-replace.yml"));
|
||||
// Assuming that we're going to register the template with this id
|
||||
let template_id = format!("{}-{}", settings.rules[0].uuid, 0);
|
||||
|
||||
#[test]
|
||||
fn test_3164() {
|
||||
let buffer = r#"<190>May 13 21:45:18 coconut hotdog: hi"#.to_string();
|
||||
let parsed = parse_line(buffer);
|
||||
assert!(parsed.is_ok());
|
||||
if let Ok(msg) = parsed {
|
||||
assert_eq!("hi", msg.msg);
|
||||
}
|
||||
let result = precompile_templates(&mut hb, settings.clone());
|
||||
assert!(result);
|
||||
assert!(hb.has_template(&template_id));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Enum of syslog parse related errors
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
pub enum SyslogErrors {
|
||||
UnknownFormat,
|
||||
}
|
||||
|
||||
/**
|
||||
* SyslogMessage is just a wrapper struct to allow us to deserialize RFC 5424 and RFC 3164 syslog
|
||||
* messages into some format that can be passed throughout hotdog
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
pub struct SyslogMessage {
|
||||
pub msg: String,
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to parse a given line either as RFC 5424 or RFC 3164
|
||||
*/
|
||||
pub fn parse_line(line: String) -> std::result::Result<SyslogMessage, SyslogErrors> {
|
||||
match syslog_rfc5424::parse_message(&line) {
|
||||
Ok(msg) => Ok(SyslogMessage { msg: msg.msg }),
|
||||
Err(_) => {
|
||||
let parsed = syslog_loose::parse_message(&line);
|
||||
|
||||
/*
|
||||
* Since syslog_loose doesn't give a Result, the only way to tell if themessage wasn't
|
||||
* parsed properly is if some fields are None'd out.
|
||||
*/
|
||||
if parsed.timestamp != None {
|
||||
return Ok(SyslogMessage {
|
||||
msg: parsed.msg.to_string(),
|
||||
});
|
||||
}
|
||||
Err(SyslogErrors::UnknownFormat)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parsing_invalid() {
|
||||
let buffer = "blah".to_string();
|
||||
let parsed = parse_line(buffer);
|
||||
if let Ok(msg) = &parsed {
|
||||
println!("msg: {}", msg.msg);
|
||||
}
|
||||
assert!(parsed.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_5424() {
|
||||
let buffer = r#"<13>1 2020-04-18T15:16:09.956153-07:00 coconut tyler - - [timeQuality tzKnown="1" isSynced="1" syncAccuracy="505061"] hi"#.to_string();
|
||||
let parsed = parse_line(buffer);
|
||||
assert!(parsed.is_ok());
|
||||
if let Ok(msg) = parsed {
|
||||
assert_eq!("hi", msg.msg);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_3164() {
|
||||
let buffer = r#"<190>May 13 21:45:18 coconut hotdog: hi"#.to_string();
|
||||
let parsed = parse_line(buffer);
|
||||
assert!(parsed.is_ok());
|
||||
if let Ok(msg) = parsed {
|
||||
assert_eq!("hi", msg.msg);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -95,7 +95,7 @@ fn load_tls_config(settings: &Settings) -> io::Result<ServerConfig> {
|
|||
pub async fn accept_loop(
|
||||
addr: impl ToSocketAddrs,
|
||||
settings: Arc<Settings>,
|
||||
metrics: Arc<LockingOutput>,
|
||||
metrics: Arc<StatsdScope>,
|
||||
) -> Result<()> {
|
||||
let config = load_tls_config(&settings)?;
|
||||
|
||||
|
|
|
@ -7,11 +7,15 @@ use log::*;
|
|||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub fn load(file: &str) -> Settings {
|
||||
let conf = load_configuration(file);
|
||||
conf.try_into()
|
||||
.expect("Failed to parse the configuration file")
|
||||
let mut settings: Settings = conf
|
||||
.try_into()
|
||||
.expect("Failed to parse the configuration file");
|
||||
settings.populate_caches();
|
||||
settings
|
||||
}
|
||||
|
||||
fn load_configuration(file: &str) -> config::Config {
|
||||
|
@ -57,14 +61,33 @@ pub enum Field {
|
|||
#[derive(Debug, Deserialize)]
|
||||
#[serde(rename_all = "camelCase", tag = "type")]
|
||||
pub enum Action {
|
||||
Forward { topic: String },
|
||||
Merge { json: Value },
|
||||
Replace { template: String },
|
||||
Forward {
|
||||
topic: String,
|
||||
},
|
||||
Merge {
|
||||
json: Value,
|
||||
#[serde(default = "default_none")]
|
||||
json_str: Option<String>,
|
||||
},
|
||||
Replace {
|
||||
template: String,
|
||||
},
|
||||
Stop,
|
||||
}
|
||||
|
||||
impl Action {
|
||||
fn populate_caches(&mut self) {
|
||||
if let Action::Merge { json, json_str } = self {
|
||||
*json_str =
|
||||
Some(serde_json::to_string(json).expect("Failed to serialize Merge action"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Rule {
|
||||
#[serde(skip_serializing, skip_deserializing, default = "default_uuid")]
|
||||
pub uuid: Uuid,
|
||||
pub field: Field,
|
||||
pub actions: Vec<Action>,
|
||||
#[serde(with = "serde_regex", default = "default_none")]
|
||||
|
@ -73,6 +96,14 @@ pub struct Rule {
|
|||
pub jmespath: String,
|
||||
}
|
||||
|
||||
impl Rule {
|
||||
fn populate_caches(&mut self) {
|
||||
self.actions.iter_mut().for_each(|action| {
|
||||
action.populate_caches();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, PartialEq)]
|
||||
#[serde(untagged)]
|
||||
pub enum TlsType {
|
||||
|
@ -130,6 +161,17 @@ pub struct Settings {
|
|||
pub rules: Vec<Rule>,
|
||||
}
|
||||
|
||||
impl Settings {
|
||||
/**
|
||||
* Populate any configuration caches which we want to us
|
||||
*/
|
||||
fn populate_caches(&mut self) {
|
||||
self.rules.iter_mut().for_each(|rule| {
|
||||
rule.populate_caches();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Default functions
|
||||
*/
|
||||
|
@ -156,6 +198,10 @@ fn default_none<T>() -> Option<T> {
|
|||
None
|
||||
}
|
||||
|
||||
fn default_uuid() -> Uuid {
|
||||
Uuid::new_v4()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -165,6 +211,20 @@ mod tests {
|
|||
load("hotdog.yml");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_load_example_and_populate_caches() {
|
||||
let settings = load("test/configs/single-rule-with-merge.yml");
|
||||
assert_eq!(settings.rules.len(), 1);
|
||||
match &settings.rules[0].actions[0] {
|
||||
Action::Merge { json: _, json_str } => {
|
||||
assert!(json_str.is_some());
|
||||
}
|
||||
_ => {
|
||||
assert!(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_tls() {
|
||||
assert_eq!(TlsType::None, TlsType::default());
|
||||
|
@ -179,4 +239,9 @@ mod tests {
|
|||
fn test_kafka_buffer_default() {
|
||||
assert_eq!(1024, kafka_buffer_default());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_uuid() {
|
||||
assert_eq!(false, default_uuid().is_nil());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
# A simple test configuration for verifiying some Merge action behavior
|
||||
---
|
||||
global:
|
||||
listen:
|
||||
address: '127.0.0.1'
|
||||
port: 514
|
||||
tls:
|
||||
kafka:
|
||||
conf:
|
||||
bootstrap.servers: '127.0.0.1:9092'
|
||||
# Default topic to log messages to that are not otherwise mapped
|
||||
topic: 'test'
|
||||
metrics:
|
||||
statsd: 'localhost:8125'
|
||||
|
||||
rules:
|
||||
# Match JSON content which has a meta.topic value, e.g.
|
||||
# {"meta":{"topic" : "foo"}}
|
||||
- jmespath: 'meta.topic'
|
||||
field: msg
|
||||
actions:
|
||||
- type: merge
|
||||
json:
|
||||
meta:
|
||||
hotdog:
|
||||
version: '{{version}}'
|
||||
timestamp: '{{iso8601}}'
|
|
@ -0,0 +1,24 @@
|
|||
# A simple test configuration for verifiying some Replace action behavior
|
||||
---
|
||||
global:
|
||||
listen:
|
||||
address: '127.0.0.1'
|
||||
port: 514
|
||||
tls:
|
||||
kafka:
|
||||
conf:
|
||||
bootstrap.servers: '127.0.0.1:9092'
|
||||
# Default topic to log messages to that are not otherwise mapped
|
||||
topic: 'test'
|
||||
metrics:
|
||||
statsd: 'localhost:8125'
|
||||
|
||||
rules:
|
||||
- regex: '^hello\s+(?P<name>\w+)?'
|
||||
field: msg
|
||||
actions:
|
||||
- type: replace
|
||||
template: |
|
||||
This is the total message: {{msg}}
|
||||
|
||||
And the name is: {{name}}
|
Loading…
Reference in New Issue