Correctly route topics based on variable substitutions for jmespath

This commit also introduces the functionality where a Forward implies Stop, this
is because we need to `break` out of the action processing during the handling
of the Forward, since `output` is being consumed in the process.

See #4
This commit is contained in:
R Tyler Croy 2020-04-15 13:04:53 -07:00
parent 19e869c3b4
commit c7023dad03
4 changed files with 21 additions and 5 deletions

View File

@ -102,6 +102,8 @@ overlap with the built-in variable names
[[action-forward]]
===== Forward
The forward action will imply the <<action-stop, Stop action>> when used.
[[action-merge]]
===== Merge

View File

@ -1,4 +1,4 @@
hello there
This is a somewhat longer line of logs?
This is a MUCH longer log line and it should not be truncated, hopefully
{"this": "is some JSON data", "which": "should also be transmitted properly", "meta" : {"topic" : "hotdog"}}
{"this": "is some JSON data", "which": "should also be transmitted properly", "meta" : {"topic" : "test"}}

View File

@ -48,7 +48,7 @@ rules:
timestamp: '{{iso8601}}'
- type: forward
topic: test
topic: '{{value}}'
- type: stop
@ -58,3 +58,5 @@ rules:
actions:
- type: forward
topic: 'logs-unknown'
- type: stop

View File

@ -206,7 +206,13 @@ async fn connection_loop(stream: TcpStream, settings: Arc<Settings>, metrics: Ar
if let Ok(result) = expr.search(data) {
if ! result.is_null() {
rule_matches = true;
/* TODO: need to find a way to extrac tmatches */
debug!("jmespath rule matched, value: {}", result);
if let Some(value) = result.as_string() {
hash.insert("value".to_string(), value.to_string());
}
else {
warn!("Unable to parse out the string value for {}, the `value` variable substitution will not be available,", result);
}
}
}
}
@ -245,8 +251,14 @@ async fn connection_loop(stream: TcpStream, settings: Arc<Settings>, metrics: Ar
for action in rule.actions.iter() {
match action {
Action::Forward { topic } => {
debug!("Forwarding to the topic: `{}`", topic);
send_to_kafka(output, topic, &producer, &state).await;
if let Ok(actual) = hb.render_template(&topic, &hash) {
debug!("Forwarding to the topic: `{}`", actual);
send_to_kafka(output, &actual, &producer, &state).await;
continue_rules = false;
}
else {
error!("Failed to process the configured topic: `{}`", topic);
}
break;
},
Action::Merge { json } => {