diff --git a/README.adoc b/README.adoc index d111935..f816e97 100644 --- a/README.adoc +++ b/README.adoc @@ -102,6 +102,8 @@ overlap with the built-in variable names [[action-forward]] ===== Forward +The forward action will imply the <> when used. + [[action-merge]] ===== Merge diff --git a/example.log b/example.log index 30a80b2..00ec117 100644 --- a/example.log +++ b/example.log @@ -1,4 +1,4 @@ hello there This is a somewhat longer line of logs? This is a MUCH longer log line and it should not be truncated, hopefully -{"this": "is some JSON data", "which": "should also be transmitted properly", "meta" : {"topic" : "hotdog"}} +{"this": "is some JSON data", "which": "should also be transmitted properly", "meta" : {"topic" : "test"}} diff --git a/hotdog.yml b/hotdog.yml index 47c04f9..08bf50a 100644 --- a/hotdog.yml +++ b/hotdog.yml @@ -48,7 +48,7 @@ rules: timestamp: '{{iso8601}}' - type: forward - topic: test + topic: '{{value}}' - type: stop @@ -58,3 +58,5 @@ rules: actions: - type: forward topic: 'logs-unknown' + + - type: stop diff --git a/src/main.rs b/src/main.rs index d0f3297..e745bea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -206,7 +206,13 @@ async fn connection_loop(stream: TcpStream, settings: Arc, metrics: Ar if let Ok(result) = expr.search(data) { if ! result.is_null() { rule_matches = true; - /* TODO: need to find a way to extrac tmatches */ + debug!("jmespath rule matched, value: {}", result); + if let Some(value) = result.as_string() { + hash.insert("value".to_string(), value.to_string()); + } + else { + warn!("Unable to parse out the string value for {}, the `value` variable substitution will not be available,", result); + } } } } @@ -245,8 +251,14 @@ async fn connection_loop(stream: TcpStream, settings: Arc, metrics: Ar for action in rule.actions.iter() { match action { Action::Forward { topic } => { - debug!("Forwarding to the topic: `{}`", topic); - send_to_kafka(output, topic, &producer, &state).await; + if let Ok(actual) = hb.render_template(&topic, &hash) { + debug!("Forwarding to the topic: `{}`", actual); + send_to_kafka(output, &actual, &producer, &state).await; + continue_rules = false; + } + else { + error!("Failed to process the configured topic: `{}`", topic); + } break; }, Action::Merge { json } => {