dipstick/src/statsd.rs

168 lines
4.4 KiB
Rust
Raw Normal View History

2017-09-01 07:31:25 +00:00
//! Send metrics to a statsd server.
2017-09-28 22:17:16 +00:00
use core::*;
2018-04-06 21:11:50 +00:00
use output::*;
2017-09-28 22:17:16 +00:00
use error;
use self_metrics::*;
use std::net::UdpSocket;
2017-10-10 20:44:27 +00:00
use std::sync::{Arc, RwLock};
pub use std::net::ToSocketAddrs;
// Self-metrics describing this module's own activity, declared under the
// "statsd" prefix of DIPSTICK_METRICS.
metrics! {
2018-05-29 02:58:58 +00:00
<Aggregate> DIPSTICK_METRICS.with_prefix("statsd") => {
2018-04-07 03:47:43 +00:00
// Marked each time sending a packet to statsd fails.
Marker SEND_ERR: "send_failed";
// Counts the bytes reported as sent by each successful socket send.
Counter SENT_BYTES: "sent_bytes";
2018-03-13 13:54:12 +00:00
}
2018-03-08 22:22:35 +00:00
}
2018-01-15 17:20:11 +00:00
/// Send metrics to a statsd server at the address and port provided.
2018-04-06 21:11:50 +00:00
pub fn to_statsd<ADDR>(address: ADDR) -> error::Result<MetricOutput<Statsd>>
2017-10-10 20:44:27 +00:00
where
ADDR: ToSocketAddrs,
{
let socket = Arc::new(UdpSocket::bind("0.0.0.0:0")?);
2017-09-22 21:39:57 +00:00
socket.set_nonblocking(true)?;
socket.connect(address)?;
2018-03-26 16:57:40 +00:00
// TODO buffering toggle
let buffered = false;
2018-04-06 21:11:50 +00:00
Ok(metric_output(
2018-05-20 10:43:55 +00:00
move |namespace, kind, rate| {
let mut prefix = namespace.join(".");
prefix.push(':');
let mut suffix = String::with_capacity(16);
suffix.push('|');
suffix.push_str(match kind {
Kind::Marker | Kind::Counter => "c",
Kind::Gauge => "g",
Kind::Timer => "ms",
});
if rate < FULL_SAMPLING_RATE {
suffix.push_str("|@");
suffix.push_str(&rate.to_string());
}
let scale = match kind {
// timers are in µs, statsd wants ms
Kind::Timer => 1000,
_ => 1,
};
Statsd {
prefix,
suffix,
scale,
}
},
2018-03-26 16:57:40 +00:00
move || {
let buf = RwLock::new(ScopeBuffer {
buffer: String::with_capacity(MAX_UDP_PAYLOAD),
socket: socket.clone(),
2018-01-08 05:32:32 +00:00
buffered,
});
2018-04-06 21:11:50 +00:00
command_fn(move |cmd| {
if let Ok(mut buf) = buf.write() {
match cmd {
2018-04-06 21:11:50 +00:00
Command::Write(metric, value) => buf.write(metric, value),
Command::Flush => buf.flush(),
}
}
})
},
))
}
/// Key of a statsd metric.
///
/// Holds the pre-rendered text that surrounds the value in a statsd line, so
/// that writing a sample only has to format the value itself.
2017-10-05 14:52:59 +00:00
#[derive(Debug, Clone)]
pub struct Statsd {
/// Text placed before the value: the namespace joined with '.', then ':'.
prefix: String,
/// Text placed after the value: '|' plus the type tag ("c", "g" or "ms"),
/// followed by "|@<rate>" when the metric is sampled.
suffix: String,
/// Divisor applied to each value before it is written (1000 for timers, 1 otherwise).
scale: u64,
2017-06-27 10:04:02 +00:00
}
2017-07-19 20:46:04 +00:00
/// Use a safe maximum size for UDP to prevent fragmentation.
///
/// Used as the initial capacity of each scope's text buffer.
2017-12-06 22:05:47 +00:00
// TODO make configurable?
// NOTE(review): 576 looks like the minimum datagram size every IPv4 host must
// accept (RFC 791), which would include the IP and UDP headers — confirm.
2017-07-19 20:46:04 +00:00
const MAX_UDP_PAYLOAD: usize = 576;
2017-09-28 22:17:16 +00:00
/// Wrapped string buffer & socket as one.
2017-10-04 15:58:08 +00:00
#[derive(Debug)]
2017-09-23 12:18:55 +00:00
struct ScopeBuffer {
/// Pending statsd entries, separated by '\n', that have not been sent yet.
buffer: String,
2017-09-23 12:18:55 +00:00
/// UDP socket the buffer contents are sent through; shared via `Arc`.
socket: Arc<UdpSocket>,
2018-01-08 05:32:32 +00:00
/// Buffering toggle consulted by `write` after each entry; `to_statsd`
/// currently hard-codes it to `false`.
buffered: bool,
2017-07-19 20:46:04 +00:00
}
2017-09-28 22:17:16 +00:00
/// Any remaining buffered data is flushed on Drop.
2017-09-23 12:18:55 +00:00
impl Drop for ScopeBuffer {
2017-09-22 21:39:57 +00:00
fn drop(&mut self) {
self.flush()
2017-07-22 02:51:49 +00:00
}
}
2017-10-05 14:52:59 +00:00
impl ScopeBuffer {
2018-01-08 05:32:32 +00:00
fn write(&mut self, metric: &Statsd, value: Value) {
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len();
if entry_len > self.buffer.capacity() {
// TODO report entry too big to fit in buffer (!?)
return;
}
let remaining = self.buffer.capacity() - self.buffer.len();
if entry_len + 1 > remaining {
// buffer is full, flush before appending
self.flush();
} else {
if !self.buffer.is_empty() {
// separate from previous entry
self.buffer.push('\n')
}
self.buffer.push_str(&metric.prefix);
self.buffer.push_str(&value_str);
self.buffer.push_str(&metric.suffix);
}
2018-01-08 05:32:32 +00:00
if self.buffered {
2017-12-06 22:05:47 +00:00
self.flush();
}
}
2017-09-22 21:39:57 +00:00
fn flush(&mut self) {
if !self.buffer.is_empty() {
match self.socket.send(self.buffer.as_bytes()) {
Ok(size) => {
SENT_BYTES.count(size);
trace!("Sent {} bytes to statsd", self.buffer.len());
}
Err(e) => {
SEND_ERR.mark();
debug!("Failed to send packet to statsd: {}", e);
}
};
self.buffer.clear();
}
2017-07-22 02:51:49 +00:00
}
2017-06-27 10:04:02 +00:00
}
#[cfg(feature = "bench")]
mod bench {
    use super::*;
    use test;

    /// Benchmarks writing a single timer value through a statsd scope.
    #[bench]
    pub fn timer_statsd(b: &mut test::Bencher) {
        let scope = to_statsd("localhost:8125").unwrap().open_scope();
        let metric = scope.define_metric(&"timer".into(), Kind::Timer, 1_000_000.0);
        b.iter(|| test::black_box(scope.write(&metric, 2000)));
    }
}