cargo fmt

parent 52d8bd57a7
commit 3bb74ecfd4
src/connection.rs

@@ -9,7 +9,12 @@ use crate::status::{Statistic, Stats};
  * The connection module is responsible for handling everything pertaining to a single inbound TCP
  * connection.
  */
-use async_std::{io::BufReader, prelude::*, sync::{Arc, Sender}, task};
+use async_std::{
+    io::BufReader,
+    prelude::*,
+    sync::{Arc, Sender},
+    task,
+};
 use chrono::prelude::*;
 use handlebars::Handlebars;
 use log::*;
@@ -88,8 +93,7 @@ impl Connection {
            let parsed = parse::parse_line(line);

            if let Err(e) = &parsed {
-                self.stats
-                    .send((Stats::LogParseError, 1)).await;
+                self.stats.send((Stats::LogParseError, 1)).await;
                error!("failed to parse message: {:?}", e);
                continue;
            }
@@ -97,8 +101,7 @@ impl Connection {
             * Now that we've logged the error, let's unpack and bubble the error anyways
             */
            let msg = parsed.unwrap();
-            self.stats
-                .send((Stats::LineReceived, 1)).await;
+            self.stats.send((Stats::LineReceived, 1)).await;
            let mut continue_rules = true;
            debug!("parsed as: {}", msg.msg);

@@ -190,8 +193,7 @@ impl Connection {
                    continue_rules = false;
                } else {
                    error!("Failed to process the configured topic: `{}`", topic);
-                    self.stats
-                        .send((Stats::TopicParseFailed, 1)).await;
+                    self.stats.send((Stats::TopicParseFailed, 1)).await;
                }
                break;
            }
@@ -310,9 +312,7 @@ fn perform_merge(buffer: &str, template_id: &str, state: &RuleState) -> Result<S
     */
    if !to_merge.is_object() {
        error!("Merge requested was not a JSON object: {}", to_merge);
-        state
-            .stats
-            .send((Stats::MergeTargetNotJsonError, 1));
+        state.stats.send((Stats::MergeTargetNotJsonError, 1));
        return Ok(buffer.to_string());
    }

@@ -325,9 +325,7 @@ fn perform_merge(buffer: &str, template_id: &str, state: &RuleState) -> Result<S
        Err("Failed to merge and serialize".to_string())
    } else {
        error!("Failed to parse as JSON, stopping actions: {}", buffer);
-        state
-            .stats
-            .send((Stats::MergeInvalidJsonError, 1));
+        state.stats.send((Stats::MergeInvalidJsonError, 1));
        Err("Not JSON".to_string())
    }
}
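The repeated `stats.send((Stats::X, 1)).await` calls above all feed one metrics channel. A minimal sketch of that pattern, assuming the async_std 1.x bounded channel() API (unstable feature) that the kafka.rs hunk below imports; the Stats variants and the i64 counter type here are stand-ins for illustration:

    use async_std::prelude::*;
    use async_std::sync::channel;
    use async_std::task;

    #[derive(Debug)]
    enum Stats {
        LineReceived,
        LogParseError,
    }

    fn main() {
        // Bounded channel: send() awaits when full, giving natural backpressure
        let (stats, mut receiver) = channel::<(Stats, i64)>(1_024);

        // Components clone the Sender and emit (metric, count) tuples as they work
        let tx = stats.clone();
        task::spawn(async move {
            tx.send((Stats::LineReceived, 1)).await;
        });

        // A single receiver drains the channel and forwards to the metrics sink
        task::block_on(async move {
            while let Some((stat, count)) = receiver.next().await {
                println!("{:?} += {}", stat, count);
                break; // demo only: stop after the first metric
            }
        });
    }

Note that the two perform_merge call sites above send without .await; since Rust futures are lazy, those counter sends do nothing unless awaited, whereas the sketch always awaits.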
src/kafka.rs (53 lines changed)
@@ -3,7 +3,7 @@ use crate::status::{Statistic, Stats};
 * The Kafka module contains all the tooling/code necessary for connecting hotdog to Kafka for
 * sending log lines along as Kafka messages
 */
-use async_std::{sync::{channel, Receiver, Sender}};
+use async_std::sync::{channel, Receiver, Sender};
 use log::*;
 use rdkafka::client::DefaultClientContext;
 use rdkafka::config::ClientConfig;
@@ -140,41 +140,54 @@ impl Kafka {

                let record = FutureRecord::<String, String>::to(&kmsg.topic).payload(&kmsg.msg);
                /*
-                * Intentionally setting the timeout_ms to -1 here so this blocks forever if the
-                * outbound librdkafka queue is full. This will block up the crossbeam channel
-                * properly and cause messages to begin to be dropped, rather than buffering
-                * "forever" inside of hotdog
-                */
+                 * Intentionally setting the timeout_ms to -1 here so this blocks forever if the
+                 * outbound librdkafka queue is full. This will block up the crossbeam channel
+                 * properly and cause messages to begin to be dropped, rather than buffering
+                 * "forever" inside of hotdog
+                 */
                if let Ok(delivery_result) = producer.send(record, -1 as i64).await {
                    match delivery_result {
                        Ok(_) => {
-                            stats.send((Stats::KafkaMsgSubmitted { topic: kmsg.topic }, 1)).await;
+                            stats
+                                .send((Stats::KafkaMsgSubmitted { topic: kmsg.topic }, 1))
+                                .await;
                            /*
-                            * dipstick only supports u64 timers anyways, but as_micros() can
-                            * give a u128 (!).
-                            */
+                             * dipstick only supports u64 timers anyways, but as_micros() can
+                             * give a u128 (!).
+                             */
                            if let Ok(elapsed) = start_time.elapsed().as_micros().try_into() {
                                stats.send((Stats::KafkaMsgSent, elapsed)).await;
                            } else {
                                error!("Could not collect message time because the duration couldn't fit in an i64, yikes");
                            }
-                        },
+                        }
                        Err((err, _)) => {
                            match err {
                                /*
-                                * err_type will be one of RdKafkaError types defined:
-                                * https://docs.rs/rdkafka/0.23.1/rdkafka/error/enum.RDKafkaError.html
-                                */
+                                 * err_type will be one of RdKafkaError types defined:
+                                 * https://docs.rs/rdkafka/0.23.1/rdkafka/error/enum.RDKafkaError.html
+                                 */
                                KafkaError::MessageProduction(err_type) => {
-                                    error!(
-                                        "Failed to send message to Kafka due to: {}",
-                                        err_type
-                                    );
-                                    stats.send((Stats::KafkaMsgErrored { errcode: metric_name_for(err_type) }, 1)).await;
+                                    error!("Failed to send message to Kafka due to: {}", err_type);
+                                    stats
+                                        .send((
+                                            Stats::KafkaMsgErrored {
+                                                errcode: metric_name_for(err_type),
+                                            },
+                                            1,
+                                        ))
+                                        .await;
                                }
                                _ => {
                                    error!("Failed to send message to Kafka!");
-                                    stats.send((Stats::KafkaMsgErrored { errcode: String::from("generic") }, 1)).await;
+                                    stats
+                                        .send((
+                                            Stats::KafkaMsgErrored {
+                                                errcode: String::from("generic"),
+                                            },
+                                            1,
+                                        ))
+                                        .await;
                                }
                            }
                        }
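Two details in the hunk above are easy to miss: passing -1 as the block time makes a full librdkafka queue stall the producer task (which is what pushes backpressure upstream), and the elapsed-time conversion is fallible because as_micros() returns u128. A compact sketch of that send path, assuming the rdkafka 0.23 FutureProducer::send(record, block_ms) signature used in the diff; the helper name and return type are hypothetical:

    use std::convert::TryInto;
    use std::time::Instant;

    use rdkafka::error::KafkaError;
    use rdkafka::producer::{FutureProducer, FutureRecord};

    // Hypothetical helper mirroring the send path above
    async fn send_with_backpressure(
        producer: &FutureProducer,
        topic: &str,
        msg: &str,
    ) -> Result<i64, KafkaError> {
        let start_time = Instant::now();
        let payload = msg.to_string();
        let record = FutureRecord::<String, String>::to(topic).payload(&payload);

        // block_ms = -1: when librdkafka's outbound queue is full this call
        // waits instead of failing, so backpressure reaches our own channel
        match producer.send(record, -1i64).await {
            Ok(Ok((_partition, _offset))) => {
                // as_micros() returns u128, but the timer sink wants a smaller
                // integer, hence the fallible conversion before recording
                let elapsed: i64 = start_time
                    .elapsed()
                    .as_micros()
                    .try_into()
                    .unwrap_or(i64::MAX);
                Ok(elapsed)
            }
            // Delivery failed: the original message rides along in the error
            Ok(Err((err, _message))) => Err(err),
            // The delivery future itself was canceled before resolving
            Err(_canceled) => Err(KafkaError::Canceled),
        }
    }

Stalling the producer task this way is a deliberate design choice: it lets the bounded channel feeding it start dropping messages instead of buffering without limit inside hotdog.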
src/serve.rs (12 lines changed)
@@ -6,7 +6,13 @@ use crate::status;
 /**
  * The serve module is responsible for general syslog over TCP serving functionality
  */
-use async_std::{io::BufReader, net::*, prelude::*, sync::{Arc, Sender}, task};
+use async_std::{
+    io::BufReader,
+    net::*,
+    prelude::*,
+    sync::{Arc, Sender},
+    task,
+};
 use async_trait::async_trait;
 use log::*;

@@ -107,9 +113,7 @@ pub trait Server {
            let stream = stream?;
            debug!("Accepting from: {}", stream.peer_addr()?);

-            state
-                .stats
-                .send((status::Stats::ConnectionCount, 1)).await;
+            state.stats.send((status::Stats::ConnectionCount, 1)).await;

            let connection =
                Connection::new(state.settings.clone(), sender.clone(), state.stats.clone());
@@ -6,7 +6,13 @@ use crate::status;
 /**
  * This module handles the necessary configuration to serve over TLS
  */
-use async_std::{io, io::BufReader, net::TcpStream, sync::{Arc, Sender}, task};
+use async_std::{
+    io,
+    io::BufReader,
+    net::TcpStream,
+    sync::{Arc, Sender},
+    task,
+};
 use async_tls::TlsAcceptor;
 use log::*;
 use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
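The pemfile imports above are the pre-0.19 rustls API for reading PEM-encoded certificates and keys. A minimal sketch of how a server-side TLS config is typically assembled from them, assuming that same rustls API; the load_tls_config helper, its paths, and its error handling are hypothetical, not hotdog's actual code:

    use std::fs::File;
    use std::io::{BufReader, Error, ErrorKind};

    use rustls::internal::pemfile::{certs, pkcs8_private_keys};
    use rustls::{NoClientAuth, ServerConfig};

    // Hypothetical helper: builds a ServerConfig from PEM cert/key files
    fn load_tls_config(cert_path: &str, key_path: &str) -> Result<ServerConfig, Error> {
        let mut cert_reader = BufReader::new(File::open(cert_path)?);
        let mut key_reader = BufReader::new(File::open(key_path)?);

        // The pemfile parsers return Err(()) when the PEM cannot be decoded
        let cert_chain = certs(&mut cert_reader)
            .map_err(|_| Error::new(ErrorKind::InvalidInput, "bad certificate"))?;
        let mut keys = pkcs8_private_keys(&mut key_reader)
            .map_err(|_| Error::new(ErrorKind::InvalidInput, "bad private key"))?;
        if keys.is_empty() {
            return Err(Error::new(ErrorKind::InvalidInput, "no PKCS#8 keys found"));
        }

        let mut config = ServerConfig::new(NoClientAuth::new());
        config
            .set_single_cert(cert_chain, keys.remove(0))
            .map_err(|e| Error::new(ErrorKind::InvalidInput, e))?;
        Ok(config)
    }

A config built this way would then back the async_tls TlsAcceptor imported above, wrapping each accepted TcpStream before it is handed to a Connection.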