Compare commits

...

7 Commits

Author SHA1 Message Date
R Tyler Croy becfefd1ab Ensure that the settings load properly, and default to a None regex field
This was broken in 0.1.7 because I didn't run a manual integration test, oops

Yay automation
2020-05-11 15:02:36 -07:00
R Tyler Croy 6dc8add976
Merge pull request #20 from reiseburo/rtyler-patch-1
Add clippy-check to the actions
2020-05-11 14:41:53 -07:00
R Tyler Croy c292ce1270
Add clippy-check to the actions 2020-05-11 14:34:11 -07:00
R Tyler Croy 37ff647213
Merge pull request #19 from reiseburo/clippy
Address all the concerns made by 📎
2020-05-11 14:31:32 -07:00
R Tyler Croy 4f89067ab0 Address all the concerns made by 📎
This also moves the regex rule setting into an Option which is a better ideal
anyways

Fixes #17
2020-05-11 14:27:15 -07:00
R Tyler Croy d6430ae82f
Merge pull request #18 from reiseburo/null-kafka-key-14
Send a null key rather than just making one up
2020-05-11 13:46:11 -07:00
R Tyler Croy b786826068 Send a null key rather than just making one up
Fixes #14
2020-05-11 13:41:08 -07:00
9 changed files with 98 additions and 123 deletions

View File

@ -15,6 +15,12 @@ jobs:
- uses: actions/checkout@v2
- name: Prepare
run: sudo apt-get install -qy libsasl2-dev
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features
- name: Build
run: cargo build --verbose
- name: Run tests

2
Cargo.lock generated
View File

@ -517,7 +517,7 @@ dependencies = [
[[package]]
name = "hotdog"
version = "0.1.5"
version = "0.1.8"
dependencies = [
"async-std",
"async-tls",

View File

@ -1,6 +1,6 @@
[package]
name = "hotdog"
version = "0.1.6"
version = "0.1.8"
authors = ["R. Tyler Croy <rtyler+hotdog@brokenco.de>"]
edition = "2018"

View File

@ -113,7 +113,7 @@ impl Kafka {
warn!("Failed to connect to a Kafka broker");
return false;
false
}
/**
@ -141,9 +141,10 @@ impl Kafka {
// TODO: replace me with a select
loop {
if let Ok(kmsg) = self.rx.recv_timeout(timeout_ms) {
let record = FutureRecord::to(&kmsg.topic)
.payload(&kmsg.msg)
.key(&kmsg.msg);
/* Note, setting the `K` (key) type on FutureRecord to a string
* even though we're explicitly not sending a key
*/
let record = FutureRecord::<String, String>::to(&kmsg.topic).payload(&kmsg.msg);
/*
* Intentionally setting the timeout_ms to -1 here so this blocks forever if the
@ -166,7 +167,7 @@ impl Kafka {
timer.stop(handle);
m.counter("kafka.submitted").count(1);
}
Err((err, msg)) => {
Err((err, _)) => {
match err {
/*
* err_type will be one of RdKafkaError types defined:
@ -218,7 +219,7 @@ fn metric_name_for(err: RDKafkaError) -> String {
if let Some(name) = err.to_string().to_lowercase().split(' ').next() {
return name.to_string();
}
return String::from("unknown");
String::from("unknown")
}
#[cfg(test)]

View File

@ -12,6 +12,7 @@ extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_regex;
#[cfg(test)]
#[macro_use]
extern crate serde_json;
extern crate syslog_rfc5424;
@ -91,7 +92,7 @@ fn main() -> Result<()> {
let settings = Arc::new(settings::load(settings_file));
if let Some(test_file) = matches.value_of("test") {
return task::block_on(rules::test_rules(&test_file, settings.clone()));
return task::block_on(rules::test_rules(&test_file, settings));
}
let metrics = Arc::new(
@ -108,17 +109,21 @@ fn main() -> Result<()> {
info!("Listening on: {}", addr);
match &settings.global.listen.tls {
TlsType::CertAndKey { cert: _, key: _, ca: _ } => {
TlsType::CertAndKey {
cert: _,
key: _,
ca: _,
} => {
info!("Serving in TLS mode");
task::block_on(crate::serve_tls::accept_loop(
addr,
settings.clone(),
metrics.clone(),
metrics,
))
}
_ => {
info!("Serving in plaintext mode");
task::block_on(accept_loop(addr, settings.clone(), metrics.clone()))
task::block_on(accept_loop(addr, settings.clone(), metrics))
}
}
}
@ -163,7 +168,9 @@ async fn accept_loop(
};
task::spawn(async move {
read_logs(reader, state).await;
if let Err(e) = read_logs(reader, state).await {
error!("Failed to read logs: {:?}", e);
}
debug!("Connection dropped");
});
}
@ -190,11 +197,8 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
let parsed = parse_message(line);
match &parsed {
Err(e) => {
error!("failed to parse message: {}", e);
}
_ => {}
if let Err(e) = &parsed {
error!("failed to parse message: {}", e);
}
/*
@ -226,7 +230,7 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
/*
* Check to see if we have a jmespath first
*/
if rule.jmespath.len() > 0 {
if !rule.jmespath.is_empty() {
let expr = jmespath::compile(&rule.jmespath).unwrap();
if let Ok(data) = jmespath::Variable::from_json(&msg.msg) {
// Search the data with the compiled expression
@ -242,20 +246,22 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
}
}
}
} else if let Some(captures) = rule.regex.captures(&msg.msg) {
rule_matches = true;
} else if let Some(regex) = &rule.regex {
if let Some(captures) = regex.captures(&msg.msg) {
rule_matches = true;
for name in rule.regex.capture_names() {
if let Some(name) = name {
if let Some(value) = captures.name(name) {
hash.insert(name.to_string(), String::from(value.as_str()));
for name in regex.capture_names() {
if let Some(name) = name {
if let Some(value) = captures.name(name) {
hash.insert(name.to_string(), String::from(value.as_str()));
}
}
}
}
}
}
_ => {
debug!("unhandled `field` for this rule: {}", rule.regex);
warn!("unhandled `field` for rule");
}
}
@ -291,7 +297,7 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
* If a custom output was never defined, just take the
* raw message and pass that along.
*/
if output.len() == 0 {
if output.is_empty() {
output = String::from(&msg.msg);
}
@ -363,7 +369,7 @@ fn perform_merge(
if let Ok(rendered) = state.hb.render_template(&output, &state.variables) {
return Ok(rendered);
}
return Ok(output);
Ok(output)
} else {
Err("Failed to render".to_string())
}
@ -374,33 +380,6 @@ fn perform_merge(
}
}
/**
* merge_and_render will take care of merging the two values and manage the
* rendering of variable substitutions
*/
fn merge_and_render<'a>(
mut left: &mut serde_json::Value,
right: &serde_json::Value,
state: &RuleState<'a>,
) -> String {
merge::merge(&mut left, &right);
let output = serde_json::to_string(&left).unwrap();
/*
* This is a bit inefficient, but until I can figure out a better way
* to render the variables that are being substituted in a merged JSON
* object, hotdog will just render the JSON object and then render it
* as a template.
*
* what could possibly go wrong
*/
if let Ok(rendered) = state.hb.render_template(&output, &state.variables) {
return rendered;
}
return output;
}
#[cfg(test)]
mod tests {
use super::*;
@ -490,22 +469,4 @@ mod tests {
let output = perform_merge("{}", &to_merge, &state);
assert_eq!(output, Ok("{\"hello\":\"world\"}".to_string()));
}
#[test]
fn test_merge_and_render() {
let mut hash = HashMap::<String, String>::new();
hash.insert("value".to_string(), "hi".to_string());
let hb = Handlebars::new();
let state = RuleState {
hb: &hb,
variables: &hash,
};
let mut origin = json!({"rust" : true});
let config = json!({"test" : "{{value}}"});
let buf = merge_and_render(&mut origin, &config, &state);
assert_eq!(buf, r#"{"rust":true,"test":"hi"}"#);
}
}

View File

@ -1,11 +1,13 @@
/*
* Disabling this lint because I don't want to fix it in this imported code
*/
#![allow(clippy::clone_double_ref)]
/**
* This module is from the crate json_value_merge
* <https://github.com/jmfiaschi/json_value_merge/>
*
* It is licensed under the MIT license
*/
extern crate serde_json;
use serde_json::{Map, Value};
/// Trait used to merge Json Values
@ -85,8 +87,9 @@ pub fn merge(a: &mut Value, b: &Value) {
}
}
fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) -> () {
let mut fields: Vec<&str> = json_pointer.split("/").skip(1).collect();
fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) {
let mut fields: Vec<&str> = json_pointer.split('/').skip(1).collect();
//let first_field = fields[0].clone();
let first_field = fields[0].clone();
fields.remove(0);
let next_fields = fields;
@ -100,7 +103,7 @@ fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) -
match json_value.pointer_mut(format!("/{}", first_field).as_str()) {
// Find the field and the json_value_targeted.
Some(json_targeted) => {
if 0 < next_fields.len() {
if !next_fields.is_empty() {
merge_in(
json_targeted,
format!("/{}", next_fields.join("/")).as_ref(),

View File

@ -19,31 +19,30 @@ pub async fn test_rules(file_name: &str, settings: Arc<Settings>) -> Result<()>
while let Some(line) = lines.next().await {
let line = line?;
debug!("Testing the line: {}", line);
number = number + 1;
number += 1;
let mut matches: Vec<&str> = vec![];
for rule in settings.rules.iter() {
match rule.field {
Field::Msg => {
if rule.jmespath.len() > 0 {
let expr = jmespath::compile(&rule.jmespath).unwrap();
if let Ok(data) = jmespath::Variable::from_json(&line) {
// Search the data with the compiled expression
if let Ok(result) = expr.search(data) {
if !result.is_null() {
matches.push(&rule.jmespath);
}
if let Field::Msg = rule.field {
if !rule.jmespath.is_empty() {
let expr = jmespath::compile(&rule.jmespath).unwrap();
if let Ok(data) = jmespath::Variable::from_json(&line) {
// Search the data with the compiled expression
if let Ok(result) = expr.search(data) {
if !result.is_null() {
matches.push(&rule.jmespath);
}
}
} else if let Some(_captures) = rule.regex.captures(&line) {
matches.push(&rule.regex.as_str());
}
} else if let Some(regex) = &rule.regex {
if let Some(_captures) = regex.captures(&line) {
matches.push(&regex.as_str());
}
}
_ => {}
}
}
if matches.len() > 0 {
if !matches.is_empty() {
println!("Line {} matches on:", number);
for m in matches.iter() {
println!("\t - {}", m);

View File

@ -18,12 +18,8 @@ use dipstick::*;
use log::*;
use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
use rustls::{
Certificate,
AllowAnyAnonymousOrAuthenticatedClient,
NoClientAuth,
PrivateKey,
RootCertStore,
ServerConfig
AllowAnyAnonymousOrAuthenticatedClient, Certificate, NoClientAuth, PrivateKey, RootCertStore,
ServerConfig,
};
use std::path::Path;
@ -44,14 +40,14 @@ fn load_keys(path: &Path) -> io::Result<Vec<PrivateKey>> {
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
if let Ok(keys) = result {
if keys.len() == 0 {
if keys.is_empty() {
debug!("Failed to load key as RSA, trying PKCS8");
return pkcs8_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?))
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
}
return Ok(keys);
}
return result;
result
}
/// Configure the server using rusttls
@ -64,18 +60,22 @@ fn load_tls_config(settings: &Settings) -> io::Result<ServerConfig> {
let certs = load_certs(cert.as_path())?;
let mut keys = load_keys(key.as_path())?;
if keys.len() <= 0 {
if keys.is_empty() {
panic!("TLS key could not be properly loaded! This is fatal!");
}
let mut verifier = NoClientAuth::new();
if ca.is_some() {
let verifier = if ca.is_some() {
let ca_path = ca.as_ref().unwrap();
let mut store = RootCertStore::empty();
store.add_pem_file(&mut std::io::BufReader::new(std::fs::File::open(ca_path.as_path())?));
verifier = AllowAnyAnonymousOrAuthenticatedClient::new(store);
}
if let Err(e) = store.add_pem_file(&mut std::io::BufReader::new(
std::fs::File::open(ca_path.as_path())?,
)) {
error!("Failed to add the CA properly, certificate verification may not work as expected: {:?}", e);
}
AllowAnyAnonymousOrAuthenticatedClient::new(store)
} else {
NoClientAuth::new()
};
// we don't use client authentication
let mut config = ServerConfig::new(verifier);
@ -138,7 +138,7 @@ pub async fn accept_loop(
loop {
if let Ok(count) = conn_rx.recv() {
connections = connections + count;
connections += count;
debug!("Connection count now {}", connections);
counter.value(connections);
}
@ -163,7 +163,9 @@ pub async fn accept_loop(
let ctx = conn_tx.clone();
task::spawn(async move {
handle_connection(&acceptor, &mut stream, state).await;
if let Err(e) = handle_connection(&acceptor, &mut stream, state).await {
error!("Failed to handle the connection properly: {:?}", e);
}
ctx.send(-1).unwrap();
});
}
@ -186,7 +188,9 @@ async fn handle_connection(
let tls_stream = handshake.await?;
let reader = BufReader::new(tls_stream);
read_logs(reader, state).await;
if let Err(e) = read_logs(reader, state).await {
error!("Failed to read logs properly: {:?}", e);
}
Ok(())
}

View File

@ -4,7 +4,6 @@
*/
use async_std::path::Path;
use log::*;
use regex;
use serde_json::Value;
use std::collections::HashMap;
use std::time::Duration;
@ -41,7 +40,7 @@ fn load_configuration(file: &str) -> config::Config {
let _port: u64 = conf
.get("global.listen.port")
.expect("Configuration had no `global.listen.port` setting");
return conf;
conf
}
#[derive(Debug, Deserialize)]
@ -68,8 +67,8 @@ pub enum Action {
pub struct Rule {
pub field: Field,
pub actions: Vec<Action>,
#[serde(with = "serde_regex", default = "empty_regex")]
pub regex: regex::Regex,
#[serde(with = "serde_regex", default = "default_none")]
pub regex: Option<regex::Regex>,
#[serde(default = "empty_str")]
pub jmespath: String,
}
@ -135,18 +134,11 @@ pub struct Settings {
* Default functions
*/
/**
* Return an empty regular expression
*/
fn empty_regex() -> regex::Regex {
return regex::Regex::new("").unwrap();
}
/**
* Allocate an return an empty string
*/
fn empty_str() -> String {
return String::new();
String::new()
}
/**
@ -160,10 +152,19 @@ fn kafka_timeout_default() -> Duration {
Duration::from_secs(30)
}
fn default_none<T>() -> Option<T> {
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_load_example_config() {
load("hotdog.yml");
}
#[test]
fn test_default_tls() {
assert_eq!(TlsType::None, TlsType::default());