dipstick/src/output/statsd.rs

276 lines
7.9 KiB
Rust
Raw Normal View History

2017-09-01 07:31:25 +00:00
//! Send metrics to a statsd server.
2018-10-26 01:20:47 +00:00
use core::attributes::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Prefixed};
use core::name::MetricName;
2018-09-14 10:47:31 +00:00
use core::pcg32;
2018-10-26 01:20:47 +00:00
use core::{Flush, MetricValue};
use core::input::InputKind;
2018-09-14 10:47:31 +00:00
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
use cache::cache_out;
use queue::queue_out;
2018-06-26 16:28:38 +00:00
use std::net::ToSocketAddrs;
2018-06-26 15:46:55 +00:00
use std::sync::Arc;
2018-06-26 16:28:38 +00:00
use std::net::UdpSocket;
2018-06-26 15:46:55 +00:00
use std::rc::Rc;
2018-06-26 16:28:38 +00:00
use std::cell::{RefCell, RefMut};
2018-06-26 16:28:38 +00:00
/// Use a safe maximum size for UDP to prevent fragmentation.
///
/// This value is also the initial capacity of the send buffer allocated for
/// every scope, which in turn bounds the size of each datagram sent.
// NOTE(review): 576 bytes is the smallest IPv4 datagram every host must accept,
// IP and UDP headers included; the payload that fits in it is smaller (about
// 508 bytes). Confirm whether this constant should account for headers.
// TODO make configurable?
const MAX_UDP_PAYLOAD: usize = 576;
2018-06-26 16:28:38 +00:00
/// Statsd output holds a datagram (UDP) socket to a statsd server.
/// The socket is shared between scopes opened from the output.
#[derive(Clone, Debug)]
2018-07-03 15:44:29 +00:00
pub struct Statsd {
2018-06-20 19:04:04 +00:00
attributes: Attributes,
socket: Arc<UdpSocket>,
}
2018-07-03 15:44:29 +00:00
impl Statsd {
/// Send metrics to a statsd server at the address and port provided.
pub fn send_to<ADDR: ToSocketAddrs>(address: ADDR) -> error::Result<Statsd> {
let socket = Arc::new(UdpSocket::bind("0.0.0.0:0")?);
socket.set_nonblocking(true)?;
socket.connect(address)?;
2018-06-26 16:28:38 +00:00
2018-07-03 15:44:29 +00:00
Ok(Statsd {
attributes: Attributes::default(),
socket,
})
}
}
2018-09-11 12:27:10 +00:00
// Empty impls: `Statsd` relies entirely on the default items of these traits.
// NOTE(review): presumably these expose the buffering and sampling settings that
// scopes read through `get_buffering()` / `get_sampling()` - confirm against
// `core::attributes`.
impl Buffered for Statsd {}
impl Sampled for Statsd {}
2018-07-03 15:44:29 +00:00
2018-09-11 12:27:10 +00:00
// Empty impls: `Statsd` takes whatever `QueuedOutput` and `CachedOutput` provide by default.
// NOTE(review): presumably this lets the output be wrapped by the queue and cache
// adapters - confirm in `queue_out` / `cache_out`.
impl queue_out::QueuedOutput for Statsd {}
impl cache_out::CachedOutput for Statsd {}
2018-06-26 16:28:38 +00:00
2018-07-03 15:44:29 +00:00
impl Output for Statsd {
type SCOPE = StatsdScope;
fn new_scope(&self) -> Self::SCOPE {
2018-07-03 15:44:29 +00:00
StatsdScope {
2018-06-14 16:37:21 +00:00
attributes: self.attributes.clone(),
2018-06-26 15:46:55 +00:00
buffer: Rc::new(RefCell::new(String::with_capacity(MAX_UDP_PAYLOAD))),
socket: self.socket.clone(),
2018-06-14 16:37:21 +00:00
}
}
}
2018-07-03 15:44:29 +00:00
impl WithAttributes for Statsd {
2018-06-26 16:28:38 +00:00
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
2018-06-14 16:37:21 +00:00
}
2018-06-26 16:28:38 +00:00
/// Statsd Input
#[derive(Debug, Clone)]
2018-07-03 15:44:29 +00:00
pub struct StatsdScope {
2018-06-14 16:37:21 +00:00
attributes: Attributes,
2018-06-26 15:46:55 +00:00
buffer: Rc<RefCell<String>>,
socket: Arc<UdpSocket>,
2018-06-14 16:37:21 +00:00
}
2018-09-11 12:27:10 +00:00
// Empty impl: the scope relies entirely on the default items of `Sampled`.
// NOTE(review): presumably this is what provides the `get_sampling()` call used
// when metrics are defined - confirm against `core::attributes`.
impl Sampled for StatsdScope {}
2018-06-26 15:46:55 +00:00
2018-07-03 15:44:29 +00:00
impl OutputScope for StatsdScope {
2018-06-26 16:28:38 +00:00
/// Define a metric of the specified type.
2018-10-26 01:20:47 +00:00
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let mut prefix = self.prefix_prepend(name).join(".");
2018-06-14 16:37:21 +00:00
prefix.push(':');
let mut suffix = String::with_capacity(16);
suffix.push('|');
suffix.push_str(match kind {
2018-10-26 01:20:47 +00:00
InputKind::Marker | InputKind::Counter => "c",
2018-12-20 21:19:03 +00:00
InputKind::Gauge | InputKind::Level => "g",
2018-10-26 01:20:47 +00:00
InputKind::Timer => "ms",
2018-06-14 16:37:21 +00:00
});
let scale = match kind {
// timers are in µs, statsd wants ms
2018-10-26 01:20:47 +00:00
InputKind::Timer => 1000,
2018-06-14 16:37:21 +00:00
_ => 1,
};
2018-06-26 16:28:38 +00:00
let cloned = self.clone();
2018-06-26 15:46:55 +00:00
if let Some(Sampling::Random(float_rate)) = self.get_sampling() {
2018-06-26 16:28:38 +00:00
suffix.push_str(&format!{"|@{}\n", float_rate});
2018-06-14 16:37:21 +00:00
let int_sampling_rate = pcg32::to_int_rate(float_rate);
2018-06-26 16:28:38 +00:00
let metric = StatsdMetric { prefix, suffix, scale };
2018-06-14 16:37:21 +00:00
2018-10-11 16:32:01 +00:00
OutputMetric::new(move |value, _labels| {
2018-06-14 16:37:21 +00:00
if pcg32::accept_sample(int_sampling_rate) {
2018-06-26 16:28:38 +00:00
cloned.print(&metric, value)
}
})
2018-06-14 16:37:21 +00:00
} else {
2018-06-26 16:28:38 +00:00
suffix.push_str("\n");
let metric = StatsdMetric { prefix, suffix, scale };
2018-10-11 16:32:01 +00:00
OutputMetric::new(move |value, _labels| {
2018-06-26 16:28:38 +00:00
cloned.print(&metric, value)
2018-06-14 16:37:21 +00:00
})
}
}
2018-06-24 04:31:34 +00:00
}
2018-07-03 15:44:29 +00:00
impl Flush for StatsdScope {
2018-06-14 16:37:21 +00:00
fn flush(&self) -> error::Result<()> {
2018-06-26 16:28:38 +00:00
let buf = self.buffer.borrow_mut();
self.flush_inner(buf)
}
}
2018-07-03 15:44:29 +00:00
impl StatsdScope {
2018-10-26 01:20:47 +00:00
fn print(&self, metric: &StatsdMetric, value: MetricValue) {
2018-06-26 16:28:38 +00:00
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len();
2018-06-26 15:46:55 +00:00
let mut buffer = self.buffer.borrow_mut();
2018-06-26 16:28:38 +00:00
if entry_len > buffer.capacity() {
// TODO report entry too big to fit in buffer (!?)
return;
}
let remaining = buffer.capacity() - buffer.len();
if entry_len + 1 > remaining {
// buffer is nearly full, make room
let _ = self.flush_inner(buffer);
buffer = self.buffer.borrow_mut();
} else {
if !buffer.is_empty() {
// separate from previous entry
buffer.push('\n')
}
buffer.push_str(&metric.prefix);
buffer.push_str(&value_str);
buffer.push_str(&metric.suffix);
}
if self.get_buffering().is_none() {
2018-06-26 16:28:38 +00:00
if let Err(e) = self.flush_inner(buffer) {
debug!("Could not send to statsd {}", e)
}
}
}
fn flush_inner(&self, mut buffer: RefMut<String>) -> error::Result<()> {
if !buffer.is_empty() {
2018-06-26 15:46:55 +00:00
match self.socket.send(buffer.as_bytes()) {
Ok(size) => {
metrics::STATSD_SENT_BYTES.count(size);
trace!("Sent {} bytes to statsd", buffer.len());
}
Err(e) => {
metrics::STATSD_SEND_ERR.mark();
return Err(e.into())
}
};
buffer.clear();
}
Ok(())
2018-06-14 16:37:21 +00:00
}
}
2018-07-03 15:44:29 +00:00
impl WithAttributes for StatsdScope {
2018-06-14 16:37:21 +00:00
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
2017-06-27 10:04:02 +00:00
}
2018-09-11 12:27:10 +00:00
// Empty impl: the scope relies entirely on the default items of `Buffered`.
// NOTE(review): presumably this is what provides the `get_buffering()` call used
// when values are written - confirm against `core::attributes`.
impl Buffered for StatsdScope {}
2017-07-19 20:46:04 +00:00
2018-06-26 16:28:38 +00:00
/// Key of a statsd metric.
///
/// Holds the text that surrounds a value in an entry, computed once when the
/// metric is defined, along with the scaling applied to each value.
#[derive(Debug, Clone)]
pub struct StatsdMetric {
    /// Text written before the value.
    prefix: String,
    /// Text written after the value.
    suffix: String,
    /// Divisor applied to a value before it is formatted.
    scale: isize,
}
2017-07-19 20:46:04 +00:00
2017-09-28 22:17:16 +00:00
/// Any remaining buffered data is flushed on Drop.
2018-07-03 15:44:29 +00:00
impl Drop for StatsdScope {
2017-09-22 21:39:57 +00:00
fn drop(&mut self) {
2018-06-14 16:37:21 +00:00
if let Err(err) = self.flush() {
2018-06-26 16:28:38 +00:00
warn!("Could not flush statsd metrics upon Drop: {}", err)
2018-06-14 16:37:21 +00:00
}
2017-07-22 02:51:49 +00:00
}
}
2018-10-10 18:29:30 +00:00
// TODO use templates for statsd format
//lazy_static!({
//    static ref STATSD_FORMAT: StatsdFormat = StatsdFormat;
//});
//
//#[derive(Default)]
//pub struct StatsdFormat;
//
//impl Format for StatsdFormat {
//    fn template(&self, name: &Name, kind: InputKind) -> Template {
//        let mut before_value = name.join(".");
//        before_value.push(':');
//
//        let mut after_value = String::with_capacity(16);
//        after_value.push('|');
//        after_value.push_str(match kind {
//            InputKind::Marker | InputKind::Counter => "c",
//            InputKind::Gauge => "g",
//            InputKind::Timer => "ms",
//        });
//
//        // specify sampling rate if any
//        if let Some(Sampling::Random(float_rate)) = self.get_sampling() {
//            suffix.push_str(&format! {"|@{}\n", float_rate});
//        }
//
//        // scale timer values
//        let value_text = match kind {
//            // timers are in µs, statsd wants ms
//            InputKind::Timer => ScaledValueAsText(1000),
//            _ => ValueAsText,
//        };
//
//        Template {
//            commands: vec![
//                StringLit(before_value),
//                value_text,
//                StringLit(after_value),
//                NewLine,
//            ]
//        }
//    }
//}
#[cfg(feature = "bench")]
mod bench {
    use core::attributes::*;
    use core::input::*;
    use super::*;
    use test;

    /// Benchmark writing a timer value through a scope with default settings.
    #[bench]
    pub fn immediate_statsd(b: &mut test::Bencher) {
        let scope = Statsd::send_to("localhost:2003").unwrap().metrics();
        let timer_metric = scope.new_metric("timer".into(), InputKind::Timer);

        b.iter(|| test::black_box(timer_metric.write(2000, labels![])));
    }

    /// Benchmark writing a timer value through a scope configured with
    /// `Buffering::BufferSize(65465)`.
    #[bench]
    pub fn buffering_statsd(b: &mut test::Bencher) {
        let scope = Statsd::send_to("localhost:2003")
            .unwrap()
            .buffered(Buffering::BufferSize(65465))
            .metrics();
        let timer_metric = scope.new_metric("timer".into(), InputKind::Timer);

        b.iter(|| test::black_box(timer_metric.write(2000, labels![])));
    }
}