2017-09-01 07:31:25 +00:00
|
|
|
//! Send metrics to a statsd server.
|
|
|
|
|
2018-06-19 19:27:26 +00:00
|
|
|
use core::{Input, Output, Value, Metric, Attributes, WithAttributes, Kind,
|
2018-06-22 19:32:07 +00:00
|
|
|
Name, WithSamplingRate, WithName, WithBuffering, Sampling, Cache, Async};
|
2018-06-14 16:37:21 +00:00
|
|
|
use pcg32;
|
2017-09-28 22:17:16 +00:00
|
|
|
use error;
|
2018-06-22 19:32:07 +00:00
|
|
|
use metrics;
|
2017-09-20 17:13:44 +00:00
|
|
|
|
|
|
|
use std::net::UdpSocket;
|
2017-10-10 20:44:27 +00:00
|
|
|
use std::sync::{Arc, RwLock};
|
2017-12-11 21:39:17 +00:00
|
|
|
|
2017-09-20 17:13:44 +00:00
|
|
|
pub use std::net::ToSocketAddrs;
|
|
|
|
|
|
|
|
/// Send metrics to a statsd server at the address and port provided.
|
2018-06-21 15:52:10 +00:00
|
|
|
pub fn output_statsd<ADDR: ToSocketAddrs>(address: ADDR) -> error::Result<StatsdOutput> {
|
2017-12-11 21:39:17 +00:00
|
|
|
let socket = Arc::new(UdpSocket::bind("0.0.0.0:0")?);
|
2017-09-22 21:39:57 +00:00
|
|
|
socket.set_nonblocking(true)?;
|
|
|
|
socket.connect(address)?;
|
|
|
|
|
2018-06-14 16:37:21 +00:00
|
|
|
Ok(StatsdOutput {
|
|
|
|
attributes: Attributes::default(),
|
|
|
|
socket,
|
|
|
|
})
|
|
|
|
}
|
2017-12-11 21:39:17 +00:00
|
|
|
|
2018-06-20 19:04:04 +00:00
|
|
|
/// Statsd output holds a UDP client socket to a statsd host.
|
|
|
|
/// The output's connection is shared between all inputs originating from it.
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
pub struct StatsdOutput {
|
|
|
|
attributes: Attributes,
|
|
|
|
socket: Arc<UdpSocket>,
|
|
|
|
}
|
|
|
|
|
2018-06-15 21:33:28 +00:00
|
|
|
impl Output for StatsdOutput {
|
2018-06-22 19:32:07 +00:00
|
|
|
type INPUT = Statsd;
|
2018-06-16 23:11:29 +00:00
|
|
|
fn new_input(&self) -> Self::INPUT {
|
2018-06-22 19:32:07 +00:00
|
|
|
Statsd {
|
2018-06-14 16:37:21 +00:00
|
|
|
attributes: self.attributes.clone(),
|
|
|
|
buffer: Arc::new(RwLock::new(InputBuffer {
|
2017-12-11 21:39:17 +00:00
|
|
|
buffer: String::with_capacity(MAX_UDP_PAYLOAD),
|
2018-06-14 16:37:21 +00:00
|
|
|
socket: self.socket.clone(),
|
|
|
|
buffering: self.is_buffering(),
|
|
|
|
})),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl WithAttributes for StatsdOutput {
|
|
|
|
fn get_attributes(&self) -> &Attributes {
|
|
|
|
&self.attributes
|
|
|
|
}
|
|
|
|
fn mut_attributes(&mut self) -> &mut Attributes {
|
|
|
|
&mut self.attributes
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-20 14:04:14 +00:00
|
|
|
impl WithBuffering for StatsdOutput {}
|
|
|
|
impl WithSamplingRate for StatsdOutput {}
|
|
|
|
|
|
|
|
impl Cache for StatsdOutput {}
|
|
|
|
impl Async for StatsdOutput {}
|
|
|
|
|
2018-06-14 20:08:54 +00:00
|
|
|
/// Metrics input for statsd.
|
2018-06-14 16:37:21 +00:00
|
|
|
#[derive(Clone)]
|
2018-06-22 19:32:07 +00:00
|
|
|
pub struct Statsd {
|
2018-06-14 16:37:21 +00:00
|
|
|
attributes: Attributes,
|
|
|
|
buffer: Arc<RwLock<InputBuffer>>,
|
|
|
|
}
|
|
|
|
|
2018-06-22 19:32:07 +00:00
|
|
|
impl Input for Statsd {
|
2018-06-19 19:27:26 +00:00
|
|
|
fn new_metric(&self, name: Name, kind: Kind) -> Metric {
|
2018-06-14 16:37:21 +00:00
|
|
|
let mut prefix = self.qualified_name(name).join(".");
|
|
|
|
prefix.push(':');
|
|
|
|
|
|
|
|
let mut suffix = String::with_capacity(16);
|
|
|
|
suffix.push('|');
|
|
|
|
suffix.push_str(match kind {
|
|
|
|
Kind::Marker | Kind::Counter => "c",
|
|
|
|
Kind::Gauge => "g",
|
|
|
|
Kind::Timer => "ms",
|
|
|
|
});
|
|
|
|
|
|
|
|
let buffer = self.buffer.clone();
|
|
|
|
let scale = match kind {
|
|
|
|
// timers are in µs, statsd wants ms
|
|
|
|
Kind::Timer => 1000,
|
|
|
|
_ => 1,
|
|
|
|
};
|
|
|
|
|
|
|
|
if let Sampling::SampleRate(float_rate) = self.get_sampling() {
|
|
|
|
suffix.push_str(&format!{"|@{}", float_rate});
|
|
|
|
let int_sampling_rate = pcg32::to_int_rate(float_rate);
|
|
|
|
|
2018-06-19 19:27:26 +00:00
|
|
|
Metric::new(move |value| {
|
2018-06-14 16:37:21 +00:00
|
|
|
if pcg32::accept_sample(int_sampling_rate) {
|
|
|
|
let mut buffer = buffer.write().expect("InputBuffer");
|
|
|
|
buffer.write(&prefix, &suffix, scale, value)
|
2017-12-11 21:39:17 +00:00
|
|
|
}
|
|
|
|
})
|
2018-06-14 16:37:21 +00:00
|
|
|
} else {
|
2018-06-19 19:27:26 +00:00
|
|
|
Metric::new(move |value| {
|
2018-06-14 16:37:21 +00:00
|
|
|
let mut buffer = buffer.write().expect("InputBuffer");
|
|
|
|
buffer.write(&prefix, &suffix, scale, value)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
2017-09-20 17:13:44 +00:00
|
|
|
|
2018-06-14 16:37:21 +00:00
|
|
|
fn flush(&self) -> error::Result<()> {
|
|
|
|
let mut buffer = self.buffer.write().expect("InputBuffer");
|
2018-06-14 20:08:54 +00:00
|
|
|
Ok(buffer.flush()?)
|
2018-06-14 16:37:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-22 19:32:07 +00:00
|
|
|
impl WithSamplingRate for Statsd {}
|
2018-06-14 16:37:21 +00:00
|
|
|
|
2018-06-22 19:32:07 +00:00
|
|
|
impl WithAttributes for Statsd {
|
2018-06-14 16:37:21 +00:00
|
|
|
fn get_attributes(&self) -> &Attributes { &self.attributes }
|
|
|
|
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
|
2017-06-27 10:04:02 +00:00
|
|
|
}
|
|
|
|
|
2017-07-19 20:46:04 +00:00
|
|
|
/// Size in bytes preallocated for each input's send buffer.
///
/// Chosen as a safe maximum size for a UDP datagram, to prevent fragmentation.
// TODO make configurable?
const MAX_UDP_PAYLOAD: usize = 576;
|
|
|
|
|
2017-09-28 22:17:16 +00:00
|
|
|
/// A string buffer of pending statsd entries bundled with the socket they
/// are sent through.
#[derive(Debug)]
struct InputBuffer {
    /// Newline-separated entries waiting to be sent.
    buffer: String,
    /// Connected UDP socket the buffer is flushed to.
    socket: Arc<UdpSocket>,
    /// Buffering flag taken from the output when the input was created.
    buffering: bool,
}
|
|
|
|
|
2017-09-28 22:17:16 +00:00
|
|
|
/// Any remaining buffered data is flushed on Drop.
|
2018-06-14 16:37:21 +00:00
|
|
|
impl Drop for InputBuffer {
|
2017-09-22 21:39:57 +00:00
|
|
|
fn drop(&mut self) {
|
2018-06-14 16:37:21 +00:00
|
|
|
if let Err(err) = self.flush() {
|
|
|
|
warn!("Couldn't flush statsd buffer on Drop: {}", err)
|
|
|
|
}
|
2017-07-22 02:51:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-14 16:37:21 +00:00
|
|
|
impl InputBuffer {
|
|
|
|
fn write(&mut self, prefix: &str, suffix: &str, scale: u64, value: Value) {
|
|
|
|
let scaled_value = value / scale;
|
2017-12-06 15:56:20 +00:00
|
|
|
let value_str = scaled_value.to_string();
|
2018-06-14 16:37:21 +00:00
|
|
|
let entry_len = prefix.len() + value_str.len() + suffix.len();
|
2017-12-06 15:56:20 +00:00
|
|
|
|
|
|
|
if entry_len > self.buffer.capacity() {
|
|
|
|
// TODO report entry too big to fit in buffer (!?)
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
let remaining = self.buffer.capacity() - self.buffer.len();
|
|
|
|
if entry_len + 1 > remaining {
|
|
|
|
// buffer is full, flush before appending
|
2018-06-14 20:08:54 +00:00
|
|
|
let _ = self.flush();
|
2017-12-06 15:56:20 +00:00
|
|
|
} else {
|
|
|
|
if !self.buffer.is_empty() {
|
|
|
|
// separate from previous entry
|
|
|
|
self.buffer.push('\n')
|
|
|
|
}
|
2018-06-14 16:37:21 +00:00
|
|
|
self.buffer.push_str(prefix);
|
2017-12-06 15:56:20 +00:00
|
|
|
self.buffer.push_str(&value_str);
|
2018-06-14 16:37:21 +00:00
|
|
|
self.buffer.push_str(suffix);
|
2017-12-06 15:56:20 +00:00
|
|
|
}
|
2018-06-14 16:37:21 +00:00
|
|
|
if self.buffering {
|
2018-06-14 20:08:54 +00:00
|
|
|
let _ = self.flush();
|
2017-12-06 22:05:47 +00:00
|
|
|
}
|
2017-12-06 15:56:20 +00:00
|
|
|
}
|
|
|
|
|
2018-06-14 16:37:21 +00:00
|
|
|
fn flush(&mut self) -> error::Result<()> {
|
2017-11-29 13:42:01 +00:00
|
|
|
if !self.buffer.is_empty() {
|
|
|
|
match self.socket.send(self.buffer.as_bytes()) {
|
|
|
|
Ok(size) => {
|
2018-06-22 19:32:07 +00:00
|
|
|
metrics::STATSD_SENT_BYTES.count(size);
|
2017-11-29 13:42:01 +00:00
|
|
|
trace!("Sent {} bytes to statsd", self.buffer.len());
|
|
|
|
}
|
|
|
|
Err(e) => {
|
2018-06-22 19:32:07 +00:00
|
|
|
metrics::STATSD_SEND_ERR.mark();
|
2018-06-14 16:37:21 +00:00
|
|
|
return Err(e.into())
|
2017-11-29 13:42:01 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
self.buffer.clear();
|
|
|
|
}
|
2018-06-14 16:37:21 +00:00
|
|
|
Ok(())
|
2017-07-22 02:51:49 +00:00
|
|
|
}
|
2017-06-27 10:04:02 +00:00
|
|
|
}
|
|
|
|
|
2017-12-06 15:56:20 +00:00
|
|
|
#[cfg(feature = "bench")]
mod bench {

    use core::*;
    use super::*;
    use test;

    /// Benchmark writing a timer value through a statsd input.
    #[bench]
    pub fn timer_statsd(b: &mut test::Bencher) {
        let input = output_statsd("localhost:8125").unwrap().new_input_dyn();
        let timer = input.new_metric("timer".into(), Kind::Timer);

        b.iter(|| test::black_box(timer.write(2000)));
    }

}
|