Implement per-connection caching of JMESPath compilations

This should further reduce the amount of CPU time spent per log line

Fixes #32
This commit is contained in:
R Tyler Croy 2020-05-29 16:11:13 -07:00
parent 8f6ae815f3
commit 016b1ae52d
5 changed files with 102 additions and 27 deletions

View File

@ -49,7 +49,9 @@ serde_regex = "~0.4.0"
regex = "~1.3.6"
# used for rule matching on JSON
jmespath = { git = "https://github.com/jmespath/jmespath.rs" }
# The "sync" feature is undocumented but required in order to swap Rc for Arc
# in the crate, allowing it to be used with futures and threads properly
jmespath = { git = "https://github.com/jmespath/jmespath.rs", features = ["sync"] }
# Needed to report metrics of hotdog's performance
dipstick = "~0.8.0"

View File

@ -26,6 +26,13 @@ struct RuleState<'a> {
metrics: Arc<StatsdScope>,
}
/**
 * Map of precompiled JMESPath expressions, keyed by the source string of the
 * expression as written in the rule, so that a rule's expression can be looked
 * up without compiling it again
 */
pub type JmesPathExpressions<'a> = HashMap<String, jmespath::Expression<'a>>;
pub struct Connection {
/**
* A reference to the global Settings object for all configuration information
@ -67,12 +74,20 @@ impl Connection {
let lines_count = self.metrics.counter("lines");
let mut hb = Handlebars::new();
let mut jmespaths = JmesPathExpressions::new();
if !precompile_templates(&mut hb, self.settings.clone()) {
error!("Failing to precompile templates is a fatal error, not going to parse logs since the configuration is broken");
// TODO fix the Err types
return Ok(());
}
if !precompile_jmespath(&mut jmespaths, self.settings.clone()) {
error!("Failing to precompile jmespaths is a fata error, not parsing this connection's logs because the configuration is broken");
// TODO fix the Err types
return Ok(());
}
while let Some(line) = lines.next().await {
let line = line?;
debug!("log: {}", line);
@ -111,26 +126,26 @@ impl Connection {
match rule.field {
Field::Msg => {
rule_matches = rules::apply_rule(&rule, &msg.msg, &mut hash);
rule_matches = rules::apply_rule(&rule, &msg.msg, &jmespaths, &mut hash);
},
Field::Appname => {
if let Some(appname) = &msg.appname {
rule_matches = rules::apply_rule(&rule, &appname, &mut hash);
rule_matches = rules::apply_rule(&rule, &appname, &jmespaths, &mut hash);
}
},
Field::Hostname => {
if let Some(hostname) = &msg.hostname {
rule_matches = rules::apply_rule(&rule, &hostname, &mut hash);
rule_matches = rules::apply_rule(&rule, &hostname, &jmespaths, &mut hash);
}
},
Field::Severity => {
if let Some(severity) = &msg.severity {
rule_matches = rules::apply_rule(&rule, &severity, &mut hash);
rule_matches = rules::apply_rule(&rule, &severity, &jmespaths, &mut hash);
}
},
Field::Facility => {
if let Some(facility) = &msg.facility {
rule_matches = rules::apply_rule(&rule, &facility, &mut hash);
rule_matches = rules::apply_rule(&rule, &facility, &jmespaths, &mut hash);
}
},
}
@ -280,6 +295,28 @@ fn precompile_templates(hb: &mut Handlebars, settings: Arc<Settings>) -> bool {
true
}
/**
* precompile_jmespath will pre-generate all the necessary JMESPath::Variable objects from the
* configuration file and shove thoe in the map given to it
*/
fn precompile_jmespath(map: &mut JmesPathExpressions, settings: Arc<Settings>) -> bool {
for rule in settings.rules.iter() {
if let Some(expression) = &rule.jmespath {
if ! map.contains_key(expression) {
if let Ok(compiled) = jmespath::compile(&expression) {
map.insert(expression.to_string(), compiled);
}
else {
error!("Failed to compile the JMESPath expression: {}", expression);
return false;
}
}
}
}
true
}
/**
* perform_merge will generate the buffer resulting of the JSON merge
*/
@ -447,4 +484,22 @@ mod tests {
assert!(result);
assert!(hb.has_template(&template_id));
}
#[test]
fn test_precompile_jmespath() {
    let config = Arc::new(load("test/configs/single-rule-with-merge.yml"));
    let mut compiled = JmesPathExpressions::new();

    // Precompiling a valid configuration must succeed
    assert!(precompile_jmespath(&mut compiled, Arc::clone(&config)));

    // The first rule's expression must now be available under its own source string
    let key = config.rules[0].jmespath.as_ref().unwrap();
    assert!(compiled.contains_key(key));
}
#[test]
fn test_precompile_jmespath_baddata() {
    let config = Arc::new(load("test/configs/single-rule-with-invalid-jmespath.yml"));
    let mut compiled = JmesPathExpressions::new();

    // An expression which cannot be compiled must be reported as a failure
    let succeeded = precompile_jmespath(&mut compiled, Arc::clone(&config));
    assert!(!succeeded);
}
}

View File

@ -25,11 +25,12 @@ pub async fn test_rules(
number += 1;
let mut matches: Vec<&Rule> = vec![];
let mut unused = HashMap::<String, String>::new();
let also_unused = HashMap::<String, jmespath::Expression>::new();
for rule in settings.rules.iter() {
match rule.field {
Field::Msg => {
if apply_rule(&rule, &line, &mut unused) {
if apply_rule(&rule, &line, &also_unused, &mut unused) {
matches.push(rule);
}
},
@ -55,13 +56,14 @@ pub async fn test_rules(
*
* If the rule matches, then this will return true
*/
pub fn apply_rule(rule: &Rule, value: &str, hash: &mut HashMap<String, String>) -> bool {
pub fn apply_rule(rule: &Rule, value: &str, jmespaths: &crate::connection::JmesPathExpressions, hash: &mut HashMap<String, String>) -> bool {
let mut rule_matches = false;
/*
* Check to see if we have a jmespath first
*/
if !rule.jmespath.is_empty() {
let expr = jmespath::compile(&rule.jmespath).unwrap();
* Check to see if we have a jmespath first
*
*/
if let Some(expression) = &rule.jmespath {
let expr = &jmespaths[expression];
if let Ok(data) = jmespath::Variable::from_json(value) {
// Search the data with the compiled expression
if let Ok(result) = expr.search(data) {

View File

@ -96,8 +96,9 @@ pub struct Rule {
pub actions: Vec<Action>,
#[serde(with = "serde_regex", default = "default_none")]
pub regex: Option<regex::Regex>,
#[serde(default = "empty_str")]
pub jmespath: String,
#[serde(default = "default_none")]
pub jmespath: Option<String>,
}
impl Rule {
@ -113,7 +114,7 @@ impl std::fmt::Display for Rule {
write!(f, "Regex: {}", regex)
}
else {
write!(f, "JMESPath: {}", self.jmespath)
write!(f, "JMESPath: {}", self.jmespath.as_ref().unwrap())
}
}
}
@ -190,13 +191,6 @@ impl Settings {
* Default functions
*/
/**
 * Return a new, empty String
 */
fn empty_str() -> String {
    String::default()
}
/**
* Return the default size used for the Kafka buffer
*/
@ -244,11 +238,6 @@ mod tests {
assert_eq!(TlsType::None, TlsType::default());
}
#[test]
fn test_empty_str() {
    // The default must compare equal to a freshly built empty String
    let produced = empty_str();
    assert_eq!("".to_string(), produced);
}
#[test]
fn test_kafka_buffer_default() {
assert_eq!(1024, kafka_buffer_default());

View File

@ -0,0 +1,27 @@
# A simple test configuration for verifying some Merge action behavior
---
global:
listen:
address: '127.0.0.1'
port: 514
tls:
kafka:
conf:
bootstrap.servers: '127.0.0.1:9092'
# Default topic to log messages to that are not otherwise mapped
topic: 'test'
metrics:
statsd: 'localhost:8125'
rules:
# Match JSON content which has a meta.topic value, e.g.
# {"meta":{"topic" : "foo"}}
- jmespath: '. 0 meta.topic'
field: msg
actions:
- type: merge
json:
meta:
hotdog:
version: '{{version}}'
timestamp: '{{iso8601}}'