diff --git a/build.rs b/build.rs index dfa9078..a72bf33 100755 --- a/build.rs +++ b/build.rs @@ -1,10 +1,8 @@ -#[cfg(feature="skeptic")] +#[cfg(feature = "skeptic")] extern crate skeptic; fn main() { // generates documentation tests. - #[cfg(feature="skeptic")] + #[cfg(feature = "skeptic")] skeptic::generate_doc_tests(&["README.md", "HANDBOOK.md"]); } - - diff --git a/examples/async_queue.rs b/examples/async_queue.rs index 510ea38..22a26a8 100755 --- a/examples/async_queue.rs +++ b/examples/async_queue.rs @@ -2,10 +2,10 @@ extern crate dipstick; +use dipstick::{Input, InputScope, QueuedOutput, Stream}; +use std::thread; use std::thread::sleep; use std::time::Duration; -use dipstick::{Stream, InputScope, QueuedOutput, Input}; -use std::thread; fn main() { let async_metrics = Stream::to_stdout().queued(100).metrics(); @@ -21,5 +21,4 @@ fn main() { }); } sleep(Duration::from_secs(5000)); - } diff --git a/examples/basics.rs b/examples/basics.rs index 485ef26..4114e40 100644 --- a/examples/basics.rs +++ b/examples/basics.rs @@ -3,10 +3,10 @@ extern crate dipstick; -use std::thread::sleep; -use std::io; -use std::time::Duration; use dipstick::*; +use std::io; +use std::thread::sleep; +use std::time::Duration; fn main() { // for this demo, print metric values to the console diff --git a/examples/bench_bucket.rs b/examples/bench_bucket.rs index 35c80ff..fe2c75e 100755 --- a/examples/bench_bucket.rs +++ b/examples/bench_bucket.rs @@ -2,12 +2,12 @@ extern crate dipstick; -use std::thread::sleep; -use std::time::Duration; use dipstick::*; -use std::thread; use std::env::args; use std::str::FromStr; +use std::thread; +use std::thread::sleep; +use std::time::Duration; fn main() { let bucket = AtomicBucket::new(); @@ -27,5 +27,4 @@ fn main() { sleep(Duration::from_secs(5)); bucket.stats(stats_all); bucket.flush_to(&Stream::to_stdout().new_scope()).unwrap(); - } diff --git a/examples/bench_bucket_proxy.rs b/examples/bench_bucket_proxy.rs index 085907e..55f5089 100755 --- a/examples/bench_bucket_proxy.rs +++ b/examples/bench_bucket_proxy.rs @@ -2,12 +2,12 @@ extern crate dipstick; -use std::thread::sleep; -use std::time::Duration; use dipstick::*; -use std::thread; use std::env::args; use std::str::FromStr; +use std::thread; +use std::thread::sleep; +use std::time::Duration; fn main() { let event = Proxy::default().marker("a"); @@ -30,5 +30,4 @@ fn main() { } sleep(Duration::from_secs(5)); bucket.flush_to(&Stream::to_stdout().new_scope()).unwrap(); - } diff --git a/examples/bench_queue.rs b/examples/bench_queue.rs index 89f801f..53f5968 100755 --- a/examples/bench_queue.rs +++ b/examples/bench_queue.rs @@ -2,12 +2,12 @@ extern crate dipstick; -use std::thread::sleep; -use std::time::Duration; use dipstick::*; -use std::thread; use std::env::args; use std::str::FromStr; +use std::thread; +use std::thread::sleep; +use std::time::Duration; fn main() { let bucket = AtomicBucket::new(); @@ -27,5 +27,4 @@ fn main() { } sleep(Duration::from_secs(5)); bucket.flush_to(&Stream::to_stdout().new_scope()).unwrap(); - } diff --git a/examples/bucket2graphite.rs b/examples/bucket2graphite.rs index 8c766c9..c30c5db 100755 --- a/examples/bucket2graphite.rs +++ b/examples/bucket2graphite.rs @@ -3,8 +3,8 @@ extern crate dipstick; -use std::time::Duration; use dipstick::*; +use std::time::Duration; fn main() { // adding a name to the bucket @@ -12,8 +12,12 @@ fn main() { // adding two names to Graphite output // metrics will be prefixed with "machine1.application.test" - bucket.drain(Graphite::send_to("localhost:2003").expect("Socket") - .named("machine1").add_name("application")); + bucket.drain( + Graphite::send_to("localhost:2003") + .expect("Socket") + .named("machine1") + .add_name("application"), + ); bucket.flush_every(Duration::from_secs(3)); diff --git a/examples/bucket2stdout.rs b/examples/bucket2stdout.rs index a8c3c04..2c591d2 100755 --- a/examples/bucket2stdout.rs +++ b/examples/bucket2stdout.rs @@ -3,8 +3,8 @@ extern crate dipstick; -use std::time::Duration; use dipstick::*; +use std::time::Duration; fn main() { let metrics = AtomicBucket::new().named("test"); diff --git a/examples/bucket_cleanup.rs b/examples/bucket_cleanup.rs index ae6b4d0..5d51e0a 100755 --- a/examples/bucket_cleanup.rs +++ b/examples/bucket_cleanup.rs @@ -4,9 +4,8 @@ extern crate dipstick; use dipstick::*; -use std::time::Duration; use std::thread::sleep; - +use std::time::Duration; fn main() { let bucket = AtomicBucket::new(); diff --git a/examples/bucket_summary.rs b/examples/bucket_summary.rs index 71afeb8..2661c76 100644 --- a/examples/bucket_summary.rs +++ b/examples/bucket_summary.rs @@ -3,11 +3,10 @@ extern crate dipstick; -use std::time::Duration; use dipstick::*; +use std::time::Duration; fn main() { - let app_metrics = AtomicBucket::new(); app_metrics.drain(Stream::to_stdout()); diff --git a/examples/buffered_flush_on_drop.rs b/examples/buffered_flush_on_drop.rs index b5fbf65..acc8e3e 100755 --- a/examples/buffered_flush_on_drop.rs +++ b/examples/buffered_flush_on_drop.rs @@ -2,8 +2,8 @@ extern crate dipstick; -use std::time::Duration; use std::thread::sleep; +use std::time::Duration; use dipstick::*; diff --git a/examples/cache.rs b/examples/cache.rs index d62e238..845db84 100755 --- a/examples/cache.rs +++ b/examples/cache.rs @@ -2,13 +2,16 @@ extern crate dipstick; +use dipstick::*; +use std::io; use std::thread::sleep; use std::time::Duration; -use std::io; -use dipstick::*; fn main() { - let metrics = Stream::write_to(io::stdout()).cached(5).metrics().named("cache"); + let metrics = Stream::write_to(io::stdout()) + .cached(5) + .metrics() + .named("cache"); loop { // report some ad-hoc metric values from our "application" loop diff --git a/examples/clopwizard.rs b/examples/clopwizard.rs index 7e50901..8961fec 100644 --- a/examples/clopwizard.rs +++ b/examples/clopwizard.rs @@ -3,18 +3,17 @@ extern crate dipstick; -use std::time::Duration; use dipstick::*; use std::thread::sleep; +use std::time::Duration; -metrics!{ +metrics! { APP = "application" => { pub COUNTER: Counter = "counter"; } } fn main() { - let one_minute = AtomicBucket::new(); one_minute.flush_every(Duration::from_secs(60)); diff --git a/examples/custom_publish.rs b/examples/custom_publish.rs index f3666b3..2809867 100755 --- a/examples/custom_publish.rs +++ b/examples/custom_publish.rs @@ -3,8 +3,8 @@ extern crate dipstick; -use std::time::Duration; use dipstick::*; +use std::time::Duration; fn main() { fn custom_statistics( @@ -28,7 +28,7 @@ fn main() { } else { None } - }, + } // scaling the score value and appending unit to name (kind, ScoreType::Sum(sum)) => Some((kind, name.append("per_thousand"), sum / 1000)), diff --git a/examples/graphite.rs b/examples/graphite.rs index 9aa8981..91248ed 100644 --- a/examples/graphite.rs +++ b/examples/graphite.rs @@ -6,11 +6,10 @@ use dipstick::*; use std::time::Duration; fn main() { - let metrics = - Graphite::send_to("localhost:2003") - .expect("Connected") - .named("my_app") - .metrics(); + let metrics = Graphite::send_to("localhost:2003") + .expect("Connected") + .named("my_app") + .metrics(); loop { metrics.counter("counter_a").count(123); diff --git a/examples/multi_input.rs b/examples/multi_input.rs index 1a161a3..3701c24 100644 --- a/examples/multi_input.rs +++ b/examples/multi_input.rs @@ -2,7 +2,7 @@ extern crate dipstick; -use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Prefixed}; +use dipstick::{Graphite, Input, InputScope, MultiInput, Prefixed, Stream}; use std::time::Duration; fn main() { diff --git a/examples/multi_output.rs b/examples/multi_output.rs index 6db8a66..4cf572e 100644 --- a/examples/multi_output.rs +++ b/examples/multi_output.rs @@ -16,11 +16,16 @@ fn main() { let same_type_metrics = MultiOutput::new() .add_target(Stream::to_stderr().named("yeah")) .add_target(Stream::to_stderr().named("ouch")) - .named("both").metrics(); + .named("both") + .metrics(); loop { - different_type_metrics.new_metric("counter_a".into(), InputKind::Counter).write(123, labels![]); - same_type_metrics.new_metric("timer_a".into(), InputKind::Timer).write(6677, labels![]); + different_type_metrics + .new_metric("counter_a".into(), InputKind::Counter) + .write(123, labels![]); + same_type_metrics + .new_metric("timer_a".into(), InputKind::Timer) + .write(6677, labels![]); std::thread::sleep(Duration::from_millis(400)); } } diff --git a/examples/observer.rs b/examples/observer.rs index ede0eb8..7ca87ec 100644 --- a/examples/observer.rs +++ b/examples/observer.rs @@ -15,7 +15,7 @@ extern crate dipstick; -use std::time::{Duration}; +use std::time::Duration; use dipstick::*; @@ -28,7 +28,9 @@ fn main() { metrics.observe(uptime, || 6).on_flush(); let threads = metrics.gauge("threads"); - metrics.observe(threads, thread_count).every(Duration::from_secs(1)); + metrics + .observe(threads, thread_count) + .every(Duration::from_secs(1)); loop { std::thread::sleep(Duration::from_millis(40)); diff --git a/examples/per_metric_sampling.rs b/examples/per_metric_sampling.rs index a25a7b4..9710e16 100755 --- a/examples/per_metric_sampling.rs +++ b/examples/per_metric_sampling.rs @@ -6,12 +6,11 @@ use dipstick::*; use std::time::Duration; fn main() { - let statsd = - Statsd::send_to("localhost:8125") - .expect("Connected") - .named("my_app"); - // Sampling::Full is the default - // .sampled(Sampling::Full); + let statsd = Statsd::send_to("localhost:8125") + .expect("Connected") + .named("my_app"); + // Sampling::Full is the default + // .sampled(Sampling::Full); let unsampled_marker = statsd.metrics().marker("marker_a"); @@ -24,10 +23,10 @@ fn main() { .sampled(Sampling::Random(0.001)) .metrics() .marker("hi_freq_marker"); - + loop { unsampled_marker.mark(); - + for _i in 0..10 { low_freq_marker.mark(); } diff --git a/examples/prometheus.rs b/examples/prometheus.rs index 9adaf0c..49a2e4a 100644 --- a/examples/prometheus.rs +++ b/examples/prometheus.rs @@ -6,11 +6,10 @@ use dipstick::*; use std::time::Duration; fn main() { - let metrics = - Prometheus::push_to("http:// prometheus:9091/metrics/job/prometheus_example") - .expect("Prometheus Socket") - .named("my_app") - .metrics(); + let metrics = Prometheus::push_to("http:// prometheus:9091/metrics/job/prometheus_example") + .expect("Prometheus Socket") + .named("my_app") + .metrics(); loop { metrics.counter("counter_a").count(123); diff --git a/examples/proxy.rs b/examples/proxy.rs index 40f3468..a4fa38b 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -2,10 +2,9 @@ extern crate dipstick; +use dipstick::{Input, InputScope, Prefixed, Proxy, Stream}; use std::thread::sleep; use std::time::Duration; -use dipstick::{Proxy, Stream, InputScope, Input, Prefixed}; - fn main() { let root_proxy = Proxy::default(); @@ -50,5 +49,4 @@ fn main() { println!() } - } diff --git a/examples/raw_log.rs b/examples/raw_log.rs index 8bc6c1b..b7593bf 100755 --- a/examples/raw_log.rs +++ b/examples/raw_log.rs @@ -15,9 +15,6 @@ pub fn raw_write() { let metrics_log = dipstick::Log::to_log().metrics(); // define and send metrics using raw channel API - let counter = metrics_log.new_metric( - "count_a".into(), - dipstick::InputKind::Counter, - ); + let counter = metrics_log.new_metric("count_a".into(), dipstick::InputKind::Counter); counter.write(1, labels![]); } diff --git a/examples/statsd_nosampling.rs b/examples/statsd_nosampling.rs index 968c642..f24140a 100644 --- a/examples/statsd_nosampling.rs +++ b/examples/statsd_nosampling.rs @@ -6,12 +6,11 @@ use dipstick::*; use std::time::Duration; fn main() { - let metrics = - Statsd::send_to("localhost:8125") - .expect("Connected") -// .with_sampling(Sampling::Random(0.2)) - .named("my_app") - .metrics(); + let metrics = Statsd::send_to("localhost:8125") + .expect("Connected") + // .with_sampling(Sampling::Random(0.2)) + .named("my_app") + .metrics(); let counter = metrics.counter("counter_a"); diff --git a/examples/statsd_sampling.rs b/examples/statsd_sampling.rs index 3f6d91d..82bd00c 100755 --- a/examples/statsd_sampling.rs +++ b/examples/statsd_sampling.rs @@ -6,12 +6,11 @@ use dipstick::*; use std::time::Duration; fn main() { - let metrics = - Statsd::send_to("localhost:8125") - .expect("Connected") - .sampled(Sampling::Random(0.2)) - .named("my_app") - .metrics(); + let metrics = Statsd::send_to("localhost:8125") + .expect("Connected") + .sampled(Sampling::Random(0.2)) + .named("my_app") + .metrics(); let counter = metrics.counter("counter_a"); diff --git a/examples/text_format_label.rs b/examples/text_format_label.rs index 727f093..21adbb1 100644 --- a/examples/text_format_label.rs +++ b/examples/text_format_label.rs @@ -2,10 +2,12 @@ extern crate dipstick; +use dipstick::{ + AppLabel, Formatting, Input, InputKind, InputScope, LabelOp, LineFormat, LineOp, LineTemplate, + MetricName, Stream, +}; use std::thread::sleep; use std::time::Duration; -use dipstick::{Stream, InputScope, Input, Formatting, AppLabel, - MetricName, InputKind, LineTemplate, LineFormat, LineOp, LabelOp}; /// Generates template like "$METRIC $value $label_value["abc"]\n" struct MyFormat; @@ -16,21 +18,29 @@ impl LineFormat for MyFormat { LineOp::Literal(format!("{} ", name.join(".")).to_uppercase().into()), LineOp::ValueAsText, LineOp::Literal(" ".into()), - LineOp::LabelExists("abc".into(), - vec![LabelOp::LabelKey, LabelOp::Literal(":".into()), LabelOp::LabelValue], + LineOp::LabelExists( + "abc".into(), + vec![ + LabelOp::LabelKey, + LabelOp::Literal(":".into()), + LabelOp::LabelValue, + ], ), LineOp::NewLine, - ].into() + ] + .into() } } fn main() { - let counter = Stream::to_stderr().formatting(MyFormat).metrics().counter("counter_a"); + let counter = Stream::to_stderr() + .formatting(MyFormat) + .metrics() + .counter("counter_a"); AppLabel::set("abc", "xyz"); loop { // report some metric values from our "application" loop counter.count(11); sleep(Duration::from_millis(500)); } - } diff --git a/src/bucket/atomic.rs b/src/bucket/atomic.rs index dfe007c..ebfed39 100755 --- a/src/bucket/atomic.rs +++ b/src/bucket/atomic.rs @@ -1,33 +1,37 @@ //! Maintain aggregated metrics for deferred reporting, -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::{MetricName}; -use core::input::{InputKind, InputScope, InputMetric}; -use core::output::{OutputDyn, OutputScope, OutputMetric, Output, output_none}; -use core::clock::TimeHandle; -use core::{MetricValue, Flush}; -use bucket::{ScoreType, stats_summary}; use bucket::ScoreType::*; +use bucket::{stats_summary, ScoreType}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; +use core::clock::TimeHandle; use core::error; +use core::input::{InputKind, InputMetric, InputScope}; +use core::name::MetricName; +use core::output::{output_none, Output, OutputDyn, OutputMetric, OutputScope}; +use core::{Flush, MetricValue}; -use std::mem; +use std::collections::BTreeMap; use std::isize; -use std::collections::{BTreeMap}; +use std::mem; use std::sync::atomic::AtomicIsize; use std::sync::atomic::Ordering::*; -use std::sync::{Arc}; +use std::sync::Arc; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; -use std::fmt; use std::borrow::Borrow; +use std::fmt; /// A function type to transform aggregated scores into publishable statistics. -pub type StatsFn = Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static; +pub type StatsFn = + Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + + Send + + Sync + + 'static; fn initial_stats() -> &'static StatsFn { &stats_summary @@ -38,9 +42,10 @@ fn initial_drain() -> Arc { } lazy_static! { - static ref DEFAULT_AGGREGATE_STATS: RwLock> = RwLock::new(Arc::new(initial_stats())); - - static ref DEFAULT_AGGREGATE_OUTPUT: RwLock> = RwLock::new(initial_drain()); + static ref DEFAULT_AGGREGATE_STATS: RwLock> = + RwLock::new(Arc::new(initial_stats())); + static ref DEFAULT_AGGREGATE_OUTPUT: RwLock> = + RwLock::new(initial_drain()); } /// Central aggregation structure. @@ -54,8 +59,14 @@ pub struct AtomicBucket { struct InnerAtomicBucket { metrics: BTreeMap>, period_start: TimeHandle, - stats: Option Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static>>, + stats: Option< + Arc< + Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + + Send + + Sync + + 'static, + >, + >, drain: Option>, publish_metadata: bool, } @@ -72,7 +83,6 @@ lazy_static! { } impl InnerAtomicBucket { - fn flush(&mut self) -> error::Result<()> { let pub_scope = match self.drain { Some(ref out) => out.output_dyn(), @@ -85,10 +95,13 @@ impl InnerAtomicBucket { // purge: if bucket is the last owner of the metric, remove it // TODO parameterize whether to keep ad-hoc metrics after publish let mut purged = self.metrics.clone(); - self.metrics.iter() + self.metrics + .iter() .filter(|&(_k, v)| Arc::strong_count(v) == 1) .map(|(k, _v)| k) - .for_each(|k| { purged.remove(k); }); + .for_each(|k| { + purged.remove(k); + }); self.metrics = purged; Ok(()) @@ -98,16 +111,19 @@ impl InnerAtomicBucket { /// Compute stats on captured values using assigned or default stats function. /// Write stats to assigned or default output. fn flush_to(&mut self, target: &OutputScope) -> error::Result<()> { - let now = TimeHandle::now(); let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0; self.period_start = now; - let mut snapshot: Vec<(&MetricName, InputKind, Vec)> = self.metrics.iter() - .flat_map(|(name, scores)| if let Some(values) = scores.reset(duration_seconds) { - Some((name, scores.metric_kind(), values)) - } else { - None + let mut snapshot: Vec<(&MetricName, InputKind, Vec)> = self + .metrics + .iter() + .flat_map(|(name, scores)| { + if let Some(values) = scores.reset(duration_seconds) { + Some((name, scores.metric_kind(), values)) + } else { + None + } }) .collect(); @@ -119,7 +135,11 @@ impl InnerAtomicBucket { } else { // TODO add switch for metadata such as PERIOD_LENGTH if self.publish_metadata { - snapshot.push((&PERIOD_LENGTH, InputKind::Timer, vec![Sum((duration_seconds * 1000.0) as isize)])); + snapshot.push(( + &PERIOD_LENGTH, + InputKind::Timer, + vec![Sum((duration_seconds * 1000.0) as isize)], + )); } let stats_fn = match self.stats { @@ -140,7 +160,6 @@ impl InnerAtomicBucket { target.flush() } } - } impl> From for AtomicBucket { @@ -161,14 +180,17 @@ impl AtomicBucket { drain: None, // TODO add API toggle for metadata publish publish_metadata: false, - })) + })), } } /// Set the default aggregated metrics statistics generator. pub fn default_stats(func: F) - where - F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static + where + F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + + Send + + Sync + + 'static, { *write_lock!(DEFAULT_AGGREGATE_STATS) = Arc::new(func) } @@ -189,18 +211,24 @@ impl AtomicBucket { } /// Set this bucket's statistics generator. - #[deprecated(since="0.7.2", note="Use stats()")] + #[deprecated(since = "0.7.2", note = "Use stats()")] pub fn set_stats(&self, func: F) - where - F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static + where + F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + + Send + + Sync + + 'static, { self.stats(func) } /// Set this bucket's statistics generator. pub fn stats(&self, func: F) - where - F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static + where + F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + + Send + + Sync + + 'static, { write_lock!(self.inner).stats = Some(Arc::new(func)) } @@ -211,7 +239,7 @@ impl AtomicBucket { } /// Set this bucket's aggregated metrics flush output. - #[deprecated(since="0.7.2", note="Use sink()")] + #[deprecated(since = "0.7.2", note = "Use sink()")] pub fn set_drain(&self, new_drain: impl Output + Send + Sync + 'static) { self.drain(new_drain) } @@ -231,7 +259,6 @@ impl AtomicBucket { let mut inner = write_lock!(self.inner); inner.flush_to(publish_scope) } - } impl InputScope for AtomicBucket { @@ -257,8 +284,12 @@ impl Flush for AtomicBucket { } impl WithAttributes for AtomicBucket { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } const HIT: usize = 0; @@ -354,7 +385,6 @@ impl AtomicScores { pub fn reset(&self, duration_seconds: f64) -> Option> { let mut scores = AtomicScores::blank(); if self.snapshot(&mut scores) { - let mut snapshot = Vec::new(); match self.kind { InputKind::Marker => { @@ -421,8 +451,8 @@ fn swap_if(counter: &AtomicIsize, new_value: isize, compare: fn(isize, isize) -> #[cfg(feature = "bench")] mod bench { - use test; use super::*; + use test; #[bench] fn update_marker(b: &mut test::Bencher) { @@ -467,8 +497,8 @@ mod test { use core::clock::{mock_clock_advance, mock_clock_reset}; use output::map::StatsMapScope; - use std::time::Duration; use std::collections::BTreeMap; + use std::time::Duration; fn make_stats(stats_fn: &'static StatsFn) -> BTreeMap { mock_clock_reset(); diff --git a/src/bucket/mod.rs b/src/bucket/mod.rs index 7601a2e..a76a406 100755 --- a/src/bucket/mod.rs +++ b/src/bucket/mod.rs @@ -1,8 +1,8 @@ pub mod atomic; use core::input::InputKind; +use core::name::MetricName; use core::MetricValue; -use core::name::{MetricName}; /// Possibly aggregated scores. #[derive(Debug, Clone, Copy)] @@ -24,16 +24,22 @@ pub enum ScoreType { /// A predefined export strategy reporting all aggregated stats for all metric types. /// Resulting stats are named by appending a short suffix to each metric's name. #[allow(dead_code)] -pub fn stats_all(kind: InputKind, name: MetricName, score: ScoreType) - -> Option<(InputKind, MetricName, MetricValue)> -{ +pub fn stats_all( + kind: InputKind, + name: MetricName, + score: ScoreType, +) -> Option<(InputKind, MetricName, MetricValue)> { match score { ScoreType::Count(hit) => Some((InputKind::Counter, name.make_name("count"), hit)), ScoreType::Sum(sum) => Some((kind, name.make_name("sum"), sum)), ScoreType::Mean(mean) => Some((kind, name.make_name("mean"), mean.round() as MetricValue)), ScoreType::Max(max) => Some((InputKind::Gauge, name.make_name("max"), max)), ScoreType::Min(min) => Some((InputKind::Gauge, name.make_name("min"), min)), - ScoreType::Rate(rate) => Some((InputKind::Gauge, name.make_name("rate"), rate.round() as MetricValue)), + ScoreType::Rate(rate) => Some(( + InputKind::Gauge, + name.make_name("rate"), + rate.round() as MetricValue, + )), } } @@ -42,9 +48,11 @@ pub fn stats_all(kind: InputKind, name: MetricName, score: ScoreType) /// Since there is only one stat per metric, there is no risk of collision /// and so exported stats copy their metric's name. #[allow(dead_code)] -pub fn stats_average(kind: InputKind, name: MetricName, score: ScoreType) - -> Option<(InputKind, MetricName, MetricValue)> -{ +pub fn stats_average( + kind: InputKind, + name: MetricName, + score: ScoreType, +) -> Option<(InputKind, MetricName, MetricValue)> { match kind { InputKind::Marker => match score { ScoreType::Count(count) => Some((InputKind::Counter, name, count)), @@ -64,9 +72,11 @@ pub fn stats_average(kind: InputKind, name: MetricName, score: ScoreType) /// Since there is only one stat per metric, there is no risk of collision /// and so exported stats copy their metric's name. #[allow(dead_code)] -pub fn stats_summary(kind: InputKind, name: MetricName, score: ScoreType) - -> Option<(InputKind, MetricName, MetricValue)> -{ +pub fn stats_summary( + kind: InputKind, + name: MetricName, + score: ScoreType, +) -> Option<(InputKind, MetricName, MetricValue)> { match kind { InputKind::Marker => match score { ScoreType::Count(count) => Some((InputKind::Counter, name, count)), @@ -81,4 +91,4 @@ pub fn stats_summary(kind: InputKind, name: MetricName, score: ScoreType) _ => None, }, } -} \ No newline at end of file +} diff --git a/src/cache/cache_in.rs b/src/cache/cache_in.rs index 981df3e..2d2709d 100755 --- a/src/cache/cache_in.rs +++ b/src/cache/cache_in.rs @@ -1,19 +1,19 @@ //! Metric input scope caching. -use core::Flush; -use core::input::{InputKind, Input, InputScope, InputMetric, InputDyn}; -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; use cache::lru_cache as lru; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; +use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope}; +use core::name::MetricName; +use core::Flush; -use std::sync::{Arc}; +use std::sync::Arc; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; /// Wrap an input with a metric definition cache. /// This can provide performance benefits for metrics that are dynamically defined at runtime on each access. @@ -43,14 +43,18 @@ impl InputCache { InputCache { attributes: Attributes::default(), target: Arc::new(target), - cache: Arc::new(RwLock::new(lru::LRUCache::with_capacity(max_size))) + cache: Arc::new(RwLock::new(lru::LRUCache::with_capacity(max_size))), } } } impl WithAttributes for InputCache { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Input for InputCache { @@ -75,16 +79,18 @@ pub struct InputScopeCache { } impl WithAttributes for InputScopeCache { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl InputScope for InputScopeCache { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let name = self.prefix_append(name); - let lookup = { - write_lock!(self.cache).get(&name).cloned() - }; + let lookup = { write_lock!(self.cache).get(&name).cloned() }; lookup.unwrap_or_else(|| { let new_metric = self.target.new_metric(name.clone(), kind); // FIXME (perf) having to take another write lock for a cache miss @@ -95,7 +101,6 @@ impl InputScope for InputScopeCache { } impl Flush for InputScopeCache { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); self.target.flush() diff --git a/src/cache/cache_out.rs b/src/cache/cache_out.rs index 537eb0c..5d2d90b 100755 --- a/src/cache/cache_out.rs +++ b/src/cache/cache_out.rs @@ -1,20 +1,20 @@ //! Metric output scope caching. -use core::Flush; -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; -use core::output::{Output, OutputMetric, OutputScope, OutputDyn}; -use core::input::InputKind; use cache::lru_cache as lru; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; +use core::input::InputKind; +use core::name::MetricName; +use core::output::{Output, OutputDyn, OutputMetric, OutputScope}; +use core::Flush; -use std::sync::{Arc}; +use std::sync::Arc; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; use std::rc::Rc; @@ -46,14 +46,18 @@ impl OutputCache { OutputCache { attributes: Attributes::default(), target: Arc::new(target), - cache: Arc::new(RwLock::new(lru::LRUCache::with_capacity(max_size))) + cache: Arc::new(RwLock::new(lru::LRUCache::with_capacity(max_size))), } } } impl WithAttributes for OutputCache { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Output for OutputCache { @@ -78,16 +82,18 @@ pub struct OutputScopeCache { } impl WithAttributes for OutputScopeCache { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl OutputScope for OutputScopeCache { fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { let name = self.prefix_append(name); - let lookup = { - write_lock!(self.cache).get(&name).cloned() - }; + let lookup = { write_lock!(self.cache).get(&name).cloned() }; lookup.unwrap_or_else(|| { let new_metric = self.target.new_metric(name.clone(), kind); // FIXME (perf) having to take another write lock for a cache miss @@ -98,10 +104,8 @@ impl OutputScope for OutputScopeCache { } impl Flush for OutputScopeCache { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); self.target.flush() } } - diff --git a/src/cache/lru_cache.rs b/src/cache/lru_cache.rs index c27799a..b65118a 100755 --- a/src/cache/lru_cache.rs +++ b/src/cache/lru_cache.rs @@ -2,8 +2,8 @@ //! Stored values will be held onto as long as there is space. //! When space runs out, the oldest unused value will get evicted to make room for a new value. -use std::hash::Hash; use std::collections::HashMap; +use std::hash::Hash; struct CacheEntry { key: K, diff --git a/src/cache/mod.rs b/src/cache/mod.rs index a19958e..e4933eb 100755 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -1,3 +1,3 @@ -pub mod lru_cache; -pub mod cache_out; pub mod cache_in; +pub mod cache_out; +pub mod lru_cache; diff --git a/src/core/attributes.rs b/src/core/attributes.rs index 021bbd7..2afcd5e 100755 --- a/src/core/attributes.rs +++ b/src/core/attributes.rs @@ -1,21 +1,20 @@ -use std::sync::{Arc}; -use std::collections::{HashMap}; +use std::collections::HashMap; use std::default::Default; +use std::sync::Arc; +use core::name::{MetricName, NameParts}; use core::scheduler::SCHEDULER; -use core::name::{NameParts, MetricName}; -use ::{Flush, CancelHandle}; use std::fmt; use std::time::Duration; -use ::{InputScope, Gauge}; use MetricValue; +use {CancelHandle, Flush}; +use {Gauge, InputScope}; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; - -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; /// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method. #[derive(Debug, Clone, Copy)] @@ -28,7 +27,7 @@ pub enum Sampling { /// - 1.0+ records everything /// - 0.5 records one of two values /// - 0.0 records nothing - Random(f64) + Random(f64), } impl Default for Sampling { @@ -102,7 +101,10 @@ pub trait OnFlush { fn notify_flush_listeners(&self); } -impl OnFlush for T where T: Flush + WithAttributes { +impl OnFlush for T +where + T: Flush + WithAttributes, +{ fn notify_flush_listeners(&self) { for listener in read_lock!(self.get_attributes().flush_listeners).iter() { (listener)() @@ -117,16 +119,18 @@ pub struct ObserveWhen<'a, T, F> { } impl<'a, T, F> ObserveWhen<'a, T, F> - where F: Fn() -> MetricValue + Send + Sync + 'static, - T: InputScope + WithAttributes + Send + Sync, +where + F: Fn() -> MetricValue + Send + Sync + 'static, + T: InputScope + WithAttributes + Send + Sync, { pub fn on_flush(self) { let gauge = self.gauge; let op = self.operation; - write_lock!(self.target.mut_attributes().flush_listeners).push(Arc::new(move || gauge.value(op()))); + write_lock!(self.target.mut_attributes().flush_listeners) + .push(Arc::new(move || gauge.value(op()))); } - pub fn every(self, period: Duration,) -> CancelHandle { + pub fn every(self, period: Duration) -> CancelHandle { let gauge = self.gauge; let op = self.operation; let handle = SCHEDULER.schedule(period, move || gauge.value(op())); @@ -137,17 +141,19 @@ impl<'a, T, F> ObserveWhen<'a, T, F> /// Schedule a recurring task pub trait Observe { - /// Schedule a recurring task. /// The returned handle can be used to cancel the task. fn observe(&mut self, gauge: Gauge, operation: F) -> ObserveWhen - where F: Fn() -> MetricValue + Send + Sync + 'static, Self: Sized; - + where + F: Fn() -> MetricValue + Send + Sync + 'static, + Self: Sized; } impl Observe for T { fn observe(&mut self, gauge: Gauge, operation: F) -> ObserveWhen - where F: Fn() -> MetricValue + Send + Sync + 'static, Self: Sized + where + F: Fn() -> MetricValue + Send + Sync + 'static, + Self: Sized, { ObserveWhen { target: self, @@ -173,7 +179,7 @@ pub trait Prefixed { /// Append a name to the existing names. /// Return a clone of the component with the updated names. - #[deprecated(since="0.7.2", note="Use named() or add_name()")] + #[deprecated(since = "0.7.2", note = "Use named() or add_name()")] fn add_prefix>(&self, name: S) -> Self; /// Append a name to the existing names. @@ -203,11 +209,9 @@ pub trait Label { /// Join namespace and prepend in newly defined metrics. fn label(&self, name: &str) -> Self; - } impl Prefixed for T { - /// Returns namespace of component. fn get_prefixes(&self) -> &NameParts { &self.get_attributes().naming @@ -233,7 +237,6 @@ impl Prefixed for T { let parts = NameParts::from(name); self.with_attributes(|new_attr| new_attr.naming = parts.clone()) } - } /// Apply statistical sampling to collected metrics data. @@ -272,11 +275,11 @@ pub trait Buffered: WithAttributes { #[cfg(test)] mod test { - use output::map::StatsMap; use core::attributes::*; + use core::input::Input; use core::input::*; use core::Flush; - use core::input::Input; + use output::map::StatsMap; use StatsMapScope; #[test] @@ -287,4 +290,4 @@ mod test { metrics.flush().unwrap(); assert_eq!(Some(&4), metrics.into_map().get("my_gauge")) } -} \ No newline at end of file +} diff --git a/src/core/clock.rs b/src/core/clock.rs index 627b860..853d9a5 100755 --- a/src/core/clock.rs +++ b/src/core/clock.rs @@ -52,7 +52,6 @@ pub fn mock_clock_reset() { }) } - /// Advance the mock clock by a certain amount of time. /// Enables writing reproducible metrics tests in combination with #mock_clock_reset() /// Should be after metrics have been produced but before they are published. @@ -75,7 +74,5 @@ fn now() -> Instant { /// thread::sleep will have no effect on metrics. /// Use advance_time() to simulate passing time. fn now() -> Instant { - MOCK_CLOCK.with(|now| { - *now.borrow() - }) + MOCK_CLOCK.with(|now| *now.borrow()) } diff --git a/src/core/error.rs b/src/core/error.rs index 9af7378..9d4a21a 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -1,6 +1,5 @@ -use std::result; use std::error; +use std::result; /// Just put any error in a box. pub type Result = result::Result>; - diff --git a/src/core/input.rs b/src/core/input.rs index 8bcc3d7..04468d8 100755 --- a/src/core/input.rs +++ b/src/core/input.rs @@ -1,14 +1,14 @@ use core::clock::TimeHandle; -use core::{MetricValue, Flush}; -use core::name::MetricName; use core::label::Labels; +use core::name::MetricName; +use core::{Flush, MetricValue}; -use std::sync::Arc; use std::fmt; +use std::sync::Arc; // TODO maybe define an 'AsValue' trait + impl for supported number types, then drop 'num' crate -pub use num::{ToPrimitive}; pub use num::integer; +pub use num::ToPrimitive; /// A function trait that opens a new metric capture scope. pub trait Input: Send + Sync + 'static + InputDyn { @@ -19,7 +19,7 @@ pub trait Input: Send + Sync + 'static + InputDyn { fn metrics(&self) -> Self::SCOPE; /// Open a new scope from this input. - #[deprecated(since="0.7.2", note="Use metrics()")] + #[deprecated(since = "0.7.2", note = "Use metrics()")] fn input(&self) -> Self::SCOPE { self.metrics() } @@ -74,7 +74,7 @@ pub trait InputScope: Flush { /// A metric is actually a function that knows to write a metric value to a metric output. #[derive(Clone)] pub struct InputMetric { - inner: Arc + inner: Arc, } impl fmt::Debug for InputMetric { @@ -86,7 +86,9 @@ impl fmt::Debug for InputMetric { impl InputMetric { /// Utility constructor pub fn new(metric: F) -> InputMetric { - InputMetric { inner: Arc::new(metric) } + InputMetric { + inner: Arc::new(metric), + } } /// Collect a new value for this metric. @@ -120,7 +122,7 @@ impl<'a> From<&'a str> for InputKind { "Gauge" => InputKind::Gauge, "Timer" => InputKind::Timer, "Level" => InputKind::Level, - _ => panic!("No InputKind '{}' defined", s) + _ => panic!("No InputKind '{}' defined", s), } } } @@ -187,7 +189,6 @@ impl Gauge { pub fn value(&self, value: V) { self.inner.write(value.to_isize().unwrap(), labels![]) } - } /// A timer that sends values to the metrics backend diff --git a/src/core/label.rs b/src/core/label.rs index 5823459..a4b2d87 100644 --- a/src/core/label.rs +++ b/src/core/label.rs @@ -1,13 +1,13 @@ -use std::collections::{HashMap}; use std::cell::RefCell; +use std::collections::HashMap; -use std::sync::{Arc}; +use std::sync::Arc; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; /// Label values are immutable but can move around a lot. type LabelValue = Arc; @@ -18,7 +18,7 @@ type LabelValue = Arc; /// All write operations return a mutated clone of the original. #[derive(Debug, Clone, Default)] struct LabelScope { - pairs: Option>> + pairs: Option>>, } impl LabelScope { @@ -26,11 +26,13 @@ impl LabelScope { fn set(&self, key: String, value: LabelValue) -> Self { let mut new_pairs = match self.pairs { None => HashMap::new(), - Some(ref old_pairs) => old_pairs.as_ref().clone() + Some(ref old_pairs) => old_pairs.as_ref().clone(), }; new_pairs.insert(key, value); - LabelScope { pairs: Some(Arc::new(new_pairs)) } + LabelScope { + pairs: Some(Arc::new(new_pairs)), + } } fn unset(&self, key: &str) -> Self { @@ -42,7 +44,9 @@ impl LabelScope { if new_pairs.is_empty() { LabelScope { pairs: None } } else { - LabelScope { pairs: Some(Arc::new(new_pairs)) } + LabelScope { + pairs: Some(Arc::new(new_pairs)), + } } } else { // key wasn't set, labels unchanged @@ -56,7 +60,7 @@ impl LabelScope { // FIXME should use .and_then(), how? match &self.pairs { None => None, - Some(pairs) => pairs.get(key).cloned() + Some(pairs) => pairs.get(key).cloned(), } } @@ -67,9 +71,9 @@ impl LabelScope { } } -lazy_static!( +lazy_static! { static ref APP_LABELS: RwLock = RwLock::new(LabelScope::default()); -); +} thread_local! { static THREAD_LABELS: RefCell = RefCell::new(LabelScope::default()); @@ -104,9 +108,7 @@ impl ThreadLabel { } fn collect(map: &mut HashMap) { - THREAD_LABELS.with(|mop| { - mop.borrow().collect(map) - }); + THREAD_LABELS.with(|mop| mop.borrow().collect(map)); } } @@ -139,7 +141,6 @@ impl AppLabel { } } - /// Base structure to carry metric labels from the application to the metric backend(s). /// Can carry both one-off labels and exported context labels (if async metrics are enabled). /// Used in applications through the labels!() macro. @@ -152,8 +153,8 @@ impl From> for Labels { fn from(map: HashMap) -> Self { Labels { scopes: vec![LabelScope { - pairs: Some(Arc::new(map)) - }] + pairs: Some(Arc::new(map)), + }], } } } @@ -168,10 +169,10 @@ impl Default for Labels { } impl Labels { - /// Used to save metric context before enqueuing value for async output. pub fn save_context(&mut self) { - self.scopes.push(THREAD_LABELS.with(|map| map.borrow().clone())); + self.scopes + .push(THREAD_LABELS.with(|map| map.borrow().clone())); self.scopes.push(read_lock!(APP_LABELS).clone()); } @@ -179,7 +180,6 @@ impl Labels { /// Searches provided labels, provided scopes or default scopes. // TODO needs less magic, add checks? pub fn lookup(&self, key: &str) -> Option { - fn lookup_current_context(key: &str) -> Option { ThreadLabel::get(key).or_else(|| AppLabel::get(key)) } @@ -191,14 +191,16 @@ impl Labels { // some value labels, no saved context labels // lookup value label, then lookup implicit context - 1 => self.scopes[0].get(key).or_else(|| lookup_current_context(key)), + 1 => self.scopes[0] + .get(key) + .or_else(|| lookup_current_context(key)), // value + saved context labels // lookup explicit context in turn _ => { for src in &self.scopes { if let Some(label_value) = src.get(key) { - return Some(label_value) + return Some(label_value); } } None @@ -225,7 +227,7 @@ impl Labels { AppLabel::collect(&mut map); ThreadLabel::collect(&mut map); self.scopes[0].collect(&mut map); - }, + } // value + saved context labels // lookup explicit context in turn @@ -240,14 +242,13 @@ impl Labels { } } - #[cfg(test)] pub mod test { use super::*; use std::sync::Mutex; - lazy_static!{ + lazy_static! { /// Label tests use the globally shared AppLabels which may make them interfere as tests are run concurrently. /// We do not want to mandate usage of `RUST_TEST_THREADS=1` which would penalize the whole test suite. /// Instead we use a local mutex to make sure the label tests run in sequence. @@ -261,10 +262,16 @@ pub mod test { AppLabel::set("abc", "456"); ThreadLabel::set("abc", "123"); - assert_eq!(Arc::new("123".into()), labels!().lookup("abc").expect("ThreadLabel Value")); + assert_eq!( + Arc::new("123".into()), + labels!().lookup("abc").expect("ThreadLabel Value") + ); ThreadLabel::unset("abc"); - assert_eq!(Arc::new("456".into()), labels!().lookup("abc").expect("AppLabel Value")); + assert_eq!( + Arc::new("456".into()), + labels!().lookup("abc").expect("AppLabel Value") + ); AppLabel::unset("abc"); assert_eq!(true, labels!().lookup("abc").is_none()); @@ -274,27 +281,41 @@ pub mod test { fn labels_macro() { let _lock = TEST_SEQUENCE.lock().expect("Test Sequence"); - let labels = labels!{ + let labels = labels! { "abc" => "789", "xyz" => "123" }; - assert_eq!(Arc::new("789".into()), labels.lookup("abc").expect("Label Value")); - assert_eq!(Arc::new("123".into()), labels.lookup("xyz").expect("Label Value")); + assert_eq!( + Arc::new("789".into()), + labels.lookup("abc").expect("Label Value") + ); + assert_eq!( + Arc::new("123".into()), + labels.lookup("xyz").expect("Label Value") + ); } - #[test] fn value_labels() { let _lock = TEST_SEQUENCE.lock().expect("Test Sequence"); - let labels = labels!{ "abc" => "789" }; - assert_eq!(Arc::new("789".into()), labels.lookup("abc").expect("Label Value")); + let labels = labels! { "abc" => "789" }; + assert_eq!( + Arc::new("789".into()), + labels.lookup("abc").expect("Label Value") + ); AppLabel::set("abc", "456"); - assert_eq!(Arc::new("789".into()), labels.lookup("abc").expect("Label Value")); + assert_eq!( + Arc::new("789".into()), + labels.lookup("abc").expect("Label Value") + ); ThreadLabel::set("abc", "123"); - assert_eq!(Arc::new("789".into()), labels.lookup("abc").expect("Label Value")); + assert_eq!( + Arc::new("789".into()), + labels.lookup("abc").expect("Label Value") + ); } } diff --git a/src/core/locking.rs b/src/core/locking.rs index c054ed1..b5b1b9d 100755 --- a/src/core/locking.rs +++ b/src/core/locking.rs @@ -2,43 +2,49 @@ //! This makes all outputs also immediately usable as inputs. //! The alternatives are queuing or thread local. -use core::input::{InputScope, InputMetric, Input, InputKind}; -use core::output::{Output, OutputScope}; -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; -use core::Flush; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; +use core::input::{Input, InputKind, InputMetric, InputScope}; +use core::name::MetricName; +use core::output::{Output, OutputScope}; +use core::Flush; use std::rc::Rc; -use std::sync::{Arc, Mutex}; use std::ops; +use std::sync::{Arc, Mutex}; /// Synchronous thread-safety for metric output using basic locking. #[derive(Clone)] pub struct LockingOutput { attributes: Attributes, - inner: Arc> + inner: Arc>, } impl WithAttributes for LockingOutput { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl InputScope for LockingOutput { - fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let name = self.prefix_append(name); // lock when creating metrics - let raw_metric = self.inner.lock().expect("LockingOutput").new_metric(name, kind); + let raw_metric = self + .inner + .lock() + .expect("LockingOutput") + .new_metric(name, kind); let mutex = self.inner.clone(); InputMetric::new(move |value, labels| { // lock when collecting values let _guard = mutex.lock().expect("LockingOutput"); raw_metric.write(value, labels) - } ) + }) } - } impl Flush for LockingOutput { @@ -54,7 +60,7 @@ impl Input for T { fn metrics(&self) -> Self::SCOPE { LockingOutput { attributes: Attributes::default(), - inner: Arc::new(Mutex::new(LockedOutputScope(self.output_dyn()))) + inner: Arc::new(Mutex::new(LockedOutputScope(self.output_dyn()))), } } } @@ -62,7 +68,7 @@ impl Input for T { /// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads. /// Obviously, it should only still be used from a single thread at a time or dragons may occur. #[derive(Clone)] -struct LockedOutputScope(Rc ); +struct LockedOutputScope(Rc); impl ops::Deref for LockedOutputScope { type Target = OutputScope + 'static; @@ -73,4 +79,3 @@ impl ops::Deref for LockedOutputScope { unsafe impl Send for LockedOutputScope {} unsafe impl Sync for LockedOutputScope {} - diff --git a/src/core/metrics.rs b/src/core/metrics.rs index 8d20b33..a45ebda 100755 --- a/src/core/metrics.rs +++ b/src/core/metrics.rs @@ -2,11 +2,11 @@ //! Because the possibly high volume of data, this is pre-set to use aggregation. //! This is also kept in a separate module because it is not to be exposed outside of the crate. -use core::input::{Marker, InputScope, Counter}; use core::attributes::Prefixed; +use core::input::{Counter, InputScope, Marker}; use core::proxy::Proxy; -metrics!{ +metrics! { /// Dipstick's own internal metrics. pub DIPSTICK_METRICS = "dipstick" => { diff --git a/src/core/mod.rs b/src/core/mod.rs index 7de2996..52a057b 100755 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,16 +1,16 @@ -pub mod error; -pub mod name; pub mod attributes; -pub mod input; -pub mod output; -pub mod locking; pub mod clock; -pub mod void; -pub mod proxy; +pub mod error; +pub mod input; pub mod label; -pub mod pcg32; -pub mod scheduler; +pub mod locking; pub mod metrics; +pub mod name; +pub mod output; +pub mod pcg32; +pub mod proxy; +pub mod scheduler; +pub mod void; /// Base type for recorded metric values. pub type MetricValue = isize; @@ -19,13 +19,12 @@ pub type MetricValue = isize; pub trait Flush { /// Flush does nothing by default. fn flush(&self) -> error::Result<()>; - } #[cfg(test)] pub mod test { - use super::*; use super::input::*; + use super::*; #[test] fn test_to_void() { @@ -38,9 +37,9 @@ pub mod test { #[cfg(feature = "bench")] pub mod bench { - use super::input::*; - use super::clock::*; use super::super::bucket::atomic::*; + use super::clock::*; + use super::input::*; use test; #[bench] diff --git a/src/core/name.rs b/src/core/name.rs index d021dc7..1183d18 100644 --- a/src/core/name.rs +++ b/src/core/name.rs @@ -1,5 +1,5 @@ -use std::ops::{Deref,DerefMut}; -use std::collections::{VecDeque}; +use std::collections::VecDeque; +use std::ops::{Deref, DerefMut}; /// A double-ended vec of strings constituting a metric name or a future part thereof. #[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Default)] @@ -10,18 +10,17 @@ pub struct NameParts { } impl NameParts { - /// Returns true if this instance is equal to or a subset (more specific) of the target instance. /// e.g. `a.b.c` is within `a.b` /// e.g. `a.d.c` is not within `a.b` pub fn is_within(&self, other: &NameParts) -> bool { // quick check: if this name has less parts it cannot be equal or more specific if self.len() < other.nodes.len() { - return false + return false; } for (i, part) in other.nodes.iter().enumerate() { if part != &self.nodes[i] { - return false + return false; } } true @@ -75,20 +74,20 @@ pub struct MetricName { } impl MetricName { - /// Prepend to the existing namespace. pub fn prepend>(mut self, namespace: S) -> Self { - let parts: NameParts = namespace.into(); - parts.iter().rev().for_each(|node| - self.nodes.push_front(node.clone()) - ); + let parts: NameParts = namespace.into(); + parts + .iter() + .rev() + .for_each(|node| self.nodes.push_front(node.clone())); self } /// Append to the existing namespace. pub fn append>(mut self, namespace: S) -> Self { let offset = self.nodes.len() - 1; - let parts: NameParts = namespace.into(); + let parts: NameParts = namespace.into(); for (i, part) in parts.iter().enumerate() { self.nodes.insert(i + offset, part.clone()) } @@ -97,13 +96,19 @@ impl MetricName { /// Combine name parts into a string. pub fn join(&self, separator: &str) -> String { - self.nodes.iter().map(|s| &**s).collect::>().join(separator) + self.nodes + .iter() + .map(|s| &**s) + .collect::>() + .join(separator) } } impl> From for MetricName { fn from(name: S) -> Self { - MetricName { nodes: NameParts::from(name) } + MetricName { + nodes: NameParts::from(name), + } } } @@ -120,7 +125,6 @@ impl DerefMut for MetricName { } } - #[cfg(test)] mod test { @@ -147,4 +151,4 @@ mod test { assert_eq!(false, sd1.is_within(&sd2)); } -} \ No newline at end of file +} diff --git a/src/core/output.rs b/src/core/output.rs index c9027bd..c7a7ab8 100755 --- a/src/core/output.rs +++ b/src/core/output.rs @@ -1,29 +1,29 @@ -use core::{Flush, MetricValue}; use core::input::InputKind; +use core::label::Labels; use core::name::MetricName; use core::void::Void; -use core::label::Labels; +use core::{Flush, MetricValue}; use std::rc::Rc; /// Define metrics, write values and flush them. pub trait OutputScope: Flush { - /// Define a raw metric of the specified type. fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric; - } /// Output metrics are not thread safe. #[derive(Clone)] pub struct OutputMetric { - inner: Rc + inner: Rc, } impl OutputMetric { /// Utility constructor pub fn new(metric: F) -> OutputMetric { - OutputMetric { inner: Rc::new(metric) } + OutputMetric { + inner: Rc::new(metric), + } } /// Some may prefer the `metric.write(value)` form to the `(metric)(value)` form. @@ -34,7 +34,6 @@ impl OutputMetric { } } - /// A function trait that opens a new metric capture scope. pub trait Output: Send + Sync + 'static + OutputDyn { /// The type of Scope returned byt this output. @@ -44,7 +43,7 @@ pub trait Output: Send + Sync + 'static + OutputDyn { fn new_scope(&self) -> Self::SCOPE; /// Open a new scope for this output. - #[deprecated(since="0.7.2", note="Use new_scope()")] + #[deprecated(since = "0.7.2", note = "Use new_scope()")] fn output(&self) -> Self::SCOPE { self.new_scope() } @@ -66,4 +65,4 @@ impl OutputDyn for T { /// Discard all metric values sent to it. pub fn output_none() -> Void { Void {} -} \ No newline at end of file +} diff --git a/src/core/pcg32.rs b/src/core/pcg32.rs index aa469b9..b5230a0 100644 --- a/src/core/pcg32.rs +++ b/src/core/pcg32.rs @@ -8,7 +8,8 @@ use time; fn seed() -> u64 { let seed = 5573589319906701683_u64; - let seed = seed.wrapping_mul(6364136223846793005) + let seed = seed + .wrapping_mul(6364136223846793005) .wrapping_add(1442695040888963407) .wrapping_add(time::precise_time_ns()); seed.wrapping_mul(6364136223846793005) diff --git a/src/core/proxy.rs b/src/core/proxy.rs index 00867a0..96523a0 100755 --- a/src/core/proxy.rs +++ b/src/core/proxy.rs @@ -1,21 +1,21 @@ //! Decouple metric definition from configuration with trait objects. -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::{MetricName, NameParts}; -use core::Flush; -use core::input::{InputKind, InputMetric, InputScope}; -use core::void::VOID_INPUT; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; +use core::input::{InputKind, InputMetric, InputScope}; +use core::name::{MetricName, NameParts}; +use core::void::VOID_INPUT; +use core::Flush; -use std::collections::{HashMap, BTreeMap}; -use std::sync::{Arc, Weak}; +use std::collections::{BTreeMap, HashMap}; use std::fmt; +use std::sync::{Arc, Weak}; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; use atomic_refcell::*; @@ -67,7 +67,6 @@ impl Default for Proxy { } } - struct InnerProxy { // namespaces can target one, many or no metrics targets: HashMap>, @@ -83,7 +82,6 @@ impl fmt::Debug for InnerProxy { } impl InnerProxy { - fn new() -> Self { Self { targets: HashMap::new(), @@ -97,10 +95,14 @@ impl InnerProxy { for (metric_name, metric) in self.metrics.range_mut(namespace.clone()..) { if let Some(metric) = metric.upgrade() { // check for range end - if !metric_name.is_within(namespace) { break } + if !metric_name.is_within(namespace) { + break; + } // check if metric targeted by _lower_ namespace - if metric.target.borrow().1 > namespace.len() { continue } + if metric.target.borrow().1 > namespace.len() { + continue; + } let target_metric = target_scope.new_metric(metric.name.short(), metric.kind); *metric.target.borrow_mut() = (target_metric, namespace.len()); @@ -108,7 +110,10 @@ impl InnerProxy { } } - fn get_effective_target(&self, namespace: &NameParts) -> Option<(Arc, usize)> { + fn get_effective_target( + &self, + namespace: &NameParts, + ) -> Option<(Arc, usize)> { if let Some(target) = self.targets.get(namespace) { return Some((target.clone(), namespace.len())); } @@ -117,7 +122,7 @@ impl InnerProxy { let mut name = namespace.clone(); while let Some(_popped) = name.pop_back() { if let Some(target) = self.targets.get(&name) { - return Some((target.clone(), name.len())) + return Some((target.clone(), name.len())); } } None @@ -126,20 +131,25 @@ impl InnerProxy { fn unset_target(&mut self, namespace: &NameParts) { if self.targets.remove(namespace).is_none() { // nothing to do - return + return; } - let (up_target, up_nslen) = self.get_effective_target(namespace) + let (up_target, up_nslen) = self + .get_effective_target(namespace) .unwrap_or_else(|| (VOID_INPUT.input_dyn(), 0)); // update all affected metrics to next upper targeted namespace for (name, metric) in self.metrics.range_mut(namespace.clone()..) { // check for range end - if !name.is_within(namespace) { break } + if !name.is_within(namespace) { + break; + } if let Some(mut metric) = metric.upgrade() { // check if metric targeted by _lower_ namespace - if metric.target.borrow().1 > namespace.len() { continue } + if metric.target.borrow().1 > namespace.len() { + continue; + } let new_metric = up_target.new_metric(name.short(), metric.kind); *metric.target.borrow_mut() = (new_metric, up_nslen); @@ -160,11 +170,9 @@ impl InnerProxy { Ok(()) } } - } impl Proxy { - /// Create a new "private" metric proxy root. This is usually not what you want. /// Since this proxy will not be part of the standard proxy tree, /// it will need to be configured independently and since downstream code may not know about @@ -178,7 +186,7 @@ impl Proxy { } /// Replace target for this proxy and its children. - #[deprecated(since="0.7.2", note="Use target()")] + #[deprecated(since = "0.7.2", note = "Use target()")] pub fn set_target(&self, target: T) { self.target(target) } @@ -194,7 +202,7 @@ impl Proxy { } /// Install a new default target for all proxies. - #[deprecated(since="0.7.2", note="Use default_target()")] + #[deprecated(since = "0.7.2", note = "Use default_target()")] pub fn set_default_target(target: T) { Self::default_target(target) } @@ -208,7 +216,6 @@ impl Proxy { pub fn unset_default_target(&self) { ROOT_PROXY.unset_target() } - } impl> From for Proxy { @@ -231,7 +238,8 @@ impl InputScope for Proxy { let namespace = &*name; { // not found, define new - let (target, target_namespace_length) = inner.get_effective_target(namespace) + let (target, target_namespace_length) = inner + .get_effective_target(namespace) .unwrap_or_else(|| (VOID_INPUT.input_dyn(), 0)); let metric_object = target.new_metric(namespace.short(), kind); let proxy = Arc::new(ProxyMetric { @@ -240,7 +248,9 @@ impl InputScope for Proxy { target: AtomicRefCell::new((metric_object, target_namespace_length)), proxy: self.inner.clone(), }); - inner.metrics.insert(namespace.clone(), Arc::downgrade(&proxy)); + inner + .metrics + .insert(namespace.clone(), Arc::downgrade(&proxy)); proxy } }); @@ -249,7 +259,6 @@ impl InputScope for Proxy { } impl Flush for Proxy { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); write_lock!(self.inner).flush(self.get_prefixes()) @@ -257,16 +266,20 @@ impl Flush for Proxy { } impl WithAttributes for Proxy { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } #[cfg(feature = "bench")] mod bench { use super::*; - use test; use bucket::atomic::AtomicBucket; + use test; #[bench] fn proxy_marker_to_aggregate(b: &mut test::Bencher) { diff --git a/src/core/scheduler.rs b/src/core/scheduler.rs index d852a1c..78ffc97 100644 --- a/src/core/scheduler.rs +++ b/src/core/scheduler.rs @@ -2,13 +2,13 @@ use core::input::InputScope; -use std::time::{Duration, Instant}; -use std::sync::{Arc, Condvar, Mutex}; -use std::sync::atomic::{AtomicBool}; +use std::cmp::{max, Ordering}; +use std::collections::BinaryHeap; +use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::SeqCst; -use std::collections::{BinaryHeap}; -use std::cmp::{Ordering, max}; -use std::thread::{self}; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; +use std::time::{Duration, Instant}; /// A handle to cancel a scheduled task if required. #[derive(Debug, Clone)] @@ -89,7 +89,8 @@ pub static MIN_DELAY: Duration = Duration::from_millis(50); impl Scheduler { /// Launch a new scheduler thread. fn new() -> Self { - let sched: Arc<(Mutex>, Condvar)> = Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new())); + let sched: Arc<(Mutex>, Condvar)> = + Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new())); let sched1 = Arc::downgrade(&sched); thread::Builder::new() @@ -106,31 +107,29 @@ impl Scheduler { Some(task) if task.next_time > now => { // next task is not ready yet, update schedule wait_for = max(MIN_DELAY, task.next_time - now); - break 'work + break 'work; } None => { // TODO no tasks left. exit thread? - break 'work - }, + break 'work; + } _ => {} } if let Some(mut task) = tasks.pop() { if task.handle.is_cancelled() { // do not execute, do not reinsert - continue + continue; } (task.operation)(); task.next_time = now + task.period; tasks.push(task); } - }; + } } }) .unwrap(); - Scheduler { - next_tasks: sched - } + Scheduler { next_tasks: sched } } #[cfg(test)] @@ -140,7 +139,9 @@ impl Scheduler { /// Schedule a task to run periodically. pub fn schedule(&self, period: Duration, operation: F) -> CancelHandle - where F: Fn() -> () + Send + Sync + 'static { + where + F: Fn() -> () + Send + Sync + 'static, + { let handle = CancelHandle::new(); let new_task = ScheduledTask { next_time: Instant::now() + period, @@ -157,7 +158,7 @@ impl Scheduler { #[cfg(test)] pub mod test { use super::*; - use std::sync::atomic::{AtomicUsize}; + use std::sync::atomic::AtomicUsize; #[test] fn schedule_one_and_cancel() { @@ -166,7 +167,9 @@ pub mod test { let sched = Scheduler::new(); - let handle1 = sched.schedule(Duration::from_millis(50), move || {trig1b.fetch_add(1, SeqCst);}); + let handle1 = sched.schedule(Duration::from_millis(50), move || { + trig1b.fetch_add(1, SeqCst); + }); assert_eq!(sched.task_count(), 1); thread::sleep(Duration::from_millis(170)); assert_eq!(3, trig1a.load(SeqCst)); @@ -241,4 +244,3 @@ pub mod test { handle2.cancel(); } } - diff --git a/src/core/void.rs b/src/core/void.rs index 765f2d7..f0ba48d 100755 --- a/src/core/void.rs +++ b/src/core/void.rs @@ -1,10 +1,10 @@ -use core::output::{Output, OutputScope, OutputMetric}; +use core::input::{InputDyn, InputKind, InputScope}; use core::name::MetricName; -use core::input::{InputKind, InputDyn, InputScope}; +use core::output::{Output, OutputMetric, OutputScope}; use core::Flush; -use std::sync::Arc; use std::error::Error; +use std::sync::Arc; lazy_static! { /// The reference instance identifying an uninitialized metric config. @@ -18,14 +18,13 @@ lazy_static! { #[derive(Clone)] pub struct Void {} - /// Discard metrics output. #[derive(Clone)] pub struct VoidOutput {} impl Void { /// Void metrics builder. - #[deprecated(since="0.7.2", note="Use new()")] + #[deprecated(since = "0.7.2", note = "Use new()")] pub fn metrics() -> Self { Self::new() } diff --git a/src/lib.rs b/src/lib.rs index 6488872..8686556 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,14 @@ //! A quick, modular metrics toolkit for Rust applications. #![cfg_attr(feature = "bench", feature(test))] -#![warn(missing_docs, trivial_casts, trivial_numeric_casts, unused_extern_crates, - unused_qualifications)] -#![recursion_limit="32"] +#![warn( + missing_docs, + trivial_casts, + trivial_numeric_casts, + unused_extern_crates, + unused_qualifications +)] +#![recursion_limit = "32"] #[cfg(feature = "bench")] extern crate test; @@ -19,48 +24,58 @@ extern crate num; // FIXME required only for pcg32 seed (for sampling) extern crate time; -#[cfg(feature="crossbeam-channel")] +#[cfg(feature = "crossbeam-channel")] extern crate crossbeam_channel; -#[cfg(feature="parking_lot")] +#[cfg(feature = "parking_lot")] extern crate parking_lot; #[macro_use] mod macros; pub use macros::*; -#[cfg(not(feature="parking_lot"))] +#[cfg(not(feature = "parking_lot"))] macro_rules! write_lock { - ($WUT:expr) => { $WUT.write().unwrap() }; + ($WUT:expr) => { + $WUT.write().unwrap() + }; } -#[cfg(feature="parking_lot")] +#[cfg(feature = "parking_lot")] macro_rules! write_lock { - ($WUT:expr) => { $WUT.write() }; + ($WUT:expr) => { + $WUT.write() + }; } -#[cfg(not(feature="parking_lot"))] +#[cfg(not(feature = "parking_lot"))] macro_rules! read_lock { - ($WUT:expr) => { $WUT.read().unwrap() }; + ($WUT:expr) => { + $WUT.read().unwrap() + }; } -#[cfg(feature="parking_lot")] +#[cfg(feature = "parking_lot")] macro_rules! read_lock { - ($WUT:expr) => { $WUT.read() }; + ($WUT:expr) => { + $WUT.read() + }; } mod core; -pub use core::{Flush, MetricValue}; -pub use core::attributes::{Prefixed, Sampling, Sampled, Buffered, Buffering, OnFlush, Observe}; -pub use core::name::{MetricName, NameParts}; -pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, Level, InputKind}; -pub use core::output::{Output, OutputDyn, OutputScope, OutputMetric}; -pub use core::scheduler::{ScheduleFlush, CancelHandle}; +pub use core::attributes::{Buffered, Buffering, Observe, OnFlush, Prefixed, Sampled, Sampling}; +pub use core::clock::TimeHandle; +pub use core::error::Result; +pub use core::input::{ + Counter, Gauge, Input, InputDyn, InputKind, InputMetric, InputScope, Level, Marker, Timer, +}; +pub use core::label::{AppLabel, Labels, ThreadLabel}; pub use core::locking::LockingOutput; -pub use core::error::{Result}; -pub use core::void::{Void}; -pub use core::clock::{TimeHandle}; -pub use core::label::{Labels, AppLabel, ThreadLabel}; +pub use core::name::{MetricName, NameParts}; +pub use core::output::{Output, OutputDyn, OutputMetric, OutputScope}; +pub use core::scheduler::{CancelHandle, ScheduleFlush}; +pub use core::void::Void; +pub use core::{Flush, MetricValue}; #[cfg(test)] pub use core::clock::{mock_clock_advance, mock_clock_reset}; @@ -68,19 +83,19 @@ pub use core::clock::{mock_clock_advance, mock_clock_reset}; pub use core::proxy::Proxy; mod output; -pub use output::format::{LineFormat, SimpleFormat, LineOp, LabelOp, LineTemplate, Formatting}; -pub use output::stream::{Stream, TextScope}; -pub use output::graphite::{Graphite, GraphiteScope, GraphiteMetric}; -pub use output::statsd::{Statsd, StatsdScope, StatsdMetric}; -pub use output::map::StatsMapScope; +pub use output::format::{Formatting, LabelOp, LineFormat, LineOp, LineTemplate, SimpleFormat}; +pub use output::graphite::{Graphite, GraphiteMetric, GraphiteScope}; pub use output::log::{Log, LogScope}; +pub use output::map::StatsMapScope; +pub use output::statsd::{Statsd, StatsdMetric, StatsdScope}; +pub use output::stream::{Stream, TextScope}; //#[cfg(feature="prometheus")] pub use output::prometheus::{Prometheus, PrometheusScope}; mod bucket; -pub use bucket::{ScoreType, stats_all, stats_average, stats_summary}; -pub use bucket::atomic::{AtomicBucket}; +pub use bucket::atomic::AtomicBucket; +pub use bucket::{stats_all, stats_average, stats_summary, ScoreType}; mod cache; pub use cache::cache_in::CachedInput; @@ -91,5 +106,5 @@ pub use multi::multi_in::{MultiInput, MultiInputScope}; pub use multi::multi_out::{MultiOutput, MultiOutputScope}; mod queue; -pub use queue::queue_in::{QueuedInput, InputQueue, InputQueueScope}; -pub use queue::queue_out::{QueuedOutput, OutputQueue, OutputQueueScope}; +pub use queue::queue_in::{InputQueue, InputQueueScope, QueuedInput}; +pub use queue::queue_out::{OutputQueue, OutputQueueScope, QueuedOutput}; diff --git a/src/macros.rs b/src/macros.rs index 59d725e..f90a2f5 100755 --- a/src/macros.rs +++ b/src/macros.rs @@ -162,7 +162,7 @@ macro_rules! metrics { }; (@internal $WITH:expr; $TYPE:ty;) => () - + } #[cfg(test)] @@ -170,7 +170,7 @@ mod test { use core::input::*; use core::proxy::Proxy; - metrics!{TEST: Proxy = "test_prefix" => { + metrics! {TEST: Proxy = "test_prefix" => { pub M1: Marker = "failed"; C1: Counter = "failed"; G1: Gauge = "failed"; diff --git a/src/multi/multi_in.rs b/src/multi/multi_in.rs index deecd35..c7f520f 100755 --- a/src/multi/multi_in.rs +++ b/src/multi/multi_in.rs @@ -1,10 +1,10 @@ //! Dispatch metrics to multiple sinks. -use core::Flush; -use core::input::{InputKind, Input, InputScope, InputMetric, InputDyn}; -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; +use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope}; +use core::name::MetricName; +use core::Flush; use std::sync::Arc; @@ -29,7 +29,7 @@ impl Input for MultiInput { impl MultiInput { /// Create a new multi-input dispatcher. - #[deprecated(since="0.7.2", note="Use new()")] + #[deprecated(since = "0.7.2", note = "Use new()")] pub fn input() -> Self { Self::new() } @@ -51,8 +51,12 @@ impl MultiInput { } impl WithAttributes for MultiInput { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } /// Dispatch metric values to a list of scopes. @@ -78,17 +82,20 @@ impl MultiInputScope { cloned.scopes.push(Arc::new(scope)); cloned } - } impl InputScope for MultiInputScope { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let name = &self.prefix_append(name); - let metrics: Vec = self.scopes.iter() + let metrics: Vec = self + .scopes + .iter() .map(move |scope| scope.new_metric(name.clone(), kind)) .collect(); - InputMetric::new(move |value, labels| for metric in &metrics { - metric.write(value, labels.clone()) + InputMetric::new(move |value, labels| { + for metric in &metrics { + metric.write(value, labels.clone()) + } }) } } @@ -104,6 +111,10 @@ impl Flush for MultiInputScope { } impl WithAttributes for MultiInputScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } diff --git a/src/multi/multi_out.rs b/src/multi/multi_out.rs index e2b3e87..d03e552 100755 --- a/src/multi/multi_out.rs +++ b/src/multi/multi_out.rs @@ -1,11 +1,11 @@ //! Dispatch metrics to multiple sinks. -use core::Flush; -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; -use core::input::InputKind; -use core::output::{Output, OutputMetric, OutputScope, OutputDyn}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; +use core::input::InputKind; +use core::name::MetricName; +use core::output::{Output, OutputDyn, OutputMetric, OutputScope}; +use core::Flush; use std::rc::Rc; use std::sync::Arc; @@ -31,7 +31,7 @@ impl Output for MultiOutput { impl MultiOutput { /// Create a new multi-output dispatcher. - #[deprecated(since="0.7.2", note="Use new()")] + #[deprecated(since = "0.7.2", note = "Use new()")] pub fn output() -> Self { Self::new() } @@ -51,12 +51,15 @@ impl MultiOutput { cloned.outputs.push(Arc::new(out)); cloned } - } impl WithAttributes for MultiOutput { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } /// Dispatch metric values to a list of scopes. @@ -81,17 +84,20 @@ impl MultiOutputScope { cloned.scopes.push(Rc::new(scope)); cloned } - } impl OutputScope for MultiOutputScope { fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { let name = &self.prefix_append(name); - let metrics: Vec = self.scopes.iter() + let metrics: Vec = self + .scopes + .iter() .map(move |scope| scope.new_metric(name.clone(), kind)) .collect(); - OutputMetric::new(move |value, labels| for metric in &metrics { - metric.write(value, labels.clone()) + OutputMetric::new(move |value, labels| { + for metric in &metrics { + metric.write(value, labels.clone()) + } }) } } @@ -107,6 +113,10 @@ impl Flush for MultiOutputScope { } impl WithAttributes for MultiOutputScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } diff --git a/src/output/format.rs b/src/output/format.rs index aae893f..ef3bf4d 100644 --- a/src/output/format.rs +++ b/src/output/format.rs @@ -1,7 +1,7 @@ -use core::name::MetricName; -use core::input::InputKind; -use core::MetricValue; use self::LineOp::*; +use core::input::InputKind; +use core::name::MetricName; +use core::MetricValue; use std::io; use std::sync::Arc; @@ -32,7 +32,7 @@ pub enum LabelOp { /// An sequence of print commands, embodying an output strategy for a single metric. pub struct LineTemplate { - ops: Vec + ops: Vec, } impl From> for LineTemplate { @@ -43,8 +43,14 @@ impl From> for LineTemplate { impl LineTemplate { /// Template execution applies commands in turn, writing to the output. - pub fn print(&self, output: &mut io::Write, value: MetricValue, lookup: L) -> Result<(), io::Error> - where L: Fn(&str) -> Option> + pub fn print( + &self, + output: &mut io::Write, + value: MetricValue, + lookup: L, + ) -> Result<(), io::Error> + where + L: Fn(&str) -> Option>, { for cmd in &self.ops { match cmd { @@ -53,22 +59,19 @@ impl LineTemplate { ScaledValueAsText(scale) => { let scaled = value as f64 / scale; output.write_all(format!("{}", scaled).as_ref())? - }, + } NewLine => writeln!(output)?, LabelExists(label_key, print_label) => { if let Some(label_value) = lookup(label_key.as_ref()) { for label_cmd in print_label { match label_cmd { - LabelOp::LabelValue => - output.write_all(label_value.as_bytes())?, - LabelOp::LabelKey => - output.write_all(label_key.as_bytes())?, - LabelOp::Literal(src) => - output.write_all(src.as_ref())?, + LabelOp::LabelValue => output.write_all(label_value.as_bytes())?, + LabelOp::LabelKey => output.write_all(label_key.as_bytes())?, + LabelOp::Literal(src) => output.write_all(src.as_ref())?, } } } - }, + } }; } Ok(()) @@ -83,7 +86,6 @@ pub trait Formatting { /// Forges metric-specific printers pub trait LineFormat: Send + Sync { - /// Prepare a template for output of metric values. fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate; } @@ -100,11 +102,7 @@ impl LineFormat for SimpleFormat { let mut header = name.join("."); header.push(' '); LineTemplate { - ops: vec![ - Literal(header.into_bytes()), - ValueAsText, - NewLine, - ] + ops: vec![Literal(header.into_bytes()), ValueAsText, NewLine], } } } @@ -129,12 +127,16 @@ pub mod test { Literal(" ".into()), ScaledValueAsText(1000.0), Literal(" ".into()), - LabelExists("test_key".into(), vec![ - LabelOp::LabelKey, - LabelOp::Literal("=".into()), - LabelOp::LabelValue]), + LabelExists( + "test_key".into(), + vec![ + LabelOp::LabelKey, + LabelOp::Literal("=".into()), + LabelOp::LabelValue, + ], + ), NewLine, - ] + ], } } } @@ -147,8 +149,13 @@ pub mod test { name = name.prepend("xyz"); let template = format.template(&name, InputKind::Counter); let mut out = vec![]; - template.print(&mut out, 123000, |key| labels.lookup(key)).unwrap(); - assert_eq!("Counter/xyz.abc 123000 123 test_key=456\n", String::from_utf8(out).unwrap()); + template + .print(&mut out, 123000, |key| labels.lookup(key)) + .unwrap(); + assert_eq!( + "Counter/xyz.abc 123000 123 test_key=456\n", + String::from_utf8(out).unwrap() + ); } #[test] @@ -159,6 +166,9 @@ pub mod test { let template = format.template(&name, InputKind::Counter); let mut out = vec![]; template.print(&mut out, 123000, |_key| None).unwrap(); - assert_eq!("Counter/xyz.abc 123000 123 \n", String::from_utf8(out).unwrap()); + assert_eq!( + "Counter/xyz.abc 123000 123 \n", + String::from_utf8(out).unwrap() + ); } } diff --git a/src/output/graphite.rs b/src/output/graphite.rs index 1d2154c..ef42159 100755 --- a/src/output/graphite.rs +++ b/src/output/graphite.rs @@ -1,32 +1,32 @@ //! Send metrics to a graphite server. -use core::attributes::{Buffered, Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; -use core::{Flush, MetricValue}; +use cache::cache_out; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; +use core::error; use core::input::InputKind; use core::metrics; -use core::output::{Output, OutputScope, OutputMetric}; -use core::error; -use queue::queue_out; -use cache::cache_out; +use core::name::MetricName; +use core::output::{Output, OutputMetric, OutputScope}; +use core::{Flush, MetricValue}; use output::socket::RetrySocket; +use queue::queue_out; use std::net::ToSocketAddrs; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::io::Write; use std::fmt::Debug; +use std::io::Write; +use std::time::{SystemTime, UNIX_EPOCH}; -use std::rc::Rc; use std::cell::{RefCell, RefMut}; +use std::rc::Rc; -use std::sync::{Arc}; +use std::sync::Arc; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; /// Graphite output holds a socket to a graphite server. /// The socket is shared between scopes opened from the output. @@ -62,8 +62,12 @@ impl Graphite { } impl WithAttributes for Graphite { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for Graphite {} @@ -98,7 +102,6 @@ impl OutputScope for GraphiteScope { } impl Flush for GraphiteScope { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); let buf = self.buffer.borrow_mut(); @@ -107,7 +110,7 @@ impl Flush for GraphiteScope { } impl GraphiteScope { - fn print(&self, metric: &GraphiteMetric, value: MetricValue) { + fn print(&self, metric: &GraphiteMetric, value: MetricValue) { let scaled_value = value / metric.scale; let value_str = scaled_value.to_string(); @@ -142,7 +145,9 @@ impl GraphiteScope { } fn flush_inner(&self, mut buf: RefMut) -> error::Result<()> { - if buf.is_empty() { return Ok(()) } + if buf.is_empty() { + return Ok(()); + } let mut sock = write_lock!(self.socket); match sock.write_all(buf.as_bytes()) { @@ -158,13 +163,16 @@ impl GraphiteScope { Err(e.into()) } } - } } impl WithAttributes for GraphiteScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for GraphiteScope {} @@ -195,9 +203,9 @@ impl Drop for GraphiteScope { #[cfg(feature = "bench")] mod bench { + use super::*; use core::attributes::*; use core::input::*; - use super::*; use test; #[bench] @@ -210,8 +218,10 @@ mod bench { #[bench] pub fn buffering_graphite(b: &mut test::Bencher) { - let sd = Graphite::send_to("localhost:2003").unwrap() - .buffered(Buffering::BufferSize(65465)).metrics(); + let sd = Graphite::send_to("localhost:2003") + .unwrap() + .buffered(Buffering::BufferSize(65465)) + .metrics(); let timer = sd.new_metric("timer".into(), InputKind::Timer); b.iter(|| test::black_box(timer.write(2000, labels![]))); diff --git a/src/output/log.rs b/src/output/log.rs index 60ed7b0..be30e9d 100755 --- a/src/output/log.rs +++ b/src/output/log.rs @@ -1,22 +1,22 @@ -use core::{Flush}; -use core::input::{InputKind, Input, InputScope, InputMetric}; -use core::attributes::{Attributes, WithAttributes, Buffered, Prefixed, OnFlush}; -use core::name::MetricName; -use core::error; use cache::cache_in; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; +use core::error; +use core::input::{Input, InputKind, InputMetric, InputScope}; +use core::name::MetricName; +use core::Flush; +use output::format::{Formatting, LineFormat, SimpleFormat}; use queue::queue_in; -use output::format::{LineFormat, SimpleFormat, Formatting}; -use std::sync::{Arc}; +use std::sync::Arc; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; -use std::io::Write; use log; +use std::io::Write; /// Buffered metrics log output. #[derive(Clone)] @@ -40,8 +40,12 @@ impl Input for Log { } impl WithAttributes for Log { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for Log {} @@ -70,7 +74,7 @@ impl Log { attributes: Attributes::default(), format: Arc::new(SimpleFormat::default()), level: log::Level::Info, - target: None + target: None, } } @@ -89,12 +93,15 @@ impl Log { cloned.target = Some(target.to_string()); cloned } - } impl WithAttributes for LogScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for LogScope {} @@ -116,7 +123,7 @@ impl InputScope for LogScope { Ok(()) => { let mut entries = write_lock!(entries); entries.push(buffer) - }, + } Err(err) => debug!("Could not format buffered log metric: {}", err), } }) @@ -127,10 +134,12 @@ impl InputScope for LogScope { InputMetric::new(move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { - Ok(()) => if let Some(target) = &target { - log!(target: target, level, "{:?}", &buffer) - } else { - log!(level, "{:?}", &buffer) + Ok(()) => { + if let Some(target) = &target { + log!(target: target, level, "{:?}", &buffer) + } else { + log!(level, "{:?}", &buffer) + } } Err(err) => debug!("Could not format buffered log metric: {}", err), } @@ -140,7 +149,6 @@ impl InputScope for LogScope { } impl Flush for LogScope { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); let mut entries = write_lock!(self.entries); diff --git a/src/output/map.rs b/src/output/map.rs index 8ef7a84..e43b8b9 100755 --- a/src/output/map.rs +++ b/src/output/map.rs @@ -1,14 +1,14 @@ -use core::{Flush, MetricValue}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::input::InputKind; +use core::input::{Input, InputMetric, InputScope}; use core::name::MetricName; -use core::input::{InputMetric, InputScope, Input}; -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; +use core::{Flush, MetricValue}; use std::collections::BTreeMap; use std::error::Error; use std::sync::{Arc, RwLock}; -use ::{OutputScope, OutputMetric}; +use {OutputMetric, OutputScope}; /// A BTreeMap wrapper to receive metrics or stats values. /// Every received value for a metric replaces the previous one (if any). @@ -18,8 +18,12 @@ pub struct StatsMap { } impl WithAttributes for StatsMap { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Input for StatsMap { @@ -42,8 +46,12 @@ pub struct StatsMapScope { } impl WithAttributes for StatsMapScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl InputScope for StatsMapScope { @@ -68,7 +76,6 @@ impl OutputScope for StatsMapScope { } } - impl Flush for StatsMapScope { fn flush(&self) -> Result<(), Box> { self.notify_flush_listeners(); diff --git a/src/output/prometheus.rs b/src/output/prometheus.rs index 8e039bc..678b7aa 100755 --- a/src/output/prometheus.rs +++ b/src/output/prometheus.rs @@ -1,18 +1,18 @@ //! Send metrics to a Prometheus server. -use core::attributes::{Buffered, Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; -use core::{Flush, MetricValue}; -use core::input::InputKind; -use core::metrics; -use core::output::{Output, OutputScope, OutputMetric}; -use core::error; -use queue::queue_out; use cache::cache_out; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; +use core::error; +use core::input::InputKind; use core::label::Labels; +use core::metrics; +use core::name::MetricName; +use core::output::{Output, OutputMetric, OutputScope}; +use core::{Flush, MetricValue}; +use queue::queue_out; -use std::rc::Rc; use std::cell::{RefCell, RefMut}; +use std::rc::Rc; /// Prometheus output holds a socket to a Prometheus server. /// The socket is shared between scopes opened from the output. @@ -50,8 +50,12 @@ impl Prometheus { } impl WithAttributes for Prometheus { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for Prometheus {} @@ -85,7 +89,6 @@ impl OutputScope for PrometheusScope { } impl Flush for PrometheusScope { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); let buf = self.buffer.borrow_mut(); @@ -94,7 +97,7 @@ impl Flush for PrometheusScope { } impl PrometheusScope { - fn print(&self, metric: &PrometheusMetric, value: MetricValue, labels: Labels) { + fn print(&self, metric: &PrometheusMetric, value: MetricValue, labels: Labels) { let scaled_value = value / metric.scale; let value_str = scaled_value.to_string(); @@ -127,7 +130,10 @@ impl PrometheusScope { let buffer = self.buffer.borrow_mut(); if strbuf.len() + buffer.len() > BUFFER_FLUSH_THRESHOLD { metrics::PROMETHEUS_OVERFLOW.mark(); - warn!("Prometheus Buffer Size Exceeded: {}", BUFFER_FLUSH_THRESHOLD); + warn!( + "Prometheus Buffer Size Exceeded: {}", + BUFFER_FLUSH_THRESHOLD + ); let _ = self.flush_inner(buffer); } else { if !self.is_buffered() { @@ -139,9 +145,14 @@ impl PrometheusScope { } fn flush_inner(&self, mut buf: RefMut) -> error::Result<()> { - if buf.is_empty() { return Ok(()) } + if buf.is_empty() { + return Ok(()); + } - match minreq::get(self.push_url.as_ref()).with_body(buf.as_ref()).send() { + match minreq::get(self.push_url.as_ref()) + .with_body(buf.as_ref()) + .send() + { Ok(_res) => { metrics::PROMETHEUS_SENT_BYTES.count(buf.len()); trace!("Sent {} bytes to Prometheus", buf.len()); @@ -158,8 +169,12 @@ impl PrometheusScope { } impl WithAttributes for PrometheusScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for PrometheusScope {} @@ -190,9 +205,9 @@ impl Drop for PrometheusScope { #[cfg(feature = "bench")] mod bench { + use super::*; use core::attributes::*; use core::input::*; - use super::*; use test; #[bench] @@ -205,8 +220,10 @@ mod bench { #[bench] pub fn buffering_prometheus(b: &mut test::Bencher) { - let sd = Prometheus::push_to("localhost:2003").unwrap() - .buffered(Buffering::BufferSize(65465)).metrics(); + let sd = Prometheus::push_to("localhost:2003") + .unwrap() + .buffered(Buffering::BufferSize(65465)) + .metrics(); let timer = sd.new_metric("timer".into(), InputKind::Timer); b.iter(|| test::black_box(timer.write(2000, labels![]))); diff --git a/src/output/socket.rs b/src/output/socket.rs index 21de29e..957c3e7 100755 --- a/src/output/socket.rs +++ b/src/output/socket.rs @@ -1,11 +1,11 @@ //! A TCP Socket wrapper that reconnects automatically. +use std::fmt; +use std::io; +use std::io::Write; use std::net::TcpStream; use std::net::{SocketAddr, ToSocketAddrs}; -use std::io; use std::time::{Duration, Instant}; -use std::fmt; -use std::io::Write; const MIN_RECONNECT_DELAY_MS: u64 = 50; const MAX_RECONNECT_DELAY_MS: u64 = 10_000; diff --git a/src/output/statsd.rs b/src/output/statsd.rs index d155a60..daef10f 100755 --- a/src/output/statsd.rs +++ b/src/output/statsd.rs @@ -1,21 +1,23 @@ //! Send metrics to a statsd server. -use core::attributes::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; -use core::pcg32; -use core::{Flush, MetricValue}; +use cache::cache_out; +use core::attributes::{ + Attributes, Buffered, OnFlush, Prefixed, Sampled, Sampling, WithAttributes, +}; +use core::error; use core::input::InputKind; use core::metrics; -use core::output::{Output, OutputScope, OutputMetric}; -use core::error; -use cache::cache_out; +use core::name::MetricName; +use core::output::{Output, OutputMetric, OutputScope}; +use core::pcg32; +use core::{Flush, MetricValue}; use queue::queue_out; +use std::cell::{RefCell, RefMut}; use std::net::ToSocketAddrs; -use std::sync::Arc; use std::net::UdpSocket; use std::rc::Rc; -use std::cell::{RefCell, RefMut}; +use std::sync::Arc; /// Use a safe maximum size for UDP to prevent fragmentation. // TODO make configurable? @@ -62,8 +64,12 @@ impl Output for Statsd { } impl WithAttributes for Statsd { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } /// Statsd Input @@ -99,9 +105,13 @@ impl OutputScope for StatsdScope { let cloned = self.clone(); if let Sampling::Random(float_rate) = self.get_sampling() { - suffix.push_str(&format!{"|@{}\n", float_rate}); + suffix.push_str(&format! {"|@{}\n", float_rate}); let int_sampling_rate = pcg32::to_int_rate(float_rate); - let metric = StatsdMetric { prefix, suffix, scale }; + let metric = StatsdMetric { + prefix, + suffix, + scale, + }; OutputMetric::new(move |value, _labels| { if pcg32::accept_sample(int_sampling_rate) { @@ -110,16 +120,17 @@ impl OutputScope for StatsdScope { }) } else { suffix.push_str("\n"); - let metric = StatsdMetric { prefix, suffix, scale }; - OutputMetric::new(move |value, _labels| { - cloned.print(&metric, value) - }) + let metric = StatsdMetric { + prefix, + suffix, + scale, + }; + OutputMetric::new(move |value, _labels| cloned.print(&metric, value)) } } } impl Flush for StatsdScope { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); let buf = self.buffer.borrow_mut(); @@ -128,7 +139,7 @@ impl Flush for StatsdScope { } impl StatsdScope { - fn print(&self, metric: &StatsdMetric, value: MetricValue) { + fn print(&self, metric: &StatsdMetric, value: MetricValue) { let scaled_value = value / metric.scale; let value_str = scaled_value.to_string(); let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len(); @@ -144,7 +155,6 @@ impl StatsdScope { // buffer is nearly full, make room let _ = self.flush_inner(buffer); buffer = self.buffer.borrow_mut(); - } else { if !buffer.is_empty() { // separate from previous entry @@ -171,7 +181,7 @@ impl StatsdScope { } Err(e) => { metrics::STATSD_SEND_ERR.mark(); - return Err(e.into()) + return Err(e.into()); } }; buffer.clear(); @@ -181,8 +191,12 @@ impl StatsdScope { } impl WithAttributes for StatsdScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for StatsdScope {} @@ -251,9 +265,9 @@ impl Drop for StatsdScope { #[cfg(feature = "bench")] mod bench { + use super::*; use core::attributes::*; use core::input::*; - use super::*; use test; #[bench] @@ -266,8 +280,10 @@ mod bench { #[bench] pub fn buffering_statsd(b: &mut test::Bencher) { - let sd = Statsd::send_to("localhost:2003").unwrap() - .buffered(Buffering::BufferSize(65465)).metrics(); + let sd = Statsd::send_to("localhost:2003") + .unwrap() + .buffered(Buffering::BufferSize(65465)) + .metrics(); let timer = sd.new_metric("timer".into(), InputKind::Timer); b.iter(|| test::black_box(timer.write(2000, labels![]))); diff --git a/src/output/stream.rs b/src/output/stream.rs index e4dd47b..7b18a24 100755 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -2,30 +2,30 @@ // TODO parameterize templates -use core::{Flush}; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; +use core::error; use core::input::InputKind; -use core::attributes::{Attributes, WithAttributes, Buffered, Prefixed, OnFlush}; use core::name::MetricName; use core::output::{Output, OutputMetric, OutputScope}; -use core::error; +use core::Flush; use cache::cache_out; +use output::format::{Formatting, LineFormat, SimpleFormat}; use queue::queue_out; -use output::format::{LineFormat, SimpleFormat, Formatting}; -use std::io::{Write, self}; -use std::path::Path; -use std::fs::File; -use std::rc::Rc; use std::cell::RefCell; +use std::fs::File; +use std::io::{self, Write}; +use std::path::Path; +use std::rc::Rc; -use std::sync::{Arc}; +use std::sync::Arc; -#[cfg(not(feature="parking_lot"))] -use std::sync::{RwLock}; +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; -#[cfg(feature="parking_lot")] -use parking_lot::{RwLock}; +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; /// Buffered metrics text output. pub struct Stream { @@ -45,7 +45,7 @@ impl Formatting for Stream { } } -impl Stream { +impl Stream { /// Write metric values to provided Write target. pub fn write_to(write: W) -> Stream { Stream { @@ -77,7 +77,6 @@ impl Stream { } } - // FIXME manual Clone impl required because auto-derive is borked (https://github.com/rust-lang/rust/issues/26925) impl Clone for Stream { fn clone(&self) -> Self { @@ -90,8 +89,12 @@ impl Clone for Stream { } impl WithAttributes for Stream { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for Stream {} @@ -115,7 +118,6 @@ pub struct TextScope { output: Stream, } - impl Clone for TextScope { fn clone(&self) -> Self { TextScope { @@ -127,8 +129,12 @@ impl Clone for TextScope { } impl WithAttributes for TextScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Buffered for TextScope {} @@ -147,7 +153,7 @@ impl OutputScope for TextScope { Ok(()) => { let mut entries = entries.borrow_mut(); entries.push(buffer) - }, + } Err(err) => debug!("{}", err), } }) @@ -162,7 +168,7 @@ impl OutputScope for TextScope { if let Err(e) = output.write_all(&buffer).and_then(|_| output.flush()) { debug!("Could not write text metrics: {}", e) } - }, + } Err(err) => debug!("{}", err), } }) @@ -171,7 +177,6 @@ impl OutputScope for TextScope { } impl Flush for TextScope { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); let mut entries = self.entries.borrow_mut(); diff --git a/src/queue/queue_in.rs b/src/queue/queue_in.rs index 6083f1c..79a6718 100755 --- a/src/queue/queue_in.rs +++ b/src/queue/queue_in.rs @@ -2,21 +2,21 @@ //! Metrics definitions are still synchronous. //! If queue size is exceeded, calling code reverts to blocking. -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; -use core::input::{InputKind, Input, InputScope, InputDyn, InputMetric}; -use core::{MetricValue, Flush}; -use core::metrics; use cache::cache_in::CachedInput; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; +use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope}; use core::label::Labels; +use core::metrics; +use core::name::MetricName; +use core::{Flush, MetricValue}; -use std::sync::Arc; -#[cfg(not(feature="crossbeam-channel"))] +#[cfg(not(feature = "crossbeam-channel"))] use std::sync::mpsc; +use std::sync::Arc; use std::thread; -#[cfg(feature="crossbeam-channel")] +#[cfg(feature = "crossbeam-channel")] use crossbeam_channel as crossbeam; /// Wrap this output behind an asynchronous metrics dispatch queue. @@ -32,7 +32,7 @@ pub trait QueuedInput: Input + Send + Sync + 'static + Sized { /// # Panics /// /// Panics if the OS fails to create a thread. -#[cfg(not(feature="crossbeam-channel"))] +#[cfg(not(feature = "crossbeam-channel"))] fn new_async_channel(length: usize) -> Arc> { let (sender, receiver) = mpsc::sync_channel::(length); @@ -43,9 +43,11 @@ fn new_async_channel(length: usize) -> Arc> { while !done { match receiver.recv() { Ok(InputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels), - Ok(InputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() { - debug!("Could not asynchronously flush metrics: {}", e); - }, + Ok(InputQueueCmd::Flush(scope)) => { + if let Err(e) = scope.flush() { + debug!("Could not asynchronously flush metrics: {}", e); + } + } Err(e) => { debug!("Async metrics receive loop terminated: {}", e); // cannot break from within match, use safety pin instead @@ -61,7 +63,7 @@ fn new_async_channel(length: usize) -> Arc> { /// # Panics /// /// Panics if the OS fails to create a thread. -#[cfg(feature="crossbeam-channel")] +#[cfg(feature = "crossbeam-channel")] fn new_async_channel(length: usize) -> Arc> { let (sender, receiver) = crossbeam::bounded::(length); @@ -72,9 +74,11 @@ fn new_async_channel(length: usize) -> Arc> { while !done { match receiver.recv() { Ok(InputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels), - Ok(InputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() { - debug!("Could not asynchronously flush metrics: {}", e); - }, + Ok(InputQueueCmd::Flush(scope)) => { + if let Err(e) = scope.flush() { + debug!("Could not asynchronously flush metrics: {}", e); + } + } Err(e) => { debug!("Async metrics receive loop terminated: {}", e); // cannot break from within match, use safety pin instead @@ -92,9 +96,9 @@ fn new_async_channel(length: usize) -> Arc> { pub struct InputQueue { attributes: Attributes, target: Arc, - #[cfg(not(feature="crossbeam-channel"))] + #[cfg(not(feature = "crossbeam-channel"))] sender: Arc>, - #[cfg(feature="crossbeam-channel")] + #[cfg(feature = "crossbeam-channel")] sender: Arc>, } @@ -112,8 +116,12 @@ impl InputQueue { impl CachedInput for InputQueue {} impl WithAttributes for InputQueue { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl Input for InputQueue { @@ -144,16 +152,19 @@ pub enum InputQueueCmd { #[derive(Clone)] pub struct InputQueueScope { attributes: Attributes, - #[cfg(not(feature="crossbeam-channel"))] + #[cfg(not(feature = "crossbeam-channel"))] sender: Arc>, - #[cfg(feature="crossbeam-channel")] + #[cfg(feature = "crossbeam-channel")] sender: Arc>, target: Arc, } impl InputQueueScope { /// Wrap new scopes with an asynchronous metric write & flush dispatcher. - pub fn wrap(target_scope: SC, queue_length: usize) -> Self { + pub fn wrap( + target_scope: SC, + queue_length: usize, + ) -> Self { InputQueueScope { attributes: Attributes::default(), sender: new_async_channel(queue_length), @@ -163,8 +174,12 @@ impl InputQueueScope { } impl WithAttributes for InputQueueScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl InputScope for InputQueueScope { @@ -174,7 +189,8 @@ impl InputScope for InputQueueScope { let sender = self.sender.clone(); InputMetric::new(move |value, mut labels| { labels.save_context(); - if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels)) { + if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels)) + { metrics::SEND_FAILED.mark(); debug!("Failed to send async metrics: {}", e); } @@ -183,7 +199,6 @@ impl InputScope for InputQueueScope { } impl Flush for InputQueueScope { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); if let Err(e) = self.sender.send(InputQueueCmd::Flush(self.target.clone())) { @@ -195,4 +210,3 @@ impl Flush for InputQueueScope { } } } - diff --git a/src/queue/queue_out.rs b/src/queue/queue_out.rs index c4e6aab..1628ec1 100755 --- a/src/queue/queue_out.rs +++ b/src/queue/queue_out.rs @@ -2,26 +2,26 @@ //! RawMetrics definitions are still synchronous. //! If queue size is exceeded, calling code reverts to blocking. -use core::attributes::{Attributes, WithAttributes, Prefixed, OnFlush}; -use core::name::MetricName; -use core::input::{InputKind, Input, InputScope, InputMetric}; -use core::output::{OutputDyn, OutputScope, OutputMetric, Output}; -use core::{MetricValue, Flush}; -use core::metrics; use cache::cache_in; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; +use core::input::{Input, InputKind, InputMetric, InputScope}; use core::label::Labels; +use core::metrics; +use core::name::MetricName; +use core::output::{Output, OutputDyn, OutputMetric, OutputScope}; +use core::{Flush, MetricValue}; -use std::rc::Rc; -use std::ops; use std::fmt; +use std::ops; +use std::rc::Rc; -use std::sync::Arc; -#[cfg(not(feature="crossbeam-channel"))] +#[cfg(not(feature = "crossbeam-channel"))] use std::sync::mpsc; +use std::sync::Arc; use std::thread; -#[cfg(feature="crossbeam-channel")] +#[cfg(feature = "crossbeam-channel")] use crossbeam_channel as crossbeam; /// Wrap this raw output behind an asynchronous metrics dispatch queue. @@ -35,7 +35,7 @@ pub trait QueuedOutput: Output + Sized { /// # Panics /// /// Panics if the OS fails to create a thread. -#[cfg(not(feature="crossbeam-channel"))] +#[cfg(not(feature = "crossbeam-channel"))] fn new_async_channel(length: usize) -> Arc> { let (sender, receiver) = mpsc::sync_channel::(length); @@ -46,9 +46,11 @@ fn new_async_channel(length: usize) -> Arc> { while !done { match receiver.recv() { Ok(OutputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels), - Ok(OutputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() { - debug!("Could not asynchronously flush metrics: {}", e); - }, + Ok(OutputQueueCmd::Flush(scope)) => { + if let Err(e) = scope.flush() { + debug!("Could not asynchronously flush metrics: {}", e); + } + } Err(e) => { debug!("Async metrics receive loop terminated: {}", e); // cannot break from within match, use safety pin instead @@ -64,7 +66,7 @@ fn new_async_channel(length: usize) -> Arc> { /// # Panics /// /// Panics if the OS fails to create a thread. -#[cfg(feature="crossbeam-channel")] +#[cfg(feature = "crossbeam-channel")] fn new_async_channel(length: usize) -> Arc> { let (sender, receiver) = crossbeam::bounded::(length); @@ -75,9 +77,11 @@ fn new_async_channel(length: usize) -> Arc> { while !done { match receiver.recv() { Ok(OutputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels), - Ok(OutputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() { - debug!("Could not asynchronously flush metrics: {}", e); - }, + Ok(OutputQueueCmd::Flush(scope)) => { + if let Err(e) = scope.flush() { + debug!("Could not asynchronously flush metrics: {}", e); + } + } Err(e) => { debug!("Async metrics receive loop terminated: {}", e); // cannot break from within match, use safety pin instead @@ -90,15 +94,14 @@ fn new_async_channel(length: usize) -> Arc> { Arc::new(sender) } - /// Wrap scope with an asynchronous metric write & flush dispatcher. #[derive(Clone)] pub struct OutputQueue { attributes: Attributes, target: Arc, - #[cfg(not(feature="crossbeam-channel"))] + #[cfg(not(feature = "crossbeam-channel"))] q_sender: Arc>, - #[cfg(feature="crossbeam-channel")] + #[cfg(feature = "crossbeam-channel")] q_sender: Arc>, } @@ -114,8 +117,12 @@ impl OutputQueue { } impl WithAttributes for OutputQueue { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl cache_in::CachedInput for OutputQueue {} @@ -132,7 +139,6 @@ impl Input for OutputQueue { target: Arc::new(target_scope), } } - } /// This is only `pub` because `error` module needs to know about it. @@ -149,16 +155,20 @@ pub enum OutputQueueCmd { #[derive(Clone)] pub struct OutputQueueScope { attributes: Attributes, - #[cfg(not(feature="crossbeam-channel"))] + #[cfg(not(feature = "crossbeam-channel"))] sender: Arc>, - #[cfg(feature="crossbeam-channel")] + #[cfg(feature = "crossbeam-channel")] sender: Arc>, target: Arc, } impl WithAttributes for OutputQueueScope { - fn get_attributes(&self) -> &Attributes { &self.attributes } - fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } } impl InputScope for OutputQueueScope { @@ -168,7 +178,8 @@ impl InputScope for OutputQueueScope { let sender = self.sender.clone(); InputMetric::new(move |value, mut labels| { labels.save_context(); - if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels)) { + if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels)) + { metrics::SEND_FAILED.mark(); debug!("Failed to send async metrics: {}", e); } @@ -177,7 +188,6 @@ impl InputScope for OutputQueueScope { } impl Flush for OutputQueueScope { - fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); if let Err(e) = self.sender.send(OutputQueueCmd::Flush(self.target.clone())) { @@ -193,7 +203,7 @@ impl Flush for OutputQueueScope { /// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads. /// Obviously, it should only still be used from a single thread or dragons may occur. #[derive(Clone)] -pub struct UnsafeScope(Rc ); +pub struct UnsafeScope(Rc); /// This is ok because scope will only ever be used by the dispatcher thread. unsafe impl Send for UnsafeScope {} @@ -215,7 +225,6 @@ impl ops::Deref for UnsafeScope { } } - impl fmt::Debug for OutputMetric { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Box") @@ -225,4 +234,3 @@ impl fmt::Debug for OutputMetric { unsafe impl Send for OutputMetric {} unsafe impl Sync for OutputMetric {} - diff --git a/tests/skeptic.rs b/tests/skeptic.rs index 34e6884..08d989f 100644 --- a/tests/skeptic.rs +++ b/tests/skeptic.rs @@ -1,2 +1,2 @@ -#[cfg(feature="skeptic")] +#[cfg(feature = "skeptic")] include!(concat!(env!("OUT_DIR"), "/skeptic-tests.rs"));