From d764cb5826ff5095f029b2f91c7c0653646f33f1 Mon Sep 17 00:00:00 2001 From: Francis Lalonde Date: Fri, 25 May 2018 17:10:45 -0400 Subject: [PATCH] Put back some 0.6 compat fn --- src/core.rs | 8 ++ src/deprecated.rs | 20 ++++- src/lib.rs | 3 + src/output.rs | 13 ++- src/prometheus.rs | 197 +++++++++++++++++++++++++++++++++++++++++ src/todo/prometheus.rs | 1 - 6 files changed, 237 insertions(+), 5 deletions(-) create mode 100644 src/prometheus.rs delete mode 100644 src/todo/prometheus.rs diff --git a/src/core.rs b/src/core.rs index 71e4781..6393b29 100755 --- a/src/core.rs +++ b/src/core.rs @@ -138,6 +138,14 @@ impl<'a> From<&'a str> for Namespace { } } +impl<'a, 'b: 'a> From<&'b [&'a str]> for Namespace { + fn from(names: &'a [&'a str]) -> Namespace { + Namespace { + inner: names.iter().map(|n| n.to_string()).collect() + } + } +} + impl From for Namespace { fn from(name: String) -> Namespace { if name.is_empty() { diff --git a/src/deprecated.rs b/src/deprecated.rs index 1805572..2e076df 100644 --- a/src/deprecated.rs +++ b/src/deprecated.rs @@ -1,10 +1,28 @@ use scope::{Marker, Counter, Gauge, Timer, MetricScope}; use output::MetricOutput; -use core::Sampling; +use core::{Sampling, Namespace, Kind, Value}; +use aggregate::MetricAggregator; +use scores::ScoreType; use async_queue::WithAsyncQueue; use sample::WithSamplingRate; +/// Aggregate metrics in memory. +/// Depending on the type of metric, count, sum, minimum and maximum of values will be tracked. +/// Needs to be connected to a publish to be useful. +#[deprecated(since = "0.7.0", note = "Use `MetricAggregator::new()` instead.")] +pub fn aggregate(stats_fn: E, pub_scope: P) -> MetricAggregator + where + E: Fn(Kind, Namespace, ScoreType) -> Option<(Kind, Namespace, Value)> + Send + Sync + 'static, + P: Into>, + M: Send + Sync + 'static + Clone, +{ + let agg = MetricAggregator::new(); + agg.set_stats(stats_fn); + agg.set_output(pub_scope); + agg +} + /// Enqueue collected metrics for dispatch on background thread. #[deprecated(since = "0.5.0", note = "Use `with_async_queue` instead.")] pub fn async(queue_size: usize, chain: IC) -> MetricOutput diff --git a/src/lib.rs b/src/lib.rs index e5917be..8adf353 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,6 +54,9 @@ pub use statsd::{Statsd, to_statsd}; mod graphite; pub use graphite::{Graphite, to_graphite, to_buffered_graphite}; +//mod prometheus; +//pub use prometheus::{Prometheus, to_prometheus, to_buffered_prometheus}; + mod socket; pub use socket::RetrySocket; diff --git a/src/output.rs b/src/output.rs index db39917..89bd0f9 100755 --- a/src/output.rs +++ b/src/output.rs @@ -91,15 +91,22 @@ impl MetricOutput { } } - /// Return a copy of this output with the specified name appended to the namespace. - pub fn with_suffix(&self, name: &str) -> Self { + /// Return cloned output with appended namespace. + pub fn with_namespace(&self, names: impl Into) -> Self { + let mut namespace = self.namespace.clone(); + namespace.extend(&names.into()); MetricOutput { - namespace: self.namespace.with_suffix(name), + namespace, define_metric_fn: self.define_metric_fn.clone(), open_scope_fn: self.open_scope_fn.clone(), } } + /// Return a copy of this output with the specified name appended to the namespace. + pub fn with_suffix(&self, name: &str) -> Self { + self.with_namespace(name) + } + } //impl<'a, M: Send + Sync + Clone + 'static> Index<&'a str> for MetricOutput { diff --git a/src/prometheus.rs b/src/prometheus.rs new file mode 100644 index 0000000..bc43544 --- /dev/null +++ b/src/prometheus.rs @@ -0,0 +1,197 @@ + +//! Send metrics to a prometheus server. + +use core::*; +use output::*; +use error; +use self_metrics::*; + +use std::net::ToSocketAddrs; + +use std::sync::{Arc, RwLock}; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::io::Write; +use std::fmt::Debug; + +use socket::RetrySocket; + +metrics!{ + DIPSTICK_METRICS.with_suffix("prometheus") => { + Marker SEND_ERR: "send_failed"; + Marker TRESHOLD_EXCEEDED: "bufsize_exceeded"; + Counter SENT_BYTES: "sent_bytes"; + } +} + +/// Send metrics to a prometheus server at the address and port provided. +pub fn to_prometheus(address: ADDR) -> error::Result> + where + ADDR: ToSocketAddrs + Debug + Clone, +{ + debug!("Connecting to prometheus {:?}", address); + let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?)); + + Ok(metric_output( + move |ns, kind, rate| prometheus_metric(ns, kind, rate), + move || prometheus_scope(&socket, false), + )) +} + +/// Send metrics to a prometheus server at the address and port provided. +pub fn to_buffered_prometheus(address: ADDR) -> error::Result> + where + ADDR: ToSocketAddrs + Debug + Clone, +{ + debug!("Connecting to prometheus {:?}", address); + let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?)); + + Ok(metric_output( + move |ns, kind, rate| prometheus_metric(ns, kind, rate), + move || prometheus_scope(&socket, true), + )) +} + +fn prometheus_metric(namespace: &Namespace, kind: Kind, rate: Sampling) -> Prometheus { + let mut prefix = namespace.join("."); + prefix.push(' '); + + let mut scale = match kind { + // timers are in µs, lets give prometheus milliseconds for consistency with statsd + Kind::Timer => 1000, + _ => 1, + }; + + if rate < FULL_SAMPLING_RATE { + // prometheus does not do sampling, so we'll upsample before sending + let upsample = (1.0 / rate).round() as u64; + warn!( + "Metric {:?} '{:?}' being sampled at rate {} will be upsampled \ + by a factor of {} when sent to prometheus.", + kind, namespace, rate, upsample + ); + scale *= upsample; + } + + Prometheus { prefix, scale } +} + +fn prometheus_scope(socket: &Arc>, buffered: bool) -> CommandFn { + let buf = ScopeBuffer { + buffer: Arc::new(RwLock::new(String::new())), + socket: socket.clone(), + buffered, + }; + command_fn(move |cmd| match cmd { + Command::Write(metric, value) => buf.write(metric, value), + Command::Flush => buf.flush(), + }) +} + +/// Its hard to see how a single scope could get more metrics than this. +// TODO make configurable? +const BUFFER_FLUSH_THRESHOLD: usize = 65_536; + +/// Key of a prometheus metric. +#[derive(Debug, Clone)] +pub struct Prometheus { + prefix: String, + scale: u64, +} + +/// Wrap string buffer & socket as one. +#[derive(Debug)] +struct ScopeBuffer { + buffer: Arc>, + socket: Arc>, + buffered: bool, +} + +/// Any remaining buffered data is flushed on Drop. +impl Drop for ScopeBuffer { + fn drop(&mut self) { + self.flush() + } +} + +impl ScopeBuffer { + fn write(&self, metric: &Prometheus, value: Value) { + let scaled_value = value / metric.scale; + let value_str = scaled_value.to_string(); + + let start = SystemTime::now(); + match start.duration_since(UNIX_EPOCH) { + Ok(timestamp) => { + let mut buf = self.buffer.write().expect("Locking prometheus buffer"); + + buf.push_str(&metric.prefix); + buf.push_str(&value_str); + buf.push(' '); + buf.push_str(timestamp.as_secs().to_string().as_ref()); + buf.push('\n'); + + if buf.len() > BUFFER_FLUSH_THRESHOLD { + TRESHOLD_EXCEEDED.mark(); + warn!( + "Flushing metrics scope buffer to prometheus because its size exceeds \ + the threshold of {} bytes. ", + BUFFER_FLUSH_THRESHOLD + ); + self.flush_inner(&mut buf); + } else if !self.buffered { + self.flush_inner(&mut buf); + } + } + Err(e) => { + warn!("Could not compute epoch timestamp. {}", e); + } + }; + } + + fn flush_inner(&self, buf: &mut String) { + if !buf.is_empty() { + let mut sock = self.socket.write().expect("Locking prometheus socket"); + match sock.write(buf.as_bytes()) { + Ok(size) => { + buf.clear(); + SENT_BYTES.count(size); + trace!("Sent {} bytes to prometheus", buf.len()); + } + Err(e) => { + SEND_ERR.mark(); + // still just a best effort, do not warn! for every failure + debug!("Failed to send buffer to prometheus: {}", e); + } + }; + buf.clear(); + } + } + + fn flush(&self) { + let mut buf = self.buffer.write().expect("Locking prometheus buffer"); + self.flush_inner(&mut buf); + } +} + +#[cfg(feature = "bench")] +mod bench { + + use super::*; + use test; + + #[bench] + pub fn unbufferd_prometheus(b: &mut test::Bencher) { + let sd = to_prometheus("localhost:8125").unwrap().open_scope(); + let timer = sd.define_metric(&"timer".into(), Kind::Timer, 1000000.0); + + b.iter(|| test::black_box(sd.write(&timer, 2000))); + } + + #[bench] + pub fn buffered_prometheus(b: &mut test::Bencher) { + let sd = to_buffered_prometheus("localhost:8125").unwrap().open_scope(); + let timer = sd.define_metric(&"timer".into(), Kind::Timer, 1000000.0); + + b.iter(|| test::black_box(sd.write(&timer, 2000))); + } + +} diff --git a/src/todo/prometheus.rs b/src/todo/prometheus.rs deleted file mode 100644 index 8b13789..0000000 --- a/src/todo/prometheus.rs +++ /dev/null @@ -1 +0,0 @@ -