From f32a1777b0d739e65a282109831a5ff97c386e09 Mon Sep 17 00:00:00 2001 From: Francis Lalonde Date: Tue, 26 Jun 2018 15:51:49 -0400 Subject: [PATCH] Multi output raw --- Cargo.toml | 6 +- examples/bucket_cleanup.rs | 37 ++-- examples/{summary.rs => bucket_summary.rs} | 0 examples/buffered.rs | 46 ----- examples/buffered_flush_on_drop.rs | 25 +++ examples/multi_out.rs | 9 +- examples/multi_out_raw.rs | 27 +++ examples/sampling.rs | 18 -- src/bucket.rs | 2 +- src/core.rs | 48 ++++- src/error.rs | 8 +- src/lib.rs | 12 +- src/multi.rs | 13 +- src/multi_raw.rs | 98 ++++++++++ src/{raw_queue.rs => queue_raw.rs} | 0 src/statsd_crap.rs | 217 --------------------- 16 files changed, 237 insertions(+), 329 deletions(-) rename examples/{summary.rs => bucket_summary.rs} (100%) delete mode 100644 examples/buffered.rs create mode 100644 examples/buffered_flush_on_drop.rs create mode 100644 examples/multi_out_raw.rs delete mode 100644 examples/sampling.rs create mode 100755 src/multi_raw.rs rename src/{raw_queue.rs => queue_raw.rs} (100%) delete mode 100755 src/statsd_crap.rs diff --git a/Cargo.toml b/Cargo.toml index 28e0cee..74536e9 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,8 +26,10 @@ num = { version = "0.1", default-features = false } # FIXME required only for random seed for sampling time = "0.1" -# optional dep for prometheus format -protobuf = { version = "2", features = ["with-bytes"], optional = true } + +# optional dep for prometheus binary format +protobuf = { version = "2", optional = true } + # optional dep for standalone http pull metrics tiny_http = { version = "0.6.0", optional = true } diff --git a/examples/bucket_cleanup.rs b/examples/bucket_cleanup.rs index 5ddb428..e88dd02 100755 --- a/examples/bucket_cleanup.rs +++ b/examples/bucket_cleanup.rs @@ -1,32 +1,31 @@ -//! A sample application continuously aggregating metrics, -//! printing the summary stats every three seconds +//! Transient metrics are not retained by buckets after flushing. extern crate dipstick; use dipstick::*; -fn main() { - let metrics = Bucket::new(); +use std::io; +use std::time::Duration; +use std::thread::sleep; - let counter = metrics.counter("counter_a"); - let timer = metrics.timer("timer_a"); - let gauge = metrics.gauge("gauge_a"); - let marker = metrics.marker("marker_a"); + +fn main() { + let bucket = Bucket::new(); + Bucket::set_default_target(Text::output(io::stdout())); + + let persistent_marker = bucket.marker("persistent"); + + let mut i = 0; loop { - // add counts forever, non-stop - counter.count(11); - counter.count(12); - counter.count(13); + i += 1; + let transient_marker = bucket.marker(&format!("marker_{}", i)); - timer.interval_us(11_000_000); - timer.interval_us(12_000_000); - timer.interval_us(13_000_000); + transient_marker.mark(); + persistent_marker.mark(); - gauge.value(11); - gauge.value(12); - gauge.value(13); + bucket.flush().unwrap(); - marker.mark(); + sleep(Duration::from_secs(1)); } } diff --git a/examples/summary.rs b/examples/bucket_summary.rs similarity index 100% rename from examples/summary.rs rename to examples/bucket_summary.rs diff --git a/examples/buffered.rs b/examples/buffered.rs deleted file mode 100644 index 5507289..0000000 --- a/examples/buffered.rs +++ /dev/null @@ -1,46 +0,0 @@ -// metrics are printed at the end of every cycle as scope is dropped -// use scope.flush_on_drop(false) and scope.flush() to control flushing if required - -extern crate dipstick; - -use std::time::Duration; -use std::thread::sleep; -use std::io; - -use dipstick::*; - -fn main() { - let output = Text::output(io::stdout()).with_buffering(Buffering::Unlimited); - - loop { - // add counts forever, non-stop - println!("\n------- open scope"); - - let metrics = output.open_scope(); - - let counter = metrics.counter("counter_a"); - let timer = metrics.timer("timer_a"); - let gauge = metrics.gauge("gauge_a"); - let marker = metrics.marker("marker_a"); - - counter.count(11); - counter.count(12); - counter.count(13); - - timer.interval_us(11_000_000); - timer.interval_us(12_000_000); - timer.interval_us(13_000_000); - - sleep(Duration::from_millis(1000)); - - gauge.value(11); - gauge.value(12); - gauge.value(13); - - marker.mark(); - - sleep(Duration::from_millis(1000)); - - println!("------- close scope: "); - } -} diff --git a/examples/buffered_flush_on_drop.rs b/examples/buffered_flush_on_drop.rs new file mode 100644 index 0000000..db8705a --- /dev/null +++ b/examples/buffered_flush_on_drop.rs @@ -0,0 +1,25 @@ +//! Metrics are printed at the end of every cycle as scope is dropped + +extern crate dipstick; + +use std::time::Duration; +use std::thread::sleep; +use std::io; + +use dipstick::*; + +fn main() { + let output = Text::output(io::stdout()).with_buffering(Buffering::Unlimited); + + loop { + println!("\n------- open scope"); + + let metrics = output.open_scope(); + + metrics.marker("marker_a").mark(); + + sleep(Duration::from_millis(1000)); + + println!("------- close scope: "); + } +} diff --git a/examples/multi_out.rs b/examples/multi_out.rs index b112f2a..d8c96f3 100644 --- a/examples/multi_out.rs +++ b/examples/multi_out.rs @@ -8,12 +8,13 @@ use std::io; fn main() { // will output metrics to graphite and to stdout - let different_type_metrics = MultiOutput::new() + let different_type_metrics = Multi::output() .add_target(Graphite::output("localhost:2003").expect("Connecting")) - .add_target(Text::output(io::stdout())).open_scope(); + .add_target(Text::output(io::stdout())) + .open_scope(); // will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix. - let same_type_metrics = MultiOutput::new() + let same_type_metrics = Multi::output() .add_target(Text::output(io::stdout()).add_prefix("yeah")) .add_target(Text::output(io::stdout()).add_prefix("ouch")) .add_prefix("cool").open_scope(); @@ -21,6 +22,6 @@ fn main() { loop { different_type_metrics.counter("counter_a").count(123); same_type_metrics.timer("timer_a").interval_us(2000000); - std::thread::sleep(Duration::from_millis(40)); + std::thread::sleep(Duration::from_millis(400)); } } diff --git a/examples/multi_out_raw.rs b/examples/multi_out_raw.rs new file mode 100644 index 0000000..abe3b5f --- /dev/null +++ b/examples/multi_out_raw.rs @@ -0,0 +1,27 @@ +//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout. + +extern crate dipstick; + +use dipstick::*; +use std::time::Duration; +use std::io; + +fn main() { + // will output metrics to graphite and to stdout + let different_type_metrics = MultiRaw::output() + .add_raw_target(Graphite::output("localhost:2003").expect("Connecting")) + .add_raw_target(Text::output(io::stdout())) + .open_scope_raw(); + + // will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix. + let same_type_metrics = MultiRaw::output() + .add_raw_target(Text::output(io::stdout()).add_prefix("yeah")) + .add_raw_target(Text::output(io::stdout()).add_prefix("ouch")) + .add_prefix("cool").open_scope(); + + loop { + different_type_metrics.new_metric_raw("counter_a".into(), Kind::Counter).write(123); + same_type_metrics.new_metric_raw("timer_a".into(), Kind::Timer).write(6677); + std::thread::sleep(Duration::from_millis(400)); + } +} diff --git a/examples/sampling.rs b/examples/sampling.rs deleted file mode 100644 index 62c3013..0000000 --- a/examples/sampling.rs +++ /dev/null @@ -1,18 +0,0 @@ -//! An app demonstrating the basics of the metrics front-end. -//! Defines metrics of each kind and use them to print values to the console in multiple ways. - -extern crate dipstick; - -use dipstick::*; - -fn main() { - // print only 1 out of every 10000 metrics recorded - let app_metrics = Statsd::output("statsd:8125").expect("Statsd") - .with_sampling(Sampling::Random(0.0001)).open_scope_dyn(); - - let marker = app_metrics.marker("marker_a"); - - loop { - marker.mark(); - } -} diff --git a/src/bucket.rs b/src/bucket.rs index 3e1ae6b..a68bfdb 100755 --- a/src/bucket.rs +++ b/src/bucket.rs @@ -69,7 +69,7 @@ impl InnerBucket { let pub_scope = match self.output { Some(ref out) => out.open_scope_raw_dyn(), - None => output_none().open_scope_raw_dyn(), + None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().open_scope_raw_dyn(), }; self.flush_to(pub_scope.borrow(), stats_fn.as_ref()); diff --git a/src/core.rs b/src/core.rs index 5988993..a2925bf 100755 --- a/src/core.rs +++ b/src/core.rs @@ -4,8 +4,10 @@ use clock::TimeHandle; use queue; -use raw_queue; +use queue_raw; use cache; +use multi; +use multi_raw; use error; use std::sync::{Arc, Mutex}; @@ -281,6 +283,46 @@ pub trait Output: OutputDyn + Send + Sync + 'static + Sized { } +///// Wrap this output behind an asynchronous metrics dispatch queue. +///// This is not strictly required for multi threading since the provided scopes +///// are already Send + Sync but might be desired to lower the latency +//pub trait WithMultiOutput: OutputDyn + Send + Sync + 'static + Sized { +// /// Wrap this output with an asynchronous dispatch queue of specified length. +// fn add_target(self, target: OUT) -> multi::MultiOutput { +// multi::Multi::output().add_target(self).add_target(target) +// } +//} +// +///// Blanket output concatenation. +//impl WithMultiOutput for T {} +// +///// Wrap this output behind an asynchronous metrics dispatch queue. +///// This is not strictly required for multi threading since the provided scopes +///// are already Send + Sync but might be desired to lower the latency +//pub trait WithMultiRawOutput: RawOutputDyn + Send + Sync + 'static + Sized { +// /// Wrap this output with an asynchronous dispatch queue of specified length. +// fn add_raw_target(self, target: OUT) -> multi_raw::MultiRawOutput { +// multi_raw::MultiRaw::output().add_raw_target(self).add_raw_target(target) +// } +//} +// +///// Blanket output concatenation. +//impl WithMultiRawOutput for T {} + + +/// Wrap this output behind an asynchronous metrics dispatch queue. +/// This is not strictly required for multi threading since the provided scopes +/// are already Send + Sync but might be desired to lower the latency +pub trait WithMultiScope: Scope + Send + Sync + 'static + Sized { + /// Wrap this output with an asynchronous dispatch queue of specified length. + fn add_target(self, target: OUT) -> multi::Multi { + multi::Multi::new().add_target(self).add_target(target) + } +} + +/// Blanket scope concatenation. +impl WithMultiScope for T {} + /// Wrap this output behind an asynchronous metrics dispatch queue. /// This is not strictly required for multi threading since the provided scopes /// are already Send + Sync but might be desired to lower the latency @@ -389,8 +431,8 @@ pub trait RawOutput: RawOutputDyn + Send + Sync + 'static + Sized { /// Wrap this raw output behind an asynchronous metrics dispatch queue. pub trait WithRawQueue: RawOutput + Sized { /// Wrap this output with an asynchronous dispatch queue of specified length. - fn with_async_queue(self, queue_length: usize) -> raw_queue::RawQueueOutput { - raw_queue::RawQueueOutput::new(self, queue_length) + fn with_async_queue(self, queue_length: usize) -> queue_raw::RawQueueOutput { + queue_raw::RawQueueOutput::new(self, queue_length) } } diff --git a/src/error.rs b/src/error.rs index 547b626..966d209 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,7 +6,7 @@ use std::fmt::{self, Display, Formatter}; use std::result; use std::sync::mpsc; use queue; -use raw_queue; +use queue_raw; use self::Error::*; @@ -18,7 +18,7 @@ pub enum Error { /// An error from the async metric queue. Async(mpsc::SendError), /// An error from the async metric queue. - RawAsync(mpsc::SendError) + RawAsync(mpsc::SendError) } impl Display for Error { @@ -64,8 +64,8 @@ impl From> for Error { } } -impl From> for Error { - fn from(err: mpsc::SendError) -> Self { +impl From> for Error { + fn from(err: mpsc::SendError) -> Self { RawAsync(err) } } diff --git a/src/lib.rs b/src/lib.rs index f0e3141..007cfb3 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ pub use error::{Error, Result}; pub mod core; pub use core::{Value, Kind, Marker, Timer, Counter, Gauge, - Scope, Output, OutputDyn, + Flush, Scope, Output, OutputDyn, Name, AddPrefix, WithSampling, Sampling, Buffering, WithBuffering, WithMetricCache, WithQueue, WithRawQueue, RawScope, RawOutput, RawMetric, UnsafeScope, RawOutputDyn, output_none, VoidOutput}; @@ -52,9 +52,6 @@ mod pcg32; mod scores; pub use scores::ScoreType; -//mod statsd; -//pub use statsd::{StatsdOutput, Statsd}; - mod statds; pub use statds::{StatsdOutput, Statsd}; @@ -80,11 +77,14 @@ pub use cache::{Cache, CacheOutput}; mod multi; pub use multi::{MultiOutput, Multi}; +mod multi_raw; +pub use multi_raw::{MultiRawOutput, MultiRaw}; + mod queue; pub use queue::{Queue, QueueOutput}; -mod raw_queue; -pub use raw_queue::{RawQueue, RawQueueOutput}; +mod queue_raw; +pub use queue_raw::{RawQueue, RawQueueOutput}; mod scheduler; pub use scheduler::{set_schedule, CancelHandle, ScheduleFlush}; diff --git a/src/multi.rs b/src/multi.rs index b486784..71e5574 100755 --- a/src/multi.rs +++ b/src/multi.rs @@ -24,14 +24,6 @@ impl Output for MultiOutput { } impl MultiOutput { - /// Create a new multi dispatcher with no outputs configured. - pub fn new() -> Self { - MultiOutput { - attributes: Attributes::default(), - outputs: vec![], - } - } - /// Returns a clone of the dispatch with the new output added to the list. pub fn add_target(&self, out: OUT) -> Self { let mut cloned = self.clone(); @@ -63,7 +55,10 @@ impl Multi { /// Create a new multi-output. pub fn output() -> MultiOutput { - MultiOutput::new() + MultiOutput { + attributes: Attributes::default(), + outputs: vec![], + } } /// Returns a clone of the dispatch with the new output added to the list. diff --git a/src/multi_raw.rs b/src/multi_raw.rs new file mode 100755 index 0000000..ee96192 --- /dev/null +++ b/src/multi_raw.rs @@ -0,0 +1,98 @@ +//! Dispatch metrics to multiple sinks. + +use core::{RawOutput, RawScope, Name, AddPrefix, RawOutputDyn, Kind, RawMetric, WithAttributes, Attributes, Flush}; +use error; +use std::rc::Rc; +use std::sync::Arc; + +/// Opens multiple scopes at a time from just as many outputs. +#[derive(Clone)] +pub struct MultiRawOutput { + attributes: Attributes, + outputs: Vec>, +} + +impl RawOutput for MultiRawOutput { + type SCOPE = MultiRaw; + + fn open_scope_raw(&self) -> Self::SCOPE { + let scopes = self.outputs.iter().map(|out| out.open_scope_raw_dyn()).collect(); + MultiRaw { + attributes: self.attributes.clone(), + scopes, + } + } +} + +impl MultiRawOutput { + + /// Returns a clone of the dispatch with the new output added to the list. + pub fn add_raw_target(&self, out: OUT) -> Self { + let mut cloned = self.clone(); + cloned.outputs.push(Arc::new(out)); + cloned + } +} + +impl WithAttributes for MultiRawOutput { + fn get_attributes(&self) -> &Attributes { &self.attributes } + fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } +} + +/// Dispatch metric values to a list of scopes. +#[derive(Clone)] +pub struct MultiRaw { + attributes: Attributes, + scopes: Vec>, +} + +impl MultiRaw { + /// Create a new multi scope dispatcher with no scopes. + pub fn new() -> Self { + MultiRaw { + attributes: Attributes::default(), + scopes: vec![], + } + } + + /// Create a new multi-output. + pub fn output() -> MultiRawOutput { + MultiRawOutput { + attributes: Attributes::default(), + outputs: vec![], + } + } + + /// Returns a clone of the dispatch with the new output added to the list. + pub fn add_raw_target(&self, scope: IN) -> Self { + let mut cloned = self.clone(); + cloned.scopes.push(Rc::new(scope)); + cloned + } +} + +impl RawScope for MultiRaw { + fn new_metric_raw(&self, name: Name, kind: Kind) -> RawMetric { + let ref name = self.qualified_name(name); + let metrics: Vec = self.scopes.iter() + .map(move |scope| scope.new_metric_raw(name.clone(), kind)) + .collect(); + RawMetric::new(move |value| for metric in &metrics { + metric.write(value) + }) + } +} + +impl Flush for MultiRaw { + fn flush(&self) -> error::Result<()> { + for w in &self.scopes { + w.flush()?; + } + Ok(()) + } +} + +impl WithAttributes for MultiRaw { + fn get_attributes(&self) -> &Attributes { &self.attributes } + fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } +} diff --git a/src/raw_queue.rs b/src/queue_raw.rs similarity index 100% rename from src/raw_queue.rs rename to src/queue_raw.rs diff --git a/src/statsd_crap.rs b/src/statsd_crap.rs deleted file mode 100755 index bcd15a9..0000000 --- a/src/statsd_crap.rs +++ /dev/null @@ -1,217 +0,0 @@ -//! Send metrics to a statsd server. - -use core::{RawScope, RawOutput, Value, RawMetric, Attributes, WithAttributes, Kind, - Name, WithSamplingRate, AddPrefix, WithBuffering, Sampling, WithMetricCache, WithQueue, Flush}; -use pcg32; -use error; -use metrics; - -use std::net::UdpSocket; -use std::sync::Arc; -use std::rc::Rc; -use std::cell::{RefCell, RefMut}; - -pub use std::net::ToSocketAddrs; - -/// Statsd output holds a UDP client socket to a statsd host. -/// The output's connection is shared between all scopes originating from it. -#[derive(Debug, Clone)] -pub struct StatsdOutput { - attributes: Attributes, - socket: Arc, -} - -impl RawOutput for StatsdOutput { - type SCOPE = Statsd; - fn open_scope_raw(&self) -> Self::SCOPE { - Statsd { - attributes: self.attributes.clone(), - buffer: Rc::new(RefCell::new(String::with_capacity(MAX_UDP_PAYLOAD))), - socket: self.socket.clone(), - } - } -} - -impl WithAttributes for StatsdOutput { - fn get_attributes(&self) -> &Attributes { - &self.attributes - } - fn mut_attributes(&mut self) -> &mut Attributes { - &mut self.attributes - } -} - -impl WithBuffering for StatsdOutput {} -impl WithSamplingRate for StatsdOutput {} - -impl WithMetricCache for StatsdOutput {} -impl WithQueue for StatsdOutput {} - -/// Metrics input for statsd. -#[derive(Clone)] -pub struct Statsd { - attributes: Attributes, - buffer: Rc>, - socket: Arc, -} - -impl Statsd { - /// Send metrics to a statsd server at the address and port provided. - pub fn output(address: ADDR) -> error::Result { - let socket = Arc::new(UdpSocket::bind("0.0.0.0:0")?); - socket.set_nonblocking(true)?; - socket.connect(address)?; - - Ok(StatsdOutput { - attributes: Attributes::default(), - socket, - }) - } - - fn print(&self, mut buffer: RefMut, prefix: &str, suffix: &str, scale: u64, value: Value) { - let scaled_value = value / scale; - let value_str = scaled_value.to_string(); - let entry_len = prefix.len() + value_str.len() + suffix.len(); - - if entry_len > buffer.capacity() { - // TODO report entry too big to fit in buffer (!?) - return; - } - - let remaining = buffer.capacity() - buffer.len(); - if entry_len + 1 > remaining { - // buffer is full, flush before appending - let _ = self.flush(); - } else { - if !buffer.is_empty() { - // separate from previous entry - buffer.push('\n') - } - buffer.push_str(prefix); - buffer.push_str(&value_str); - buffer.push_str(suffix); - } - - if !self.is_buffering() { - if let Err(e) = self.flush_inner(buffer) { - debug!("Could not send to statsd {}", e) - } - } - } - - fn flush_inner(&self, mut buffer: RefMut) -> error::Result<()> { - if buffer.is_empty() { - match self.socket.send(buffer.as_bytes()) { - Ok(size) => { - metrics::STATSD_SENT_BYTES.count(size); - trace!("Sent {} bytes to statsd", buffer.len()); - } - Err(e) => { - metrics::STATSD_SEND_ERR.mark(); - return Err(e.into()) - } - }; - buffer.clear(); - } - Ok(()) - } - - -} - -impl RawScope for Statsd { - fn new_metric_raw(&self, name: Name, kind: Kind) -> RawMetric { - let mut prefix = self.qualified_name(name).join("."); - prefix.push(':'); - - let mut suffix = String::with_capacity(16); - suffix.push('|'); - suffix.push_str(match kind { - Kind::Marker | Kind::Counter => "c", - Kind::Gauge => "g", - Kind::Timer => "ms", - }); - - let scale = match kind { - // timers are in µs, statsd wants ms - Kind::Timer => 1000, - _ => 1, - }; - - let cloned = self.clone(); - - if let Sampling::SampleRate(float_rate) = self.get_sampling() { - suffix.push_str(&format!{"|@{}", float_rate}); - let int_sampling_rate = pcg32::to_int_rate(float_rate); - - RawMetric::new(move |value| { - if pcg32::accept_sample(int_sampling_rate) { - let buffer = cloned.buffer.borrow_mut(); - cloned.print(buffer, &prefix, &suffix, scale, value) - } - }) - } else { - RawMetric::new(move |value| { - let buffer = cloned.buffer.borrow_mut(); - cloned.print(buffer, &prefix, &suffix, scale, value) - }) - } - } -} - -impl Flush for Statsd { - - fn flush(&self) -> error::Result<()> { - let buf = self.buffer.borrow_mut(); - self.flush_inner(buf) - } - -} - -impl WithBuffering for Statsd {} - -impl WithSamplingRate for Statsd {} - -impl WithAttributes for Statsd { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } -} - -/// Use a safe maximum size for UDP to prevent fragmentation. -// TODO make configurable? -const MAX_UDP_PAYLOAD: usize = 576; - - -/// Any remaining buffered data is flushed on Drop. -impl Drop for Statsd { - fn drop(&mut self) { - if let Err(err) = self.flush() { - warn!("Couldn't flush statsd buffer on Drop: {}", err) - } - } -} - -#[cfg(feature = "bench")] -mod bench { - - use core::*; - use super::*; - use test; - - #[bench] - pub fn immediate_statsd(b: &mut test::Bencher) { - let sd = Statsd::output("localhost:8125").unwrap().open_scope_raw(); - let timer = sd.new_metric_raw("timer".into(), Kind::Timer); - - b.iter(|| test::black_box(timer.write(2000))); - } - - #[bench] - pub fn buffering_statsd(b: &mut test::Bencher) { - let sd = Statsd::output("localhost:8125").unwrap().with_buffering(Buffering::BufferSize(3534)).open_scope_raw(); - let timer = sd.new_metric_raw("timer".into(), Kind::Timer); - - b.iter(|| test::black_box(timer.write(2000))); - } - -}