diff --git a/Cargo.toml b/Cargo.toml index 5996b26..21a2d15 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,9 +26,6 @@ derivative = "1.0" atomic_refcell = "0.1" chrono = "^0.3" -#[build-dependencies] -#skeptic = "0.13" - [dev-dependencies] skeptic = "0.13" diff --git a/README.md b/README.md index 451e297..d541c26 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ timer.interval_us(123_456); Related metrics can share a namespace: ```rust,skt-run let app_metrics = metric_scope(to_stdout()); -let db_metrics = app_metrics.with_prefix("database"); +let db_metrics = app_metrics.with_suffix("database"); let _db_timer = db_metrics.timer("db_timer"); let _db_counter = db_metrics.counter("db_counter"); ``` diff --git a/examples/aggregate.rs b/examples/aggregate.rs index 67ae097..258b329 100755 --- a/examples/aggregate.rs +++ b/examples/aggregate.rs @@ -7,9 +7,10 @@ use std::time::Duration; use dipstick::*; fn main() { - let metrics = new_aggregate(); + let metrics = MetricAggregator::new(); - set_aggregate_default_output(to_stdout()); + // MetricAggregator::set_default_output(to_stdout()); + metrics.set_output(to_stdout()); metrics.flush_every(Duration::from_secs(3)); diff --git a/examples/async.rs b/examples/async.rs index b51e2d3..3bd6ebf 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -13,7 +13,7 @@ fn main() { let counter = metrics.counter("counter_a"); let timer = metrics.timer("timer_b"); - let subsystem_metrics = metrics.with_name("subsystem"); + let subsystem_metrics = metrics.with_suffix("subsystem"); let event = subsystem_metrics.marker("event_c"); let gauge = subsystem_metrics.gauge("gauge_d"); diff --git a/examples/basics.rs b/examples/basics.rs index 64e4eab..3031120 100644 --- a/examples/basics.rs +++ b/examples/basics.rs @@ -19,7 +19,7 @@ fn main() { app_metrics.counter("just_once").count(4); // metric names can be prepended with a common prefix - let prefixed_metrics = app_metrics.with_name("subsystem".to_string()); + let prefixed_metrics = app_metrics.with_suffix("subsystem"); let event = prefixed_metrics.marker("event_c"); let gauge = prefixed_metrics.gauge("gauge_d"); diff --git a/examples/custom_publish.rs b/examples/custom_publish.rs index 24fc841..e98c361 100644 --- a/examples/custom_publish.rs +++ b/examples/custom_publish.rs @@ -35,10 +35,10 @@ fn main() { } // send application metrics to aggregator - set_aggregate_default_output(to_stdout()); - set_aggregate_default_stats(custom_statistics); + MetricAggregator::set_default_output(to_stdout()); + MetricAggregator::set_default_stats(custom_statistics); - let app_metrics = new_aggregate(); + let app_metrics = MetricAggregator::new(); // schedule aggregated metrics to be printed every 3 seconds app_metrics.flush_every(Duration::from_secs(3)); diff --git a/examples/graphite.rs b/examples/graphite.rs index 88cf0af..6980186 100644 --- a/examples/graphite.rs +++ b/examples/graphite.rs @@ -12,7 +12,7 @@ fn main() { let metrics = metric_scope( to_graphite("localhost:2003") .expect("Connecting") - .with_namespace(&["my", "app"][..]), + .with_suffix("my_app"), ); loop { diff --git a/examples/macro_aggregate.rs b/examples/macro_aggregate.rs index fffcb33..b1f312e 100755 --- a/examples/macro_aggregate.rs +++ b/examples/macro_aggregate.rs @@ -18,14 +18,14 @@ metrics!( pub AGGREGATE = () => { }); -metrics!( AGGREGATE.with_prefix("module_prefix") => { +metrics!( AGGREGATE.with_suffix("module_prefix") => { // create counter "module_prefix.module_counter" Counter MOD_COUNTER: "module_counter"; }); fn main() { // print aggregated metrics to the console - set_aggregate_default_output(to_stdout()); + MetricAggregator::set_default_output(to_stdout()); // enable autoflush... AGGREGATE.flush_every(Duration::from_millis(4000)); diff --git a/examples/macro_delegate.rs b/examples/macro_delegate.rs index 04ee989..38f5da8 100755 --- a/examples/macro_delegate.rs +++ b/examples/macro_delegate.rs @@ -36,7 +36,7 @@ metrics!(LIB_METRICS => { }); fn main() { - dispatch().set_target(to_stdout()); + metric_dispatch().set_target(to_stdout()); loop { ROOT_COUNTER.count(123); diff --git a/examples/macro_deprecated.rs b/examples/macro_deprecated.rs index 57124c1..6aa16de 100644 --- a/examples/macro_deprecated.rs +++ b/examples/macro_deprecated.rs @@ -22,8 +22,8 @@ app_metrics!( Vec, SAME_TYPE = [ // combine multiple outputs of the same type by using an array - to_stdout().with_prefix("yeah"), - to_stdout().with_prefix("ouch"), + to_stdout().with_suffix("yeah"), + to_stdout().with_suffix("ouch"), to_stdout().with_sampling_rate(0.5), ] ); @@ -31,7 +31,7 @@ app_metrics!( #[ignore(deprecated)] app_metrics!( Vec, - MUTANT_CHILD = SAME_TYPE.with_prefix("super").with_prefix("duper") + MUTANT_CHILD = SAME_TYPE.with_suffix("super").with_suffix("duper") ); fn main() { diff --git a/examples/macro_multi.rs b/examples/macro_multi.rs index 13cfcbc..0cc120c 100644 --- a/examples/macro_multi.rs +++ b/examples/macro_multi.rs @@ -15,12 +15,12 @@ metrics!(<(Statsd, String)> DIFFERENT_TYPES = ( metrics!(> SAME_TYPE = [ // combine multiple outputs of the same type by using an array - to_stdout().with_prefix("yeah"), - to_stdout().with_prefix("ouch"), + to_stdout().with_suffix("yeah"), + to_stdout().with_suffix("ouch"), to_stdout().with_sampling_rate(0.5), ][..]); -metrics!(> MUTANT_CHILD = SAME_TYPE.with_prefix("super").with_prefix("duper")); +metrics!(> MUTANT_CHILD = SAME_TYPE.with_suffix("super").with_suffix("duper")); fn main() { loop { diff --git a/examples/multi_out.rs b/examples/multi_out.rs index ad454b3..e474579 100644 --- a/examples/multi_out.rs +++ b/examples/multi_out.rs @@ -17,8 +17,8 @@ fn main() { let same_type_metrics = metric_scope( &[ // use slices to combine multiple metrics of the same type - to_stdout().with_name("yeah"), - to_stdout().with_name("ouch"), + to_stdout().with_suffix("yeah"), + to_stdout().with_suffix("ouch"), to_stdout().with_sampling_rate(0.5), ][..], ); diff --git a/examples/raw_log.rs b/examples/raw_log.rs index 9fde490..6b03ca7 100644 --- a/examples/raw_log.rs +++ b/examples/raw_log.rs @@ -3,7 +3,7 @@ extern crate dipstick; -use dipstick::MetricInput; +use dipstick::{MetricInput, ROOT_NS}; fn main() { raw_write() @@ -15,6 +15,7 @@ pub fn raw_write() { // define and send metrics using raw channel API let counter = metrics_log.define_metric( + &ROOT_NS, dipstick::Kind::Counter, "count_a", dipstick::FULL_SAMPLING_RATE, diff --git a/examples/summary.rs b/examples/summary.rs index 925175e..c428182 100644 --- a/examples/summary.rs +++ b/examples/summary.rs @@ -7,9 +7,9 @@ use std::time::Duration; use dipstick::*; fn main() { - set_aggregate_default_output(to_stdout()); - let app_metrics = new_aggregate(); + let app_metrics = MetricAggregator::new(); + app_metrics.set_output(to_stdout()); app_metrics.flush_every(Duration::from_secs(3)); diff --git a/src/aggregate.rs b/src/aggregate.rs index 5ec7ae1..ce68380 100755 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -1,10 +1,9 @@ //! Maintain aggregated metrics for deferred reporting, //! -use core::{command_fn, Kind, Sampling, Command, Value}; +use core::{command_fn, Kind, Sampling, Command, Value, Namespace}; use core::Kind::*; use output::{OpenScope, NO_METRIC_OUTPUT, MetricOutput}; use scope::{self, MetricScope, MetricInput, Flush, ScheduleFlush, DefineMetric,}; -use namespace::WithNamespace; use scores::{ScoreSnapshot, ScoreType, Scoreboard}; use scores::ScoreType::*; @@ -16,51 +15,34 @@ use std::sync::{Arc, RwLock}; pub type StatsFn = Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static; +fn initial_stats() -> &'static StatsFn { + &summary +} + +fn initial_output() -> Arc { + NO_METRIC_OUTPUT.clone() +} + lazy_static! { - static ref DEFAULT_AGGREGATE_STATS: RwLock> = RwLock::new(Arc::new(summary)); + static ref DEFAULT_AGGREGATE_STATS: RwLock> = RwLock::new(Arc::new(initial_stats())); - static ref DEFAULT_AGGREGATE_OUTPUT: RwLock> = - RwLock::new(NO_METRIC_OUTPUT.clone()); -} - -/// Set the default aggregated metrics statistics generator. -pub fn set_aggregate_default_stats(func: F) -where - F: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static, -{ - *DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(func) -} - -/// Install a new receiver for all aggregateed metrics, replacing any previous receiver. -pub fn set_aggregate_default_output>, T: Send + Sync + Clone + 'static> - (new_config: IS) -{ - *DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = Arc::new(new_config.into()); -} - -fn get_aggregate_default() -> Arc { - DEFAULT_AGGREGATE_OUTPUT.read().unwrap().clone() + static ref DEFAULT_AGGREGATE_OUTPUT: RwLock> = RwLock::new(initial_output()); } /// 1024 Metrics per scoreboard should be enough? const DEFAULT_CAPACITY: usize = 1024; -/// Get the default aggregate point. -pub fn new_aggregate() -> MetricAggregate { - MetricAggregate::with_capacity(DEFAULT_CAPACITY) -} - -impl From for MetricScope { - fn from(agg: MetricAggregate) -> MetricScope { +impl From for MetricScope { + fn from(agg: MetricAggregator) -> MetricScope { agg.into_scope() } } impl From<&'static str> for MetricScope { fn from(prefix: &'static str) -> MetricScope { - let scope: MetricScope = new_aggregate().into(); + let scope: MetricScope = MetricAggregator::new().into(); if !prefix.is_empty() { - scope.with_prefix(prefix) + scope.with_suffix(prefix) } else { scope } @@ -69,30 +51,104 @@ impl From<&'static str> for MetricScope { impl From<()> for MetricScope { fn from(_: ()) -> MetricScope { - new_aggregate().into_scope() + MetricAggregator::new().into_scope() } } /// Central aggregation structure. /// Maintains a list of metrics for enumeration when used as source. #[derive(Debug, Clone)] -pub struct MetricAggregate { - metrics: Arc>>>, +pub struct MetricAggregator { + namespace: Namespace, + inner: Arc>, } -impl MetricAggregate { +#[derive(Derivative)] +#[derivative(Debug)] +struct InnerAggregator { + metrics: HashMap>, + #[derivative(Debug = "ignore")] + stats: Option Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static>>, + output: Option>, +} + +impl MetricAggregator { + /// Build a new metric aggregation + pub fn new() -> MetricAggregator { + MetricAggregator::with_capacity(DEFAULT_CAPACITY) + } + /// Build a new metric aggregation point with initial capacity of metrics to aggregate. - pub fn with_capacity(size: usize) -> MetricAggregate { - MetricAggregate { - metrics: Arc::new(RwLock::new(HashMap::with_capacity(size))), + pub fn with_capacity(size: usize) -> MetricAggregator { + MetricAggregator { + namespace: "".into(), + inner: Arc::new(RwLock::new(InnerAggregator { + metrics: HashMap::with_capacity(size), + stats: None, + output: None, + })) } } + /// Set the default aggregated metrics statistics generator. + pub fn set_default_stats(func: F) + where + F: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static + { + *DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(func) + } + + /// Remove any global customization of the default aggregation statistics. + pub fn unset_default_stats() { + *DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(initial_stats()) + } + + /// Install a new receiver for all aggregateed metrics, replacing any previous receiver. + pub fn set_default_output(new_config: IS) + where IS: Into>, + T: Send + Sync + Clone + 'static + { + *DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = Arc::new(new_config.into()); + } + + /// Install a new receiver for all aggregateed metrics, replacing any previous receiver. + pub fn unset_default_output() { + *DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = initial_output() + } + + /// Set the default aggregated metrics statistics generator. + pub fn set_stats(&self, func: F) + where + F: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static + { + self.inner.write().expect("Lock Aggregator").stats = Some(Arc::new(func)) + } + + /// Set the default aggregated metrics statistics generator. + pub fn unset_stats(&self) { + self.inner.write().expect("Lock Aggregator").stats = None + } + + /// Install a new receiver for all aggregated metrics, replacing any previous receiver. + pub fn set_output(&self, new_config: IS) + where IS: Into>, + T: Send + Sync + Clone + 'static + { + self.inner.write().expect("Lock Aggregator").output = Some(Arc::new(new_config.into())) + } + + /// Install a new receiver for all aggregated metrics, replacing any previous receiver. + pub fn unset_output(&self) { + self.inner.write().expect("Lock Aggregator").output = None + } + fn into_scope(&self) -> MetricScope { let agg_0 = self.clone(); let agg_1 = self.clone(); MetricScope::new( - Arc::new(move |kind, name, rate| agg_0.define_metric(kind, name, rate)), + self.namespace.clone(), + Arc::new(move |ns, kind, name, rate| agg_0.define_metric(ns, kind, name, rate)), command_fn(move |cmd| match cmd { Command::Write(metric, value) => { let metric: &Aggregate = metric; @@ -105,26 +161,28 @@ impl MetricAggregate { /// Discard scores for ad-hoc metrics. pub fn cleanup(&self) { - let orphans: Vec = self.metrics.read().unwrap().iter() + let orphans: Vec = self.inner.read().expect("Lock Aggregator").metrics.iter() // is aggregator now the sole owner? // TODO use weak ref + impl Drop to mark abandoned metrics (see dispatch) .filter(|&(_k, v)| Arc::strong_count(v) == 1) .map(|(k, _v)| k.to_string()) .collect(); if !orphans.is_empty() { - let mut remover = self.metrics.write().unwrap(); + let remover = &mut self.inner.write().unwrap().metrics; orphans.iter().for_each(|k| { remover.remove(k); }); } } - /// + /// Take a snapshot of aggregated values and reset them. + /// Compute stats on captured values using assigned or default stats function. + /// Write stats to assigned or default output. pub fn flush_to(&self, publish_scope: &DefineMetric, stats_fn: Arc) { - let snapshot: Vec = { - let metrics = self.metrics.read().expect("Aggregate Lock"); - metrics.values().flat_map(|score| score.reset()).collect() - }; + let snapshot: Vec = self.inner.read().expect("Lock Aggregator") + .metrics.values() + .flat_map(|score| score.reset()) + .collect(); if snapshot.is_empty() { // no data was collected for this period @@ -135,7 +193,7 @@ impl MetricAggregate { for score in metric.2 { if let Some(ex) = (stats_fn)(metric.0, metric.1.as_ref(), score) { publish_scope - .define_metric_object(ex.0, &ex.1.concat(), 1.0) + .define_metric_object(&self.namespace, ex.0, &ex.1.concat(), 1.0) .write(ex.2); } } @@ -143,9 +201,10 @@ impl MetricAggregate { } } + } -impl MetricInput for MetricAggregate { +impl MetricInput for MetricAggregator { /// Define an event counter of the provided name. fn marker(&self, name: &str) -> scope::Marker { self.into_scope().marker(name) @@ -167,12 +226,14 @@ impl MetricInput for MetricAggregate { } /// Lookup or create a scoreboard for the requested metric. - fn define_metric(&self, kind: Kind, name: &str, _rate: Sampling) -> Aggregate { - self.metrics + fn define_metric(&self, source_ns: &Namespace, kind: Kind, name: &str, _rate: Sampling) -> Aggregate { + let ns = self.namespace.extend(source_ns); + self.inner .write() .expect("Locking aggregator") + .metrics .entry(name.to_string()) - .or_insert_with(|| Arc::new(Scoreboard::new(kind, name.to_string()))) + .or_insert_with(|| Arc::new(Scoreboard::new(ns, kind, name.to_string()))) .clone() } @@ -180,28 +241,53 @@ impl MetricInput for MetricAggregate { fn write(&self, metric: &Aggregate, value: Value) { metric.update(value) } + + fn with_suffix(&self, name: &str) -> Self { + MetricAggregator { + namespace: self.namespace.with_suffix(name), + inner: self.inner.clone(), + } + } + } -impl Flush for MetricAggregate { +//impl<'a> Index<&'a str> for MetricAggregator { +// type Output = Self; +// +// fn index(&self, index: &'a str) -> &Self::Output { +// &self.push(index) +// } +//} + +impl Flush for MetricAggregator { /// Collect and reset aggregated data. /// Publish statistics fn flush(&self) { - let default_publish_fn = DEFAULT_AGGREGATE_STATS.read().unwrap().clone(); - let publish_scope = get_aggregate_default().open_scope_object(); + let inner = self.inner.read().expect("Lock Aggregator"); - self.flush_to(publish_scope.as_ref(), default_publish_fn); + let stats_fn = match &inner.stats { + &Some(ref stats_fn) => stats_fn.clone(), + &None => DEFAULT_AGGREGATE_STATS.read().unwrap().clone(), + }; + + let pub_scope = match &inner.output { + &Some(ref out) => out.open_scope_object(), + &None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().open_scope_object(), + }; + + self.flush_to(pub_scope.as_ref(), stats_fn); // TODO parameterize whether to keep ad-hoc metrics after publish // source.cleanup(); - publish_scope.flush() + pub_scope.flush() } } -impl ScheduleFlush for MetricAggregate {} +impl ScheduleFlush for MetricAggregator {} -impl From for Arc { - fn from(metrics: MetricAggregate) -> Arc { - metrics.into() +impl From for Arc { + fn from(metrics: MetricAggregator) -> Arc { + Arc::new(metrics.into_scope()) } } @@ -265,33 +351,36 @@ pub fn summary(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&s mod bench { use test; + use core::ROOT_NS; use core::Kind::{Counter, Marker}; - use aggregate::new_aggregate; + use aggregate::MetricAggregator; use scope::MetricInput; #[bench] fn aggregate_marker(b: &mut test::Bencher) { - let sink = new_aggregate(); - let metric = sink.define_metric(Marker, "event_a", 1.0); + let sink = MetricAggregator::new(); + let metric = sink.define_metric(&ROOT_NS, Marker, "event_a", 1.0); b.iter(|| test::black_box(sink.write(&metric, 1))); } #[bench] fn aggregate_counter(b: &mut test::Bencher) { - let sink = new_aggregate(); - let metric = sink.define_metric(Counter, "count_a", 1.0); + let sink = MetricAggregator::new(); + let metric = sink.define_metric(&ROOT_NS, Counter, "count_a", 1.0); b.iter(|| test::black_box(sink.write(&metric, 1))); } #[bench] fn reset_marker(b: &mut test::Bencher) { - let metric = new_aggregate().define_metric(Marker, "marker", 1.0); + let sink = MetricAggregator::new(); + let metric = sink.define_metric(&ROOT_NS, Marker, "marker", 1.0); b.iter(|| test::black_box(metric.reset())); } #[bench] fn reset_counter(b: &mut test::Bencher) { - let metric = new_aggregate().define_metric(Counter, "count_a", 1.0); + let sink = MetricAggregator::new(); + let metric = sink.define_metric(&ROOT_NS, Counter, "count_a", 1.0); b.iter(|| test::black_box(metric.reset())); } diff --git a/src/async_queue.rs b/src/async_queue.rs index 9d80bcc..9080480 100755 --- a/src/async_queue.rs +++ b/src/async_queue.rs @@ -11,7 +11,7 @@ use std::sync::mpsc; use std::thread; metrics!{ - DIPSTICK_METRICS.with_prefix("async_queue") => { + DIPSTICK_METRICS.with_suffix("async_queue") => { /// Maybe queue was full? Marker SEND_FAILED: "send_failed"; } diff --git a/src/cache.rs b/src/cache.rs index 68c7f3b..d2a26cf 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -23,7 +23,7 @@ where { let cache: RwLock> = RwLock::new(lru::LRUCache::with_capacity(cache_size)); - Arc::new(move |kind, name, rate| { + Arc::new(move |ns, kind, name, rate| { let mut cache = cache.write().expect("Locking metric cache"); let name_str = String::from(name); @@ -32,7 +32,7 @@ where return value.clone(); } - let new_value = (next)(kind, name, rate).clone(); + let new_value = (next)(ns, kind, name, rate).clone(); cache.insert(name_str, new_value.clone()); new_value }) diff --git a/src/core.rs b/src/core.rs index 97f1674..60bffff 100644 --- a/src/core.rs +++ b/src/core.rs @@ -8,6 +8,8 @@ use std::sync::Arc; use chrono::{Local, DateTime}; +use time; + // TODO define an 'AsValue' trait + impl for supported number types, then drop 'num' crate pub use num::ToPrimitive; @@ -20,19 +22,31 @@ pub type Value = u64; /// Wrapped so it may be changed safely later. pub struct TimeHandle(i64); -fn now_micros() -> i64 { +/// takes 250ns but works every time +pub fn accurate_clock_micros() -> i64 { let local: DateTime = Local::now(); let mut micros = local.timestamp() * 1_000_000; micros += local.timestamp_subsec_micros() as i64; micros } +/// takes 25ns but fails to advance time on occasion +pub fn fast_clock_micros() -> i64 { + (time::precise_time_ns() / 1000) as i64 +} + +// another quick way +//fn now_micros() -> i64 { +// let t = time::get_time(); +// (t.sec * 1_000_000) + (t.nsec as i64 / 1000) +//} + impl TimeHandle { /// Get a handle on current time. /// Used by the TimerMetric start_time() method. pub fn now() -> TimeHandle { - TimeHandle(now_micros()) + TimeHandle(fast_clock_micros()) } /// Get the elapsed time in microseconds since TimeHandle was obtained. @@ -46,17 +60,17 @@ impl TimeHandle { } } -impl From for TimeHandle { - fn from(s: usize) -> TimeHandle { - TimeHandle(s as i64) - } -} - -impl From for usize { - fn from(s: TimeHandle) -> usize { - s.0 as usize - } -} +//impl From for TimeHandle { +// fn from(s: usize) -> TimeHandle { +// TimeHandle(s as i64) +// } +//} +// +//impl From for usize { +// fn from(s: TimeHandle) -> usize { +// s.0 as usize +// } +//} /// Base type for sampling rate. /// - 1.0 records everything @@ -81,12 +95,98 @@ pub enum Kind { Timer, } +/// A namespace for metrics. +/// Does _not_ include the metric's "short" name itself. +/// Can be empty. +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub struct Namespace { + inner: Vec +} + +lazy_static! { + /// Root namespace contains no string parts. + pub static ref ROOT_NS: Namespace = Namespace { inner: vec![] }; +} + +//impl<'a> Index<&'a str> for Namespace { +// type Output = Self; +// +// /// Returns a copy of this namespace with the "index" appended to it. +// /// Returned reference should be dereferenceable: +// /// +// /// ``` +// /// let sub_ns = *ROOT_NS["sub_ns"]; +// /// ``` +// /// +// fn index(&self, index: &'a str) -> &Self::Output { +// let mut clone = self.inner.clone(); +// if !index.is_empty() { +// clone.push(index.into()); +// } +// &Namespace{ inner: clone } +// } +//} + +impl Namespace { + + /// Append name to the namespace, returning a modified copy. + pub fn with_suffix(&self, name: &str) -> Self { + let mut new = self.inner.clone(); + new.push(name.into()); + Namespace { inner: new } + } + + /// Returns a copy of this namespace with the second namespace appended. + /// Both original namespaces stay untouched. + pub fn extend(&self, names: &Namespace) -> Self { + Namespace { + inner: { + let mut new = self.inner.clone(); + new.extend_from_slice(&names.inner); + new + } + } + } + + /// Combine name parts into a string. + pub fn join(&self, name: &str, separator: &str) -> String { + if self.inner.is_empty() { + return name.into() + } + let mut buf = String::with_capacity(64); + for n in &self.inner { + buf.push_str(n.as_ref()); + buf.push_str(separator); + } + buf.push_str(name); + buf + } +} + +impl From<()> for Namespace { + fn from(_name: ()) -> Namespace { + ROOT_NS.clone() + } +} + +impl<'a> From<&'a str> for Namespace { + fn from(name: &'a str) -> Namespace { + ROOT_NS.with_suffix(name.as_ref()) + } +} + +impl From for Namespace { + fn from(name: String) -> Namespace { + ROOT_NS.with_suffix(name.as_ref()) + } +} + /// Dynamic metric definition function. /// Metrics can be defined from any thread, concurrently (Fn is Sync). /// The resulting metrics themselves can be also be safely shared across threads ( is Send + Sync). /// Concurrent usage of a metric is done using threaded scopes. /// Shared concurrent scopes may be provided by some backends (aggregate). -pub type DefineMetricFn = Arc M + Send + Sync>; +pub type DefineMetricFn = Arc M + Send + Sync>; /// A function trait that opens a new metric capture scope. pub type OpenScopeFn = Arc CommandFn + Send + Sync>; @@ -136,3 +236,79 @@ impl CommandFn { (self.inner)(Flush) } } + +//#[cfg(test)] +//mod test { +// use core::*; +// use test; +// use std::f64; +// +// const ITER: i64 = 5_000; +// const LOOP: i64 = 50000; +// +// // a retarded, dirty and generally incorrect tentative at jitter measurement +// fn jitter(clock: fn() -> i64) { +// let mut first = 0; +// let mut last = 0; +// let mut min = 999_000_000; +// let mut max = -8888888; +// let mut delta_sum = 0; +// let mut dev2_sum = 0; +// +// for i in 1..ITER { +// let ts = clock(); +// test::black_box(for _j in 0..LOOP {}); +// last = clock(); +// let delta = last - ts; +// +// delta_sum += delta; +// let mean = delta_sum / i; +// +// let dev2 = (delta - mean) ^ 2; +// dev2_sum += dev2; +// +// if delta > max { +// max = delta +// } +// if delta < min { +// min = delta +// } +// } +// +// println!("runt {}", last - first); +// println!("mean {}", delta_sum / ITER); +// println!("dev2 {}", (dev2_sum as f64).sqrt() / ITER as f64); +// println!("min {}", min); +// println!("max {}", max); +// } +// +// +// #[test] +// fn jitter_local_now() { +// jitter(|| super::slow_clock_micros()) +// } +// +// #[test] +// fn jitter_precise_time_ns() { +// jitter(|| super::imprecise_clock_micros()) +// } +// +//} + +#[cfg(feature = "bench")] +mod bench { + + use super::*; + use test; + + #[bench] + fn get_slow_time(b: &mut test::Bencher) { + b.iter(|| test::black_box(accurate_clock_micros())); + } + + #[bench] + fn get_imprecise_time(b: &mut test::Bencher) { + b.iter(|| test::black_box(fast_clock_micros())); + } + +} \ No newline at end of file diff --git a/src/dispatch.rs b/src/dispatch.rs index 2284327..1d65fae 100755 --- a/src/dispatch.rs +++ b/src/dispatch.rs @@ -1,28 +1,21 @@ //! Decouple metric definition from configuration with trait objects. use core::*; -use namespace::*; use scope::{self, DefineMetric, MetricScope, WriteMetric, MetricInput, Flush, ScheduleFlush, NO_METRIC_SCOPE}; use std::collections::HashMap; use std::sync::{Arc, RwLock, Weak}; +//use std::ops::Index; use atomic_refcell::*; - lazy_static! { - static ref ROOT_DISPATCH: Arc> = Arc::new(RwLock::new( - InnerDispatch { - target: None, - parent: None, - metrics: HashMap::new(), - children: HashMap::new(), - } - )); + static ref ROOT_DISPATCH: Arc> = + Arc::new(RwLock::new(InnerDispatch::with_parent(None, ROOT_NS.clone()))); } -/// Get the default dispatch point. -pub fn dispatch() -> MetricDispatch { +/// Get the root dispatch point. +pub fn metric_dispatch() -> MetricDispatch { MetricDispatch { inner: ROOT_DISPATCH.clone() } } @@ -60,6 +53,7 @@ pub struct MetricDispatch { } struct InnerDispatch { + namespace: Namespace, target: Option>, metrics: HashMap>, parent: Option>>, @@ -69,14 +63,14 @@ struct InnerDispatch { /// Allow turning a 'static str into a Delegate, where str is the prefix. impl From<&'static str> for MetricScope { fn from(name: &'static str) -> MetricScope { - dispatch().into_scope().with_prefix(name) + metric_dispatch().into_scope().with_suffix(name) } } /// Allow turning a 'static str into a Delegate, where str is the prefix. impl From<()> for MetricScope { fn from(_: ()) -> MetricScope { - dispatch().into() + metric_dispatch().into() } } @@ -87,11 +81,22 @@ impl From for MetricScope { } impl InnerDispatch { - fn switch_scope(&mut self, target_scope: Arc) { + + fn with_parent(parent: Option>>, namespace: Namespace,) -> Self { + InnerDispatch { + namespace, + target: None, + parent, + metrics: HashMap::new(), + children: HashMap::new(), + } + } + + fn set_new_scope(&mut self, target_scope: Arc) { for mut metric in self.metrics.values() { if let Some(metric) = metric.upgrade() { let target_metric = target_scope - .define_metric_object(metric.kind, metric.name.as_ref(), metric.rate); + .define_metric_object(&self.namespace, metric.kind, metric.name.as_ref(), metric.rate); *metric.write_metric.borrow_mut() = target_metric; } } @@ -107,14 +112,14 @@ impl InnerDispatch { fn set_target(&mut self, target: Option>) { let new_scope = target.clone().unwrap_or_else(|| self.get_parent_target().unwrap_or(NO_METRIC_SCOPE.clone())); - self.switch_scope(new_scope); + self.set_new_scope(new_scope); self.target = target } fn parent_set_target(&mut self, target: Arc) { if self.target.is_none() { // overriding target from this point downward - self.switch_scope(target) + self.set_new_scope(target) } } @@ -122,13 +127,25 @@ impl InnerDispatch { } impl MetricDispatch { + + /// Create a new "private" metric dispatch root. This is usually not what you want. + /// Since this dispatch will not be part of the standard dispatch tree, + /// it will need to be configured independently and since downstream code may not know about + /// its existence this may never happen and metrics will not be dispatched anywhere. + /// If you want to use the standard dispatch tree, use #metric_dispatch() instead. + pub fn new() -> Self { + MetricDispatch { + inner: Arc::new(RwLock::new(InnerDispatch::with_parent(None, ROOT_NS.clone()))) + } + } + /// Replace target for this dispatch and it's children. pub fn set_target>>(&self, target: IS) { let mut inner = self.inner.write().expect("Dispatch Lock"); inner.set_target(Some(target.into())); } - /// Remove target. + /// Replace target for this dispatch and it's children. pub fn unset_target(&self) { let mut inner = self.inner.write().expect("Dispatch Lock"); inner.set_target(None); @@ -138,8 +155,9 @@ impl MetricDispatch { let disp_0 = self.clone(); let disp_1 = self.clone(); MetricScope::new( + self.inner.read().expect("Dispatch Lock").namespace.clone(), // define metric - Arc::new(move |kind, name, rate| disp_0.define_metric(kind, name, rate)), + Arc::new(move |ns, kind, name, rate| disp_0.define_metric(ns, kind, name, rate)), // write / flush metric command_fn(move |cmd| match cmd { Command::Write(metric, value) => { @@ -182,8 +200,8 @@ impl MetricInput for MetricDispatch { self.into_scope().gauge(name) } - /// Lookup or create a scoreboard for the requested metric. - fn define_metric(&self, kind: Kind, name: &str, rate: Sampling) -> Dispatch { + /// Lookup or create a dispatch stub for the requested metric. + fn define_metric(&self, source_ns: &Namespace, kind: Kind, name: &str, rate: Sampling) -> Dispatch { let mut inner = self.inner.write().expect("Dispatch Lock"); let target_scope = inner.target.clone().unwrap_or(NO_METRIC_SCOPE.clone()); inner @@ -191,7 +209,7 @@ impl MetricInput for MetricDispatch { .get(name) .and_then(|metric_ref| Weak::upgrade(metric_ref)) .unwrap_or_else(|| { - let metric_object = target_scope.define_metric_object(kind, name, rate); + let metric_object = target_scope.define_metric_object(source_ns, kind, name, rate); let define_metric = Arc::new(DispatchMetric { kind, name: name.to_string(), @@ -210,8 +228,31 @@ impl MetricInput for MetricDispatch { fn write(&self, metric: &Dispatch, value: Value) { metric.write_metric.borrow().write(value); } + + fn with_suffix(&self, name: &str) -> Self { + if name.is_empty() { + return self.clone() + } + let mut inner = self.inner.write().expect("Dispatch Lock"); + // FIXME namespace should be built only if required + let namespace = inner.namespace.with_suffix(name); + let child = inner.children.entry(name.to_string()) + .or_insert_with(|| Arc::new(RwLock::new(InnerDispatch::with_parent( + Some(self.inner.clone()), + namespace + )))).clone(); + MetricDispatch {inner: child} + } } +//impl<'a> Index<&'a str> for MetricDispatch { +// type Output = Self; +// +// fn index(&self, index: &'a str) -> &Self::Output { +// &self.push(index) +// } +//} + impl Flush for MetricDispatch { fn flush(&self) { if let Some(ref target) = self.inner.write().expect("Dispatch Lock").target { @@ -225,22 +266,23 @@ impl ScheduleFlush for MetricDispatch {} #[cfg(feature = "bench")] mod bench { - use dispatch::dispatch; + use dispatch::metric_dispatch; use test; - use aggregate::new_aggregate; + use aggregate::MetricAggregator; use scope::MetricInput; - #[bench] fn dispatch_marker_to_aggregate(b: &mut test::Bencher) { - dispatch().set_target(new_aggregate()); - let metric = dispatch().marker("event_a"); + println!("wewrwerwe"); + metric_dispatch().set_target(MetricAggregator::new()); + println!("sdfsdfsd"); + let metric = metric_dispatch().marker("event_a"); b.iter(|| test::black_box(metric.mark())); } #[bench] fn dispatch_marker_to_void(b: &mut test::Bencher) { - let metric = dispatch().marker("event_a"); + let metric = metric_dispatch().marker("event_a"); b.iter(|| test::black_box(metric.mark())); } diff --git a/src/graphite.rs b/src/graphite.rs index 7698f99..069c439 100755 --- a/src/graphite.rs +++ b/src/graphite.rs @@ -15,7 +15,7 @@ use std::fmt::Debug; use socket::RetrySocket; metrics!{ - DIPSTICK_METRICS.with_prefix("graphite") => { + DIPSTICK_METRICS.with_suffix("graphite") => { Marker SEND_ERR: "send_failed"; Marker TRESHOLD_EXCEEDED: "bufsize_exceeded"; Counter SENT_BYTES: "sent_bytes"; @@ -46,7 +46,7 @@ where let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?)); Ok(metric_output( - move |kind, name, rate| graphite_metric(kind, name, rate), + move |ns, kind, name, rate| graphite_metric(ns, kind, name, rate), move || graphite_scope(&socket, false), )) } @@ -60,14 +60,13 @@ where let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?)); Ok(metric_output( - move |kind, name, rate| graphite_metric(kind, name, rate), + move |ns, kind, name, rate| graphite_metric(ns, kind, name, rate), move || graphite_scope(&socket, true), )) } -fn graphite_metric(kind: Kind, name: &str, rate: Sampling) -> Graphite { - let mut prefix = String::with_capacity(32); - prefix.push_str(name); +fn graphite_metric(namespace: &Namespace, kind: Kind, name: &str, rate: Sampling) -> Graphite { + let mut prefix = namespace.join(name, "."); prefix.push(' '); let mut scale = match kind { @@ -196,7 +195,7 @@ mod bench { #[bench] pub fn timer_graphite(b: &mut test::Bencher) { let sd = to_graphite("localhost:8125").unwrap().open_scope(); - let timer = sd.define_metric(Kind::Timer, "timer", 1000000.0); + let timer = sd.define_metric(&ROOT_NS, Kind::Timer, "timer", 1000000.0); b.iter(|| test::black_box(sd.write(&timer, 2000))); } diff --git a/src/lib.rs b/src/lib.rs index 45f6af2..20acc16 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,24 +20,24 @@ extern crate time; extern crate chrono; pub mod error; -pub use error::*; +pub use error::{Error, Result}; #[macro_use] pub mod macros; pub mod core; -pub use core::*; +pub use core::{Value, Sampling, FULL_SAMPLING_RATE, TimeHandle, Kind, ROOT_NS}; pub mod output; -pub use output::*; +pub use output::{MetricOutput, NO_METRIC_OUTPUT, OpenScope}; #[macro_use] pub mod dispatch; -pub use dispatch::*; +pub use dispatch::{MetricDispatch, Dispatch, metric_dispatch}; #[macro_use] mod aggregate; -pub use aggregate::*; +pub use aggregate::{MetricAggregator, Aggregate}; mod local; pub use local::*; @@ -54,9 +54,6 @@ pub use scores::*; mod statsd; pub use statsd::*; -mod namespace; -pub use namespace::*; - mod graphite; pub use graphite::*; diff --git a/src/local.rs b/src/local.rs index ac26ee7..f4c106c 100644 --- a/src/local.rs +++ b/src/local.rs @@ -7,7 +7,7 @@ use std::sync::RwLock; /// Write metric values to stdout using `println!`. pub fn to_stdout() -> MetricOutput { metric_output( - |_kind, name, _rate| String::from(name), + |ns, _kind, name, _rate| ns.join(name, "."), || { command_fn(|cmd| { if let Command::Write(m, v) = cmd { @@ -24,7 +24,7 @@ pub fn to_stdout() -> MetricOutput { /// If thread latency is a concern you may wish to also use #with_async_queue. pub fn to_buffered_stdout() -> MetricOutput { metric_output( - |_kind, name, _rate| String::from(name), + |ns, _kind, name, _rate| ns.join(name, "."), || { let buf = RwLock::new(String::new()); command_fn(move |cmd| { @@ -47,7 +47,7 @@ pub fn to_buffered_stdout() -> MetricOutput { // TODO parameterize log level pub fn to_log() -> MetricOutput { metric_output( - |_kind, name, _rate| String::from(name), + |ns, _kind, name, _rate| ns.join(name, "."), || { command_fn(|cmd| { if let Command::Write(m, v) = cmd { @@ -65,7 +65,7 @@ pub fn to_log() -> MetricOutput { // TODO parameterize log level pub fn to_buffered_log() -> MetricOutput { metric_output( - |_kind, name, _rate| String::from(name), + |ns, _kind, name, _rate| ns.join(name, "."), || { let buf = RwLock::new(String::new()); command_fn(move |cmd| { @@ -86,7 +86,7 @@ pub fn to_buffered_log() -> MetricOutput { /// Discard all metric values sent to it. pub fn to_void() -> MetricOutput<()> { - metric_output(move |_kind, _name, _rate| (), || command_fn(|_cmd| {})) + metric_output(move |_ns, _kind, _name, _rate| (), || command_fn(|_cmd| {})) } #[cfg(test)] @@ -97,21 +97,21 @@ mod test { #[test] fn sink_print() { let c = super::to_stdout().open_scope(); - let m = c.define_metric(Kind::Marker, "test", 1.0); + let m = c.define_metric(&ROOT_NS, Kind::Marker, "test", 1.0); c.write(&m, 33); } #[test] fn test_to_log() { let c = super::to_log().open_scope(); - let m = c.define_metric(Kind::Marker, "test", 1.0); + let m = c.define_metric(&ROOT_NS, Kind::Marker, "test", 1.0); c.write(&m, 33); } #[test] fn test_to_void() { let c = super::to_void().open_scope(); - let m = c.define_metric(Kind::Marker, "test", 1.0); + let m = c.define_metric(&ROOT_NS, Kind::Marker, "test", 1.0); c.write(&m, 33); } diff --git a/src/macros.rs b/src/macros.rs index 70b8d83..cf990e6 100755 --- a/src/macros.rs +++ b/src/macros.rs @@ -261,7 +261,7 @@ macro_rules! mod_timer { mod test_app { use self_metrics::*; - metrics!( TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix");); + metrics!( TEST_METRICS = DIPSTICK_METRICS.with_suffix("test_prefix");); app_marker!( TEST_METRICS => { M1: "failed", diff --git a/src/multi.rs b/src/multi.rs index 94f016f..e1a3e18 100644 --- a/src/multi.rs +++ b/src/multi.rs @@ -22,10 +22,11 @@ where let scope1a = scope1.clone(); MetricScope::new( - Arc::new(move |kind, name, rate| { + ROOT_NS.clone(), + Arc::new(move |ns, kind, name, rate| { ( - scope0.define_metric(kind, name, rate), - scope1.define_metric(kind, name, rate), + scope0.define_metric(ns, kind, name, rate), + scope1.define_metric(ns, kind, name, rate), ) }), command_fn(move |cmd| match cmd { @@ -52,10 +53,11 @@ where let scopes2 = scopes.clone(); MetricScope::new( - Arc::new(move |kind, name, rate| { + ROOT_NS.clone(), + Arc::new(move |ns, kind, name, rate| { scopes .iter() - .map(|m| m.define_metric(kind, name, rate)) + .map(|m| m.define_metric(ns, kind, name, rate)) .collect() }), command_fn(move |cmd| match cmd { diff --git a/src/namespace.rs b/src/namespace.rs index b64cc6e..9ccd17b 100755 --- a/src/namespace.rs +++ b/src/namespace.rs @@ -2,44 +2,75 @@ use core::*; use std::sync::Arc; +use std::collections::HashMap; const DEFAULT_SEPARATOR: &'static str = "."; -/// A list of parts of a metric's name. +/// A namespace for metrics. +/// Does _not_ include the metric's "short" name itself. +/// Can be empty. #[derive(Debug, Clone)] -pub struct Namespace(Vec); +pub struct Namespace { + inner: Vec +} impl Namespace { - /// Make this namespace a subspace of the parent. - pub fn subspace_of(self, parent: &Namespace) -> Self { - Namespace([parent.0.clone(), self.0].concat()) + + pub fn split_first(&self) -> Option<(&String, &[String])> { + self.inner.split_first() + } + + pub fn with_suffix(&self, names: &Namespace) -> Self { + Namespace { inner: self.inner.clone().extend(names) } } /// Combine name parts into a string. pub fn join(&self, separator: &str) -> String { - self.0.join(separator) + self.inner.join(separator) } } impl<'a> From<&'a str> for Namespace { fn from(name: &'a str) -> Namespace { - Namespace(vec![name.to_string()]) + Namespace { inner: vec![name.to_string()] } } } impl From for Namespace { fn from(name: String) -> Namespace { - Namespace(vec![name]) + Namespace { inner: vec![name] } } } -impl<'a, 'b: 'a> From<&'b [&'a str]> for Namespace { - fn from(names: &'a [&'a str]) -> Namespace { - Namespace(names.iter().map(|n| n.to_string()).collect()) - } +pub trait Registry { + fn with_prefix(&self, prefix: &str) -> Self; + + +// fn parent(&self) -> Option<&Registry>; +// +// fn namespace(&self) -> &Namespace; +// +// fn children(&mut self) -> &mut HashMap; +// +// fn create_children(parent: R, name: String) -> Self; + +// fn with_names(&mut self, namespace: Namespace) -> Self { +// +// let namespace = &names.into(); +// let (first, rest) = namespace.split_first(); +// // recursively find or create children for every namespace component +// first.map(|f| { +// f.with_pre +// Self::make_new(self.children().entry(*first) +// .or_insert_with(|| InnerDispatch::with_parent(Some(self.inner.clone()))) +// .clone() +// ).with_name(rest) +// }).unwrap_or_else(self.clone()) +// +// } } -/// Prepend metric names with custom prefix. +///// Prepend metric names with custom prefix. pub trait WithNamespace where Self: Sized, diff --git a/src/output.rs b/src/output.rs index 3209469..84077c3 100755 --- a/src/output.rs +++ b/src/output.rs @@ -3,8 +3,8 @@ use core::*; use scope::MetricScope; -use namespace::{add_namespace, Namespace, WithNamespace}; use std::sync::Arc; +use std::fmt::Debug; use scope::DefineMetric; use local; @@ -15,7 +15,7 @@ lazy_static! { } /// Wrap a MetricConfig in a non-generic trait. -pub trait OpenScope { +pub trait OpenScope: Debug { /// Open a new metrics scope fn open_scope_object(&self) -> Arc; } @@ -25,6 +25,8 @@ pub trait OpenScope { #[derive(Derivative, Clone)] #[derivative(Debug)] pub struct MetricOutput { + namespace: Namespace, + #[derivative(Debug = "ignore")] define_metric_fn: DefineMetricFn, @@ -44,17 +46,18 @@ impl MetricOutput { /// ``` /// pub fn open_scope(&self) -> MetricScope { - MetricScope::new(self.define_metric_fn.clone(), (self.open_scope_fn)()) + MetricScope::new(self.namespace.clone(), self.define_metric_fn.clone(), (self.open_scope_fn)()) } } /// Create a new metric chain with the provided metric definition and scope creation functions. pub fn metric_output(define_fn: MF, open_scope_fn: WF) -> MetricOutput where - MF: Fn(Kind, &str, Sampling) -> M + Send + Sync + 'static, + MF: Fn(&Namespace, Kind, &str, Sampling) -> M + Send + Sync + 'static, WF: Fn() -> CommandFn + Send + Sync + 'static, { MetricOutput { + namespace: ().into(), define_metric_fn: Arc::new(define_fn), open_scope_fn: Arc::new(open_scope_fn), } @@ -70,6 +73,7 @@ impl MetricOutput { let (define_metric_fn, open_scope_fn) = mod_fn(self.define_metric_fn.clone(), self.open_scope_fn.clone()); MetricOutput { + namespace: self.namespace.clone(), define_metric_fn, open_scope_fn, } @@ -81,12 +85,31 @@ impl MetricOutput { MF: Fn(OpenScopeFn) -> OpenScopeFn, { MetricOutput { + namespace: self.namespace.clone(), define_metric_fn: self.define_metric_fn.clone(), open_scope_fn: mod_fn(self.open_scope_fn.clone()), } } + + /// Return a copy of this output with the specified name appended to the namespace. + pub fn with_suffix(&self, name: &str) -> Self { + MetricOutput { + namespace: self.namespace.with_suffix(name), + define_metric_fn: self.define_metric_fn.clone(), + open_scope_fn: self.open_scope_fn.clone(), + } + } + } +//impl<'a, M: Send + Sync + Clone + 'static> Index<&'a str> for MetricOutput { +// type Output = Self; +// +// fn index(&self, index: &'a str) -> &Self::Output { +// &self.push(index) +// } +//} + impl OpenScope for MetricOutput { fn open_scope_object(&self) -> Arc { Arc::new(self.open_scope()) @@ -105,12 +128,3 @@ impl From> for Arc WithNamespace for MetricOutput { - fn with_name>(&self, names: IN) -> Self { - let ref ninto = names.into(); - MetricOutput { - define_metric_fn: add_namespace(ninto, self.define_metric_fn.clone()), - open_scope_fn: self.open_scope_fn.clone(), - } - } -} diff --git a/src/sample.rs b/src/sample.rs index 549d0ae..939241c 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -20,7 +20,7 @@ impl WithSamplingRate for MetricOutput { self.wrap_all(|metric_fn, scope_fn| { ( - Arc::new(move |kind, name, rate| { + Arc::new(move |ns, kind, name, rate| { // TODO override only if FULL_SAMPLING else warn!() if rate != FULL_SAMPLING_RATE { info!( @@ -30,7 +30,7 @@ impl WithSamplingRate for MetricOutput { } let new_rate = sampling_rate * rate; - metric_fn(kind, name, new_rate) + metric_fn(ns, kind, name, new_rate) }), Arc::new(move || { let next_scope = scope_fn(); diff --git a/src/scope.rs b/src/scope.rs index 6818129..ec61508 100755 --- a/src/scope.rs +++ b/src/scope.rs @@ -9,7 +9,6 @@ //! use core::*; use core::Kind::*; -use namespace::*; use cache::*; use schedule::{schedule, CancelHandle}; use output; @@ -31,7 +30,7 @@ pub trait DefineMetric: Flush { /// Register a new metric. /// Only one metric of a certain name will be defined. /// Observer must return a MetricHandle that uniquely identifies the metric. - fn define_metric_object(&self, kind: Kind, name: &str, rate: Sampling) + fn define_metric_object(&self, namespace: &Namespace, kind: Kind, name: &str, rate: Sampling) -> Box; } @@ -186,6 +185,7 @@ pub type AppTimer = Timer; /// Variations of this should also provide control of the metric recording scope. #[derive(Derivative, Clone)] pub struct MetricScope { + namespace: Namespace, flush_on_drop: bool, #[derivative(Debug = "ignore")] define_fn: DefineMetricFn, @@ -195,8 +195,9 @@ pub struct MetricScope { impl MetricScope { /// Create new application metrics instance. - pub fn new(define_metric_fn: DefineMetricFn, scope: CommandFn) -> Self { + pub fn new(namespace: Namespace, define_metric_fn: DefineMetricFn, scope: CommandFn) -> Self { MetricScope { + namespace, flush_on_drop: true, define_fn: define_metric_fn, command_fn: scope, @@ -204,12 +205,12 @@ impl MetricScope { } } -fn write_fn + Clone + Send + Sync + 'static>(scope: &D, kind: Kind, name: &str) -> WriteFn +fn scope_write_fn + Clone + Send + Sync + 'static>(scope: &D, kind: Kind, name: &str) -> WriteFn where M: Clone + Send + Sync + 'static, { let scope = scope.clone(); - let metric = scope.define_metric(kind, name, 1.0); + let metric = scope.define_metric(&ROOT_NS, kind, name, 1.0); Arc::new(move |value| scope.write(&metric, value)) } @@ -219,30 +220,32 @@ pub trait MetricInput: Clone + Flush + Send + Sync + 'static M: Clone + Send + Sync + 'static, { /// Define an event counter of the provided name. - fn marker(&self, name: &str) -> Marker { - Marker { write: write_fn(self, Marker, name) } - } + fn marker(&self, name: &str) -> Marker; /// Define a counter of the provided name. - fn counter(&self, name: &str) -> Counter { - Counter { write: write_fn(self, Counter, name) } - } + fn counter(&self, name: &str) -> Counter; /// Define a timer of the provided name. - fn timer(&self, name: &str) -> Timer { - Timer { write: write_fn(self, Timer, name) } - } + fn timer(&self, name: &str) -> Timer; /// Define a gauge of the provided name. - fn gauge(&self, name: &str) -> Gauge { - Gauge { write: write_fn(self, Gauge, name) } - } + fn gauge(&self, name: &str) -> Gauge; /// Define a metric of the specified type. - fn define_metric(&self, kind: Kind, name: &str, rate: Sampling) -> M; + fn define_metric(&self, source_ns: &Namespace, kind: Kind, name: &str, rate: Sampling) -> M; /// Record or send a value for a previously defined metric. fn write(&self, metric: &M, value: Value); + + /// Join namespace and prepend in newly defined metrics. + #[deprecated(since = "0.7.0", note = "Misleading terminology, use with_suffix() instead.")] + fn with_prefix(&self, name: &str) -> Self { + self.with_suffix(name) + } + + /// Join namespace and prepend in newly defined metrics. + fn with_suffix(&self, name: &str) -> Self; + } /// Scopes can implement buffering, requiring flush operations to commit metric values. @@ -256,13 +259,44 @@ impl MetricInput for MetricScope where M: Clone + Send + Sync + 'static, { - fn define_metric(&self, kind: Kind, name: &str, rate: Sampling) -> M { - (self.define_fn)(kind, name, rate) + + /// Define an event counter of the provided name. + fn marker(&self, name: &str) -> Marker { + Marker { write: scope_write_fn(self, Marker, name) } + } + + /// Define a counter of the provided name. + fn counter(&self, name: &str) -> Counter { + Counter { write: scope_write_fn(self, Counter, name) } + } + + /// Define a timer of the provided name. + fn timer(&self, name: &str) -> Timer { + Timer { write: scope_write_fn(self, Timer, name) } + } + + /// Define a gauge of the provided name. + fn gauge(&self, name: &str) -> Gauge { + Gauge { write: scope_write_fn(self, Gauge, name) } + } + + fn define_metric(&self, source_ns: &Namespace, kind: Kind, name: &str, rate: Sampling) -> M { + (self.define_fn)(source_ns, kind, name, rate) } fn write(&self, metric: &M, value: Value) { self.command_fn.write(metric, value); } + + fn with_suffix(&self, name: &str) -> Self { + MetricScope { + namespace: self.namespace.with_suffix(name), + flush_on_drop: self.flush_on_drop, + define_fn: self.define_fn.clone(), + command_fn: self.command_fn.clone(), + } + } + } /// Scopes can implement buffering, in which case they can be flushed. @@ -304,11 +338,11 @@ struct MetricWriter { } impl DefineMetric for MetricScope { - fn define_metric_object(&self, kind: Kind, name: &str, rate: Sampling) + fn define_metric_object(&self, namespace: &Namespace, kind: Kind, name: &str, rate: Sampling) -> Box { Box::new(MetricWriter { - define_fn: self.define_metric(kind, name, rate), + define_fn: self.define_metric(namespace, kind, name, rate), command_fn: self.command_fn.clone(), }) } @@ -322,20 +356,10 @@ impl WriteMetric for MetricWriter { //// Mutators impl -impl WithNamespace for MetricScope { - fn with_name>(&self, names: IN) -> Self { - let ns = &names.into(); - MetricScope { - flush_on_drop: self.flush_on_drop, - define_fn: add_namespace(ns, self.define_fn.clone()), - command_fn: self.command_fn.clone(), - } - } -} - impl WithCache for MetricScope { fn with_cache(&self, cache_size: usize) -> Self { MetricScope { + namespace: self.namespace.clone(), flush_on_drop: self.flush_on_drop, define_fn: add_cache(cache_size, self.define_fn.clone()), command_fn: self.command_fn.clone(), @@ -351,7 +375,7 @@ mod bench { #[bench] fn time_bench_direct_dispatch_event(b: &mut test::Bencher) { - let metrics = new_aggregate(); + let metrics = MetricAggregator::new(); let marker = metrics.marker("aaa"); b.iter(|| test::black_box(marker.mark())); } diff --git a/src/scores.rs b/src/scores.rs index b503305..e20ab7a 100644 --- a/src/scores.rs +++ b/src/scores.rs @@ -33,6 +33,8 @@ pub type ScoreSnapshot = (Kind, String, Vec); /// Some fields are kept public to ease publishing. #[derive(Debug)] pub struct Scoreboard { + namespace: Namespace, + /// The kind of metric. kind: Kind, @@ -44,11 +46,12 @@ pub struct Scoreboard { impl Scoreboard { /// Create a new Scoreboard to track summary values of a metric - pub fn new(kind: Kind, name: String) -> Self { + pub fn new(namespace: Namespace, kind: Kind, name: String) -> Self { Scoreboard { + namespace, kind, name, - scores: unsafe { mem::transmute(Scoreboard::blank(TimeHandle::now().into())) }, + scores: unsafe { mem::transmute(Scoreboard::blank(accurate_clock_micros() as usize)) }, } } @@ -92,7 +95,7 @@ impl Scoreboard { /// Map raw scores (if any) to applicable statistics pub fn reset(&self) -> Option { - let now: usize = TimeHandle::now().into(); + let now: usize = accurate_clock_micros() as usize; let mut scores = Scoreboard::blank(now); if self.snapshot(now, &mut scores) { let duration_seconds = (now - scores[0]) as f64 / 1_000.0; @@ -108,13 +111,24 @@ impl Scoreboard { snapshot.push(Min(scores[4] as u64)); snapshot.push(Mean(scores[2] as f64 / scores[1] as f64)); } - Timer | Counter => { + Timer => { snapshot.push(Count(scores[1] as u64)); snapshot.push(Sum(scores[2] as u64)); snapshot.push(Max(scores[3] as u64)); snapshot.push(Min(scores[4] as u64)); snapshot.push(Mean(scores[2] as f64 / scores[1] as f64)); + // timer rate uses the COUNT of timer calls per second (not SUM) + snapshot.push(Rate(scores[1] as f64 / duration_seconds)) + } + Counter => { + snapshot.push(Count(scores[1] as u64)); + snapshot.push(Sum(scores[2] as u64)); + + snapshot.push(Max(scores[3] as u64)); + snapshot.push(Min(scores[4] as u64)); + snapshot.push(Mean(scores[2] as f64 / scores[1] as f64)); + // counter rate uses the SUM of values per second (e.g. to get bytes/s) snapshot.push(Rate(scores[2] as f64 / duration_seconds)) } } @@ -147,19 +161,19 @@ mod bench { #[bench] fn bench_score_update_marker(b: &mut test::Bencher) { - let metric = Scoreboard::new(Marker, "event_a".to_string()); + let metric = Scoreboard::new(ROOT_NS.clone(), Marker, "event_a".to_string()); b.iter(|| test::black_box(metric.update(1))); } #[bench] fn bench_score_update_count(b: &mut test::Bencher) { - let metric = Scoreboard::new(Counter, "event_a".to_string()); + let metric = Scoreboard::new(ROOT_NS.clone(), Counter, "event_a".to_string()); b.iter(|| test::black_box(metric.update(4))); } #[bench] fn bench_score_empty_snapshot(b: &mut test::Bencher) { - let metric = Scoreboard::new(Counter, "event_a".to_string()); + let metric = Scoreboard::new(ROOT_NS.clone(), Counter, "event_a".to_string()); let mut scores = Scoreboard::blank(0); b.iter(|| test::black_box(metric.snapshot(0, &mut scores))); } diff --git a/src/self_metrics.rs b/src/self_metrics.rs index bab452e..ffa3bff 100644 --- a/src/self_metrics.rs +++ b/src/self_metrics.rs @@ -6,6 +6,5 @@ pub use core::*; pub use scope::*; pub use aggregate::*; -pub use namespace::*; metrics!( pub DIPSTICK_METRICS = "dipstick"); diff --git a/src/statsd.rs b/src/statsd.rs index 8404c5b..e36e826 100755 --- a/src/statsd.rs +++ b/src/statsd.rs @@ -11,7 +11,7 @@ use std::sync::{Arc, RwLock}; pub use std::net::ToSocketAddrs; metrics! { - DIPSTICK_METRICS.with_prefix("statsd") => { + DIPSTICK_METRICS.with_suffix("statsd") => { Marker SEND_ERR: "send_failed"; Counter SENT_BYTES: "sent_bytes"; } @@ -30,9 +30,8 @@ where let buffered = false; Ok(metric_output( - move |kind, name, rate| { - let mut prefix = String::with_capacity(32); - prefix.push_str(name); + move |namespace, kind, name, rate| { + let mut prefix = namespace.join(name, "."); prefix.push(':'); let mut suffix = String::with_capacity(16); @@ -160,7 +159,7 @@ mod bench { #[bench] pub fn timer_statsd(b: &mut test::Bencher) { let sd = to_statsd("localhost:8125").unwrap().open_scope(); - let timer = sd.define_metric(Kind::Timer, "timer", 1000000.0); + let timer = sd.define_metric(&ROOT_NS, Kind::Timer, "timer", 1000000.0); b.iter(|| test::black_box(sd.write(&timer, 2000))); }