Pre-compile the Handlebars templates for each connection rather than each log line

Since I wasn't able to figure out a good way to pre-compile all the Handlebars
templates at 🌭 boot time, this is the next best thing. Once a connection
comes in, pre-compile the templates to speed up handling each log line

Fixes #3
This commit is contained in:
R Tyler Croy 2020-05-21 13:31:23 -07:00
parent 09f75ec0ee
commit 741d63d983
5 changed files with 229 additions and 34 deletions

74
Cargo.lock generated
View File

@ -496,6 +496,17 @@ dependencies = [
"typenum",
]
[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "handlebars"
version = "3.0.1"
@ -544,6 +555,7 @@ dependencies = [
"serde_regex",
"syslog_loose",
"syslog_rfc5424",
"uuid",
]
[[package]]
@ -1020,6 +1032,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
[[package]]
name = "ppv-lite86"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
[[package]]
name = "pretty_env_logger"
version = "0.3.1"
@ -1076,6 +1094,47 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom",
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core",
]
[[package]]
name = "rdkafka"
version = "0.23.1"
@ -1438,6 +1497,15 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "uuid"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
dependencies = [
"rand",
]
[[package]]
name = "vcpkg"
version = "0.2.8"
@ -1456,6 +1524,12 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasm-bindgen"
version = "0.2.62"

View File

@ -59,3 +59,6 @@ handlebars = "~3.0.1"
# Needed for time management
chrono = "~0.4.11"
# Needed to tag rules and actions with their own unique identifiers
uuid = { version = "~0.8.1", features = ["v4"] }

View File

@ -176,6 +176,46 @@ async fn accept_loop(
Ok(())
}
fn template_id_for(rule: &Rule, index: usize) -> String {
format!("{}-{}", rule.uuid, index)
}
/**
* precompile_templates will register templates for all the Merge and Replace actions from the
* settings
*
* Will usually return a true, unless some setting parse failure occurred which is a critical
* failure for the daemon
*/
fn precompile_templates(hb: &mut Handlebars, settings: Arc<Settings>) -> bool {
for rule in settings.rules.iter() {
for index in 0..rule.actions.len() {
match &rule.actions[index] {
Action::Merge { json: _, json_str } => {
let template_id = template_id_for(rule, index);
if let Some(template) = json_str {
hb.register_template_string(&template_id, &template);
}
else {
error!("Could not look up the json_str for a Merge action");
return false;
}
},
Action::Replace { template } => {
let template_id = format!("{}-{}", rule.uuid, index);
hb.register_template_string(&template_id, &template);
},
_ => {
},
}
}
}
true
}
/**
* connection_loop is responsible for handling incoming syslog streams connections
*
@ -187,7 +227,12 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
let mut lines = reader.lines();
let lines_count = state.metrics.counter("lines");
let hb = Handlebars::new();
let mut hb = Handlebars::new();
if ! precompile_templates(&mut hb, state.settings.clone()) {
error!("Failing to precompile templates is a fatal error, not going to parse logs since the configuration is broken");
// TODO fix the Err types
return Ok(());
}
while let Some(line) = lines.next().await {
let line = line?;
@ -282,7 +327,9 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
/*
* Process the actions one the rule has matched
*/
for action in rule.actions.iter() {
for index in 0..rule.actions.len() {
let action = &rule.actions[index];
match action {
Action::Forward { topic } => {
/*
@ -326,26 +373,26 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
state.metrics.counter("error.topic_parse_failed").count(1);
}
break;
}
Action::Merge { json, json_str } => {
if let Some(json_str) = json_str {
debug!("merging JSON content: {}", json);
if let Ok(buffer) = perform_merge(&msg.msg, json_str, &rule_state) {
output = buffer;
} else {
continue_rules = false;
}
},
Action::Merge { json, json_str: _ } => {
debug!("merging JSON content: {}", json);
if let Ok(buffer) = perform_merge(&msg.msg, &template_id_for(&rule, index), &rule_state) {
output = buffer;
} else {
continue_rules = false;
}
else {
error!("Merge action contained no cached json-str for {}", json);
}
}
},
Action::Replace { template } => {
debug!("replacing content with template: {}", template);
if let Ok(rendered) = hb.render_template(template, &hash) {
let template_id = template_id_for(&rule, index);
debug!("replacing content with template: {} ({})", template, template_id);
if let Ok(rendered) = hb.render(&template_id, &hash) {
output = rendered;
}
}
},
Action::Stop => {
continue_rules = false;
}
@ -362,12 +409,12 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
*/
fn perform_merge(
buffer: &str,
to_merge: &str,
template_id: &str,
state: &RuleState,
) -> std::result::Result<String, String> {
if let Ok(mut msg_json) = serde_json::from_str(&buffer) {
if let Ok(rendered) = state.hb.render_template(&to_merge, &state.variables) {
if let Ok(rendered) = state.hb.render(template_id, &state.variables) {
let to_merge: serde_json::Value = serde_json::from_str(&rendered).expect("Failed to deserialize our rendered to_merge_str");
/*
@ -419,12 +466,14 @@ mod tests {
#[test]
fn merge_with_empty() {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, "{}");
let hash = HashMap::<String, String>::new();
let state = rule_state(&hb, &hash);
let to_merge = r#"{}"#;
let output = perform_merge("{}", &to_merge, &state);
let output = perform_merge("{}", template_id, &state);
assert_eq!(output, Ok("{}".to_string()));
}
@ -433,12 +482,14 @@ mod tests {
*/
#[test]
fn merge_with_non_object() -> std::result::Result<(), String> {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, "[1]");
let hash = HashMap::<String, String>::new();
let state = rule_state(&hb, &hash);
let to_merge = r#"[1]"#;
let output = perform_merge("{}", &to_merge, &state)?;
let output = perform_merge("{}", template_id, &state)?;
assert_eq!(output, "{}".to_string());
Ok(())
}
@ -448,12 +499,15 @@ mod tests {
*/
#[test]
fn merge_without_json_buffer() {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, "{}");
let hash = HashMap::<String, String>::new();
let state = rule_state(&hb, &hash);
let to_merge = r#"{}"#;
let output = perform_merge("invalid", &to_merge, &state);
let output = perform_merge("invalid", template_id, &state);
let expected = Err("Not JSON".to_string());
assert_eq!(output, expected);
}
@ -463,12 +517,14 @@ mod tests {
*/
#[test]
fn merge_with_json_buffer() {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, r#"{"hello":1}"#);
let hash = HashMap::<String, String>::new();
let state = rule_state(&hb, &hash);
let to_merge = r#"{"hello":1}"#;
let output = perform_merge("{}", &to_merge, &state);
let output = perform_merge("{}", template_id, &state);
assert_eq!(output, Ok("{\"hello\":1}".to_string()));
}
@ -477,13 +533,39 @@ mod tests {
*/
#[test]
fn merge_with_json_buffer_and_vars() {
let hb = Handlebars::new();
let mut hb = Handlebars::new();
let template_id = "1";
hb.register_template_string(&template_id, r#"{"hello":"{{name}}"}"#);
let mut hash = HashMap::<String, String>::new();
hash.insert("name".to_string(), "world".to_string());
let state = rule_state(&hb, &hash);
let to_merge = r#"{"hello":"{{name}}"}"#;
let output = perform_merge("{}", &to_merge, &state);
let output = perform_merge("{}", template_id, &state);
assert_eq!(output, Ok("{\"hello\":\"world\"}".to_string()));
}
#[test]
fn test_precompile_templates_merge() {
let mut hb = Handlebars::new();
let settings = Arc::new(load("test/configs/single-rule-with-merge.yml"));
// Assuming that we're going to register the template with this id
let template_id = format!("{}-{}", settings.rules[0].uuid, 0);
let result = precompile_templates(&mut hb, settings.clone());
assert!(result);
assert!(hb.has_template(&template_id));
}
#[test]
fn test_precompile_templates_replace() {
let mut hb = Handlebars::new();
let settings = Arc::new(load("test/configs/single-rule-with-replace.yml"));
// Assuming that we're going to register the template with this id
let template_id = format!("{}-{}", settings.rules[0].uuid, 0);
let result = precompile_templates(&mut hb, settings.clone());
assert!(result);
assert!(hb.has_template(&template_id));
}
}

View File

@ -7,6 +7,7 @@ use log::*;
use serde_json::Value;
use std::collections::HashMap;
use std::time::Duration;
use uuid::Uuid;
pub fn load(file: &str) -> Settings {
let conf = load_configuration(file);
@ -82,6 +83,8 @@ impl Action {
#[derive(Debug, Deserialize)]
pub struct Rule {
#[serde(skip_serializing, skip_deserializing, default = "default_uuid")]
pub uuid: Uuid,
pub field: Field,
pub actions: Vec<Action>,
#[serde(with = "serde_regex", default = "default_none")]
@ -192,6 +195,10 @@ fn default_none<T>() -> Option<T> {
None
}
fn default_uuid() -> Uuid {
Uuid::new_v4()
}
#[cfg(test)]
mod tests {
use super::*;
@ -229,4 +236,9 @@ mod tests {
fn test_kafka_buffer_default() {
assert_eq!(1024, kafka_buffer_default());
}
#[test]
fn test_default_uuid() {
assert_eq!(false, default_uuid().is_nil());
}
}

View File

@ -0,0 +1,24 @@
# A simple test configuration for verifiying some Replace action behavior
---
global:
listen:
address: '127.0.0.1'
port: 514
tls:
kafka:
conf:
bootstrap.servers: '127.0.0.1:9092'
# Default topic to log messages to that are not otherwise mapped
topic: 'test'
metrics:
statsd: 'localhost:8125'
rules:
- regex: '^hello\s+(?P<name>\w+)?'
field: msg
actions:
- type: replace
template: |
This is the total message: {{msg}}
And the name is: {{name}}