Compare commits
7 Commits
25754775fa
...
becfefd1ab
Author | SHA1 | Date |
---|---|---|
R Tyler Croy | becfefd1ab | |
R Tyler Croy | 6dc8add976 | |
R Tyler Croy | c292ce1270 | |
R Tyler Croy | 37ff647213 | |
R Tyler Croy | 4f89067ab0 | |
R Tyler Croy | d6430ae82f | |
R Tyler Croy | b786826068 |
|
@ -15,6 +15,12 @@ jobs:
|
|||
- uses: actions/checkout@v2
|
||||
- name: Prepare
|
||||
run: sudo apt-get install -qy libsasl2-dev
|
||||
|
||||
- uses: actions-rs/clippy-check@v1
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
args: --all-features
|
||||
|
||||
- name: Build
|
||||
run: cargo build --verbose
|
||||
- name: Run tests
|
||||
|
|
|
@ -517,7 +517,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "hotdog"
|
||||
version = "0.1.5"
|
||||
version = "0.1.8"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"async-tls",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "hotdog"
|
||||
version = "0.1.6"
|
||||
version = "0.1.8"
|
||||
authors = ["R. Tyler Croy <rtyler+hotdog@brokenco.de>"]
|
||||
edition = "2018"
|
||||
|
||||
|
|
13
src/kafka.rs
13
src/kafka.rs
|
@ -113,7 +113,7 @@ impl Kafka {
|
|||
|
||||
warn!("Failed to connect to a Kafka broker");
|
||||
|
||||
return false;
|
||||
false
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -141,9 +141,10 @@ impl Kafka {
|
|||
// TODO: replace me with a select
|
||||
loop {
|
||||
if let Ok(kmsg) = self.rx.recv_timeout(timeout_ms) {
|
||||
let record = FutureRecord::to(&kmsg.topic)
|
||||
.payload(&kmsg.msg)
|
||||
.key(&kmsg.msg);
|
||||
/* Note, setting the `K` (key) type on FutureRecord to a string
|
||||
* even though we're explicitly not sending a key
|
||||
*/
|
||||
let record = FutureRecord::<String, String>::to(&kmsg.topic).payload(&kmsg.msg);
|
||||
|
||||
/*
|
||||
* Intentionally setting the timeout_ms to -1 here so this blocks forever if the
|
||||
|
@ -166,7 +167,7 @@ impl Kafka {
|
|||
timer.stop(handle);
|
||||
m.counter("kafka.submitted").count(1);
|
||||
}
|
||||
Err((err, msg)) => {
|
||||
Err((err, _)) => {
|
||||
match err {
|
||||
/*
|
||||
* err_type will be one of RdKafkaError types defined:
|
||||
|
@ -218,7 +219,7 @@ fn metric_name_for(err: RDKafkaError) -> String {
|
|||
if let Some(name) = err.to_string().to_lowercase().split(' ').next() {
|
||||
return name.to_string();
|
||||
}
|
||||
return String::from("unknown");
|
||||
String::from("unknown")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
91
src/main.rs
91
src/main.rs
|
@ -12,6 +12,7 @@ extern crate serde;
|
|||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
extern crate serde_regex;
|
||||
#[cfg(test)]
|
||||
#[macro_use]
|
||||
extern crate serde_json;
|
||||
extern crate syslog_rfc5424;
|
||||
|
@ -91,7 +92,7 @@ fn main() -> Result<()> {
|
|||
let settings = Arc::new(settings::load(settings_file));
|
||||
|
||||
if let Some(test_file) = matches.value_of("test") {
|
||||
return task::block_on(rules::test_rules(&test_file, settings.clone()));
|
||||
return task::block_on(rules::test_rules(&test_file, settings));
|
||||
}
|
||||
|
||||
let metrics = Arc::new(
|
||||
|
@ -108,17 +109,21 @@ fn main() -> Result<()> {
|
|||
info!("Listening on: {}", addr);
|
||||
|
||||
match &settings.global.listen.tls {
|
||||
TlsType::CertAndKey { cert: _, key: _, ca: _ } => {
|
||||
TlsType::CertAndKey {
|
||||
cert: _,
|
||||
key: _,
|
||||
ca: _,
|
||||
} => {
|
||||
info!("Serving in TLS mode");
|
||||
task::block_on(crate::serve_tls::accept_loop(
|
||||
addr,
|
||||
settings.clone(),
|
||||
metrics.clone(),
|
||||
metrics,
|
||||
))
|
||||
}
|
||||
_ => {
|
||||
info!("Serving in plaintext mode");
|
||||
task::block_on(accept_loop(addr, settings.clone(), metrics.clone()))
|
||||
task::block_on(accept_loop(addr, settings.clone(), metrics))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -163,7 +168,9 @@ async fn accept_loop(
|
|||
};
|
||||
|
||||
task::spawn(async move {
|
||||
read_logs(reader, state).await;
|
||||
if let Err(e) = read_logs(reader, state).await {
|
||||
error!("Failed to read logs: {:?}", e);
|
||||
}
|
||||
debug!("Connection dropped");
|
||||
});
|
||||
}
|
||||
|
@ -190,11 +197,8 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
|
||||
let parsed = parse_message(line);
|
||||
|
||||
match &parsed {
|
||||
Err(e) => {
|
||||
error!("failed to parse message: {}", e);
|
||||
}
|
||||
_ => {}
|
||||
if let Err(e) = &parsed {
|
||||
error!("failed to parse message: {}", e);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -226,7 +230,7 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
/*
|
||||
* Check to see if we have a jmespath first
|
||||
*/
|
||||
if rule.jmespath.len() > 0 {
|
||||
if !rule.jmespath.is_empty() {
|
||||
let expr = jmespath::compile(&rule.jmespath).unwrap();
|
||||
if let Ok(data) = jmespath::Variable::from_json(&msg.msg) {
|
||||
// Search the data with the compiled expression
|
||||
|
@ -242,20 +246,22 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
}
|
||||
}
|
||||
}
|
||||
} else if let Some(captures) = rule.regex.captures(&msg.msg) {
|
||||
rule_matches = true;
|
||||
} else if let Some(regex) = &rule.regex {
|
||||
if let Some(captures) = regex.captures(&msg.msg) {
|
||||
rule_matches = true;
|
||||
|
||||
for name in rule.regex.capture_names() {
|
||||
if let Some(name) = name {
|
||||
if let Some(value) = captures.name(name) {
|
||||
hash.insert(name.to_string(), String::from(value.as_str()));
|
||||
for name in regex.capture_names() {
|
||||
if let Some(name) = name {
|
||||
if let Some(value) = captures.name(name) {
|
||||
hash.insert(name.to_string(), String::from(value.as_str()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
debug!("unhandled `field` for this rule: {}", rule.regex);
|
||||
warn!("unhandled `field` for rule");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,7 +297,7 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
|
|||
* If a custom output was never defined, just take the
|
||||
* raw message and pass that along.
|
||||
*/
|
||||
if output.len() == 0 {
|
||||
if output.is_empty() {
|
||||
output = String::from(&msg.msg);
|
||||
}
|
||||
|
||||
|
@ -363,7 +369,7 @@ fn perform_merge(
|
|||
if let Ok(rendered) = state.hb.render_template(&output, &state.variables) {
|
||||
return Ok(rendered);
|
||||
}
|
||||
return Ok(output);
|
||||
Ok(output)
|
||||
} else {
|
||||
Err("Failed to render".to_string())
|
||||
}
|
||||
|
@ -374,33 +380,6 @@ fn perform_merge(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* merge_and_render will take care of merging the two values and manage the
|
||||
* rendering of variable substitutions
|
||||
*/
|
||||
fn merge_and_render<'a>(
|
||||
mut left: &mut serde_json::Value,
|
||||
right: &serde_json::Value,
|
||||
state: &RuleState<'a>,
|
||||
) -> String {
|
||||
merge::merge(&mut left, &right);
|
||||
|
||||
let output = serde_json::to_string(&left).unwrap();
|
||||
|
||||
/*
|
||||
* This is a bit inefficient, but until I can figure out a better way
|
||||
* to render the variables that are being substituted in a merged JSON
|
||||
* object, hotdog will just render the JSON object and then render it
|
||||
* as a template.
|
||||
*
|
||||
* what could possibly go wrong
|
||||
*/
|
||||
if let Ok(rendered) = state.hb.render_template(&output, &state.variables) {
|
||||
return rendered;
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -490,22 +469,4 @@ mod tests {
|
|||
let output = perform_merge("{}", &to_merge, &state);
|
||||
assert_eq!(output, Ok("{\"hello\":\"world\"}".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_and_render() {
|
||||
let mut hash = HashMap::<String, String>::new();
|
||||
hash.insert("value".to_string(), "hi".to_string());
|
||||
|
||||
let hb = Handlebars::new();
|
||||
let state = RuleState {
|
||||
hb: &hb,
|
||||
variables: &hash,
|
||||
};
|
||||
|
||||
let mut origin = json!({"rust" : true});
|
||||
let config = json!({"test" : "{{value}}"});
|
||||
|
||||
let buf = merge_and_render(&mut origin, &config, &state);
|
||||
assert_eq!(buf, r#"{"rust":true,"test":"hi"}"#);
|
||||
}
|
||||
}
|
||||
|
|
13
src/merge.rs
13
src/merge.rs
|
@ -1,11 +1,13 @@
|
|||
/*
|
||||
* Disabling this lint because I don't want to fix it in this imported code
|
||||
*/
|
||||
#![allow(clippy::clone_double_ref)]
|
||||
/**
|
||||
* This module is from the crate json_value_merge
|
||||
* <https://github.com/jmfiaschi/json_value_merge/>
|
||||
*
|
||||
* It is licensed under the MIT license
|
||||
*/
|
||||
extern crate serde_json;
|
||||
|
||||
use serde_json::{Map, Value};
|
||||
|
||||
/// Trait used to merge Json Values
|
||||
|
@ -85,8 +87,9 @@ pub fn merge(a: &mut Value, b: &Value) {
|
|||
}
|
||||
}
|
||||
|
||||
fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) -> () {
|
||||
let mut fields: Vec<&str> = json_pointer.split("/").skip(1).collect();
|
||||
fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) {
|
||||
let mut fields: Vec<&str> = json_pointer.split('/').skip(1).collect();
|
||||
//let first_field = fields[0].clone();
|
||||
let first_field = fields[0].clone();
|
||||
fields.remove(0);
|
||||
let next_fields = fields;
|
||||
|
@ -100,7 +103,7 @@ fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) -
|
|||
match json_value.pointer_mut(format!("/{}", first_field).as_str()) {
|
||||
// Find the field and the json_value_targeted.
|
||||
Some(json_targeted) => {
|
||||
if 0 < next_fields.len() {
|
||||
if !next_fields.is_empty() {
|
||||
merge_in(
|
||||
json_targeted,
|
||||
format!("/{}", next_fields.join("/")).as_ref(),
|
||||
|
|
29
src/rules.rs
29
src/rules.rs
|
@ -19,31 +19,30 @@ pub async fn test_rules(file_name: &str, settings: Arc<Settings>) -> Result<()>
|
|||
while let Some(line) = lines.next().await {
|
||||
let line = line?;
|
||||
debug!("Testing the line: {}", line);
|
||||
number = number + 1;
|
||||
number += 1;
|
||||
let mut matches: Vec<&str> = vec![];
|
||||
|
||||
for rule in settings.rules.iter() {
|
||||
match rule.field {
|
||||
Field::Msg => {
|
||||
if rule.jmespath.len() > 0 {
|
||||
let expr = jmespath::compile(&rule.jmespath).unwrap();
|
||||
if let Ok(data) = jmespath::Variable::from_json(&line) {
|
||||
// Search the data with the compiled expression
|
||||
if let Ok(result) = expr.search(data) {
|
||||
if !result.is_null() {
|
||||
matches.push(&rule.jmespath);
|
||||
}
|
||||
if let Field::Msg = rule.field {
|
||||
if !rule.jmespath.is_empty() {
|
||||
let expr = jmespath::compile(&rule.jmespath).unwrap();
|
||||
if let Ok(data) = jmespath::Variable::from_json(&line) {
|
||||
// Search the data with the compiled expression
|
||||
if let Ok(result) = expr.search(data) {
|
||||
if !result.is_null() {
|
||||
matches.push(&rule.jmespath);
|
||||
}
|
||||
}
|
||||
} else if let Some(_captures) = rule.regex.captures(&line) {
|
||||
matches.push(&rule.regex.as_str());
|
||||
}
|
||||
} else if let Some(regex) = &rule.regex {
|
||||
if let Some(_captures) = regex.captures(&line) {
|
||||
matches.push(®ex.as_str());
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if matches.len() > 0 {
|
||||
if !matches.is_empty() {
|
||||
println!("Line {} matches on:", number);
|
||||
for m in matches.iter() {
|
||||
println!("\t - {}", m);
|
||||
|
|
|
@ -18,12 +18,8 @@ use dipstick::*;
|
|||
use log::*;
|
||||
use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
|
||||
use rustls::{
|
||||
Certificate,
|
||||
AllowAnyAnonymousOrAuthenticatedClient,
|
||||
NoClientAuth,
|
||||
PrivateKey,
|
||||
RootCertStore,
|
||||
ServerConfig
|
||||
AllowAnyAnonymousOrAuthenticatedClient, Certificate, NoClientAuth, PrivateKey, RootCertStore,
|
||||
ServerConfig,
|
||||
};
|
||||
use std::path::Path;
|
||||
|
||||
|
@ -44,14 +40,14 @@ fn load_keys(path: &Path) -> io::Result<Vec<PrivateKey>> {
|
|||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
|
||||
|
||||
if let Ok(keys) = result {
|
||||
if keys.len() == 0 {
|
||||
if keys.is_empty() {
|
||||
debug!("Failed to load key as RSA, trying PKCS8");
|
||||
return pkcs8_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?))
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
|
||||
}
|
||||
return Ok(keys);
|
||||
}
|
||||
return result;
|
||||
result
|
||||
}
|
||||
|
||||
/// Configure the server using rusttls
|
||||
|
@ -64,18 +60,22 @@ fn load_tls_config(settings: &Settings) -> io::Result<ServerConfig> {
|
|||
let certs = load_certs(cert.as_path())?;
|
||||
let mut keys = load_keys(key.as_path())?;
|
||||
|
||||
if keys.len() <= 0 {
|
||||
if keys.is_empty() {
|
||||
panic!("TLS key could not be properly loaded! This is fatal!");
|
||||
}
|
||||
|
||||
let mut verifier = NoClientAuth::new();
|
||||
|
||||
if ca.is_some() {
|
||||
let verifier = if ca.is_some() {
|
||||
let ca_path = ca.as_ref().unwrap();
|
||||
let mut store = RootCertStore::empty();
|
||||
store.add_pem_file(&mut std::io::BufReader::new(std::fs::File::open(ca_path.as_path())?));
|
||||
verifier = AllowAnyAnonymousOrAuthenticatedClient::new(store);
|
||||
}
|
||||
if let Err(e) = store.add_pem_file(&mut std::io::BufReader::new(
|
||||
std::fs::File::open(ca_path.as_path())?,
|
||||
)) {
|
||||
error!("Failed to add the CA properly, certificate verification may not work as expected: {:?}", e);
|
||||
}
|
||||
AllowAnyAnonymousOrAuthenticatedClient::new(store)
|
||||
} else {
|
||||
NoClientAuth::new()
|
||||
};
|
||||
|
||||
// we don't use client authentication
|
||||
let mut config = ServerConfig::new(verifier);
|
||||
|
@ -138,7 +138,7 @@ pub async fn accept_loop(
|
|||
|
||||
loop {
|
||||
if let Ok(count) = conn_rx.recv() {
|
||||
connections = connections + count;
|
||||
connections += count;
|
||||
debug!("Connection count now {}", connections);
|
||||
counter.value(connections);
|
||||
}
|
||||
|
@ -163,7 +163,9 @@ pub async fn accept_loop(
|
|||
let ctx = conn_tx.clone();
|
||||
|
||||
task::spawn(async move {
|
||||
handle_connection(&acceptor, &mut stream, state).await;
|
||||
if let Err(e) = handle_connection(&acceptor, &mut stream, state).await {
|
||||
error!("Failed to handle the connection properly: {:?}", e);
|
||||
}
|
||||
ctx.send(-1).unwrap();
|
||||
});
|
||||
}
|
||||
|
@ -186,7 +188,9 @@ async fn handle_connection(
|
|||
let tls_stream = handshake.await?;
|
||||
let reader = BufReader::new(tls_stream);
|
||||
|
||||
read_logs(reader, state).await;
|
||||
if let Err(e) = read_logs(reader, state).await {
|
||||
error!("Failed to read logs properly: {:?}", e);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,6 @@
|
|||
*/
|
||||
use async_std::path::Path;
|
||||
use log::*;
|
||||
use regex;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
@ -41,7 +40,7 @@ fn load_configuration(file: &str) -> config::Config {
|
|||
let _port: u64 = conf
|
||||
.get("global.listen.port")
|
||||
.expect("Configuration had no `global.listen.port` setting");
|
||||
return conf;
|
||||
conf
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
|
@ -68,8 +67,8 @@ pub enum Action {
|
|||
pub struct Rule {
|
||||
pub field: Field,
|
||||
pub actions: Vec<Action>,
|
||||
#[serde(with = "serde_regex", default = "empty_regex")]
|
||||
pub regex: regex::Regex,
|
||||
#[serde(with = "serde_regex", default = "default_none")]
|
||||
pub regex: Option<regex::Regex>,
|
||||
#[serde(default = "empty_str")]
|
||||
pub jmespath: String,
|
||||
}
|
||||
|
@ -135,18 +134,11 @@ pub struct Settings {
|
|||
* Default functions
|
||||
*/
|
||||
|
||||
/**
|
||||
* Return an empty regular expression
|
||||
*/
|
||||
fn empty_regex() -> regex::Regex {
|
||||
return regex::Regex::new("").unwrap();
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate an return an empty string
|
||||
*/
|
||||
fn empty_str() -> String {
|
||||
return String::new();
|
||||
String::new()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,10 +152,19 @@ fn kafka_timeout_default() -> Duration {
|
|||
Duration::from_secs(30)
|
||||
}
|
||||
|
||||
fn default_none<T>() -> Option<T> {
|
||||
None
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_load_example_config() {
|
||||
load("hotdog.yml");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_tls() {
|
||||
assert_eq!(TlsType::None, TlsType::default());
|
||||
|
|
Loading…
Reference in New Issue