From ea812eb9a02d6c9e9bb2ce577d6e7033c91c40e0 Mon Sep 17 00:00:00 2001 From: Francis Lalonde Date: Fri, 26 Oct 2018 01:20:47 +0000 Subject: [PATCH] superdoc --- README.md | 2 +- examples/basics.rs | 2 +- examples/bench_bucket.rs | 2 +- examples/bench_bucket_proxy.rs | 2 +- examples/bench_queue.rs | 2 +- examples/bucket2graphite.rs | 6 +- examples/bucket2stdout.rs | 4 +- examples/bucket_cleanup.rs | 4 +- examples/bucket_summary.rs | 4 +- examples/cache.rs | 2 +- examples/clopwizard.rs | 12 +- examples/custom_publish.rs | 16 +- examples/graphite.rs | 2 +- examples/multi_input.rs | 8 +- examples/multi_output.rs | 10 +- examples/proxy.rs | 8 +- examples/raw_log.rs | 2 +- examples/statsd_nosampling.rs | 3 +- examples/statsd_sampling.rs | 3 +- examples/text_format_label.rs | 9 +- handbook/01_basics.md | 20 +- handbook/02_inputs.md | 26 +- src/aggregate/mod.rs | 2 - src/aggregate/scores.rs | 174 ----------- src/{aggregate/bucket.rs => bucket/atomic.rs} | 285 ++++++++++++------ src/bucket/mod.rs | 84 ++++++ src/cache/cache_in.rs | 14 +- src/cache/cache_out.rs | 14 +- src/cache/lru_cache.rs | 6 +- src/core/attributes.rs | 26 +- src/core/clock.rs | 8 +- src/core/input.rs | 40 +-- src/core/metrics.rs | 2 +- src/core/mod.rs | 9 +- src/core/name.rs | 18 +- src/core/out_lock.rs | 12 +- src/core/output.rs | 16 +- src/core/proxy.rs | 26 +- src/core/void.rs | 6 +- src/lib.rs | 14 +- src/macros.rs | 8 +- src/multi/multi_in.rs | 10 +- src/multi/multi_out.rs | 10 +- src/output/format.rs | 26 +- src/output/graphite.rs | 20 +- src/output/log.rs | 12 +- src/output/map.rs | 12 +- src/output/prometheus.rs | 12 +- src/output/statsd.rs | 36 +-- src/output/stream.rs | 14 +- src/queue/queue_in.rs | 16 +- src/queue/queue_out.rs | 18 +- 52 files changed, 564 insertions(+), 535 deletions(-) delete mode 100755 src/aggregate/mod.rs delete mode 100755 src/aggregate/scores.rs rename src/{aggregate/bucket.rs => bucket/atomic.rs} (52%) create mode 100755 src/bucket/mod.rs diff --git a/README.md b/README.md index 092de1a..9d205df 100755 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ minimal impact on applications and a choice of output to downstream systems. Dipstick is a toolkit to help all sorts of application collect and send out metrics. As such, it needs a bit of set up to suit one's needs. -Skimming through the handbook [handbook](https://github.com/fralalonde/dipstick/tree/master/handbook) +Skimming through the [handbook](https://github.com/fralalonde/dipstick/tree/master/handbook) should help you get an idea of the possible configurations. In short, dipstick-enabled apps _can_: diff --git a/examples/basics.rs b/examples/basics.rs index 599036d..ce8672d 100644 --- a/examples/basics.rs +++ b/examples/basics.rs @@ -20,7 +20,7 @@ fn main() { app_metrics.counter("just_once").count(4); // metric names can be prepended with a common prefix - let prefixed_metrics = app_metrics.add_naming("subsystem"); + let prefixed_metrics = app_metrics.add_prefix("subsystem"); let event = prefixed_metrics.marker("event_c"); let gauge = prefixed_metrics.gauge("gauge_d"); diff --git a/examples/bench_bucket.rs b/examples/bench_bucket.rs index 6019742..84ae9c6 100755 --- a/examples/bench_bucket.rs +++ b/examples/bench_bucket.rs @@ -11,7 +11,7 @@ use std::env::args; use std::str::FromStr; fn main() { - let bucket = Bucket::new(); + let bucket = AtomicBucket::new(); let event = bucket.marker("a"); let args = &mut args(); args.next(); diff --git a/examples/bench_bucket_proxy.rs b/examples/bench_bucket_proxy.rs index 4c7d94c..a44858e 100755 --- a/examples/bench_bucket_proxy.rs +++ b/examples/bench_bucket_proxy.rs @@ -13,7 +13,7 @@ use std::str::FromStr; fn main() { let event = Proxy::default().marker("a"); - let bucket = Bucket::new(); + let bucket = AtomicBucket::new(); Proxy::default().set_target(bucket.clone()); diff --git a/examples/bench_queue.rs b/examples/bench_queue.rs index dd06998..4fa9395 100755 --- a/examples/bench_queue.rs +++ b/examples/bench_queue.rs @@ -11,7 +11,7 @@ use std::env::args; use std::str::FromStr; fn main() { - let bucket = Bucket::new(); + let bucket = AtomicBucket::new(); let queue = InputQueueScope::wrap(bucket.clone(), 10000); let event = queue.marker("a"); let args = &mut args(); diff --git a/examples/bucket2graphite.rs b/examples/bucket2graphite.rs index d18bd96..3c4960c 100755 --- a/examples/bucket2graphite.rs +++ b/examples/bucket2graphite.rs @@ -7,11 +7,11 @@ use std::time::Duration; use dipstick::*; fn main() { - let bucket = Bucket::new().add_naming("test"); + let bucket = AtomicBucket::new().add_prefix("test"); // Bucket::set_default_output(to_stdout()); - bucket.set_target(Graphite::send_to("localhost:2003").expect("Socket") - .add_naming("machine1").add_naming("application")); + bucket.set_flush_target(Graphite::send_to("localhost:2003").expect("Socket") + .add_prefix("machine1").add_prefix("application")); bucket.flush_every(Duration::from_secs(3)); diff --git a/examples/bucket2stdout.rs b/examples/bucket2stdout.rs index ab4fdd8..62c0275 100755 --- a/examples/bucket2stdout.rs +++ b/examples/bucket2stdout.rs @@ -8,10 +8,10 @@ use std::io; use dipstick::*; fn main() { - let metrics = Bucket::new().add_naming("test"); + let metrics = AtomicBucket::new().add_prefix("test"); // Bucket::set_default_output(to_stdout()); - metrics.set_target(Stream::write_to(io::stdout())); + metrics.set_flush_target(Stream::write_to(io::stdout())); metrics.flush_every(Duration::from_secs(3)); diff --git a/examples/bucket_cleanup.rs b/examples/bucket_cleanup.rs index e2865eb..226051a 100755 --- a/examples/bucket_cleanup.rs +++ b/examples/bucket_cleanup.rs @@ -10,8 +10,8 @@ use std::thread::sleep; fn main() { - let bucket = Bucket::new(); - Bucket::set_default_target(Stream::write_to(io::stdout())); + let bucket = AtomicBucket::new(); + AtomicBucket::set_default_target(Stream::write_to(io::stdout())); let persistent_marker = bucket.marker("persistent"); diff --git a/examples/bucket_summary.rs b/examples/bucket_summary.rs index eb48d7d..1751bde 100644 --- a/examples/bucket_summary.rs +++ b/examples/bucket_summary.rs @@ -9,8 +9,8 @@ use std::io; fn main() { - let app_metrics = Bucket::new(); - app_metrics.set_target(Stream::write_to(io::stdout())); + let app_metrics = AtomicBucket::new(); + app_metrics.set_flush_target(Stream::write_to(io::stdout())); app_metrics.flush_every(Duration::from_secs(3)); diff --git a/examples/cache.rs b/examples/cache.rs index a3a3670..8b33bb1 100755 --- a/examples/cache.rs +++ b/examples/cache.rs @@ -8,7 +8,7 @@ use std::io; use dipstick::*; fn main() { - let metrics = Stream::write_to(io::stdout()).cached(5).input().add_naming("cache"); + let metrics = Stream::write_to(io::stdout()).cached(5).input().add_prefix("cache"); loop { // report some ad-hoc metric values from our "application" loop diff --git a/examples/clopwizard.rs b/examples/clopwizard.rs index 7dc2e4d..50d4540 100644 --- a/examples/clopwizard.rs +++ b/examples/clopwizard.rs @@ -17,25 +17,25 @@ metrics!{ fn main() { - let one_minute = Bucket::new(); + let one_minute = AtomicBucket::new(); one_minute.flush_every(Duration::from_secs(60)); - let five_minutes = Bucket::new(); + let five_minutes = AtomicBucket::new(); five_minutes.flush_every(Duration::from_secs(300)); - let fifteen_minutes = Bucket::new(); + let fifteen_minutes = AtomicBucket::new(); fifteen_minutes.flush_every(Duration::from_secs(900)); let all_buckets = MultiInputScope::new() .add_target(one_minute) .add_target(five_minutes) .add_target(fifteen_minutes) - .add_naming("machine_name"); + .add_prefix("machine_name"); // send application metrics to aggregator Proxy::default().set_target(all_buckets); - Bucket::set_default_target(Stream::write_to(io::stdout())); - Bucket::set_default_stats(stats_all); + AtomicBucket::set_default_target(Stream::write_to(io::stdout())); + AtomicBucket::set_default_stats(stats_all); loop { COUNTER.count(17); diff --git a/examples/custom_publish.rs b/examples/custom_publish.rs index 0b827f7..98bb833 100755 --- a/examples/custom_publish.rs +++ b/examples/custom_publish.rs @@ -8,19 +8,19 @@ use dipstick::*; fn main() { fn custom_statistics( - kind: Kind, - mut name: Name, + kind: InputKind, + mut name: MetricName, score: ScoreType, - ) -> Option<(Kind, Name, Value)> { + ) -> Option<(InputKind, MetricName, MetricValue)> { match (kind, score) { // do not export gauge scores - (Kind::Gauge, _) => None, + (InputKind::Gauge, _) => None, // prepend and append to metric name (_, ScoreType::Count(count)) => { if let Some(last) = name.pop_back() { Some(( - Kind::Counter, + InputKind::Counter, name.append("customized_add_prefix") .append(format!("{}_and_a_suffix", last)), count, @@ -42,10 +42,10 @@ fn main() { } // send application metrics to aggregator - Bucket::set_default_target(Stream::stderr()); - Bucket::set_default_stats(custom_statistics); + AtomicBucket::set_default_target(Stream::stderr()); + AtomicBucket::set_default_stats(custom_statistics); - let app_metrics = Bucket::new(); + let app_metrics = AtomicBucket::new(); // schedule aggregated metrics to be printed every 3 seconds app_metrics.flush_every(Duration::from_secs(3)); diff --git a/examples/graphite.rs b/examples/graphite.rs index 617de8c..9993d40 100644 --- a/examples/graphite.rs +++ b/examples/graphite.rs @@ -10,7 +10,7 @@ fn main() { let metrics = Graphite::send_to("localhost:2003") .expect("Connected") - .add_naming("my_app") + .add_prefix("my_app") .input(); loop { diff --git a/examples/multi_input.rs b/examples/multi_input.rs index c308db6..35476db 100644 --- a/examples/multi_input.rs +++ b/examples/multi_input.rs @@ -2,7 +2,7 @@ extern crate dipstick; -use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Naming}; +use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Prefixed}; use std::time::Duration; use std::io; @@ -15,9 +15,9 @@ fn main() { // will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix. let same_type_metrics = MultiInput::input() - .add_target(Stream::write_to(io::stdout()).add_naming("yeah")) - .add_target(Stream::write_to(io::stdout()).add_naming("ouch")) - .add_naming("cool") + .add_target(Stream::write_to(io::stdout()).add_prefix("yeah")) + .add_target(Stream::write_to(io::stdout()).add_prefix("ouch")) + .add_prefix("cool") .input(); loop { diff --git a/examples/multi_output.rs b/examples/multi_output.rs index a6d2dfe..ecff6db 100644 --- a/examples/multi_output.rs +++ b/examples/multi_output.rs @@ -15,13 +15,13 @@ fn main() { // will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix. let same_type_metrics = MultiOutput::output() - .add_target(Stream::write_to(io::stderr()).add_naming("out_1")) - .add_target(Stream::write_to(io::stderr()).add_naming("out_2")) - .add_naming("out_both").input(); + .add_target(Stream::write_to(io::stderr()).add_prefix("out_1")) + .add_target(Stream::write_to(io::stderr()).add_prefix("out_2")) + .add_prefix("out_both").input(); loop { - different_type_metrics.new_metric("counter_a".into(), Kind::Counter).write(123, labels![]); - same_type_metrics.new_metric("timer_a".into(), Kind::Timer).write(6677, labels![]); + different_type_metrics.new_metric("counter_a".into(), InputKind::Counter).write(123, labels![]); + same_type_metrics.new_metric("timer_a".into(), InputKind::Timer).write(6677, labels![]); std::thread::sleep(Duration::from_millis(400)); } } diff --git a/examples/proxy.rs b/examples/proxy.rs index 268fd9d..ec63778 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -4,12 +4,12 @@ extern crate dipstick; use std::thread::sleep; use std::time::Duration; -use dipstick::{Proxy, Stream, InputScope, Input, Naming}; +use dipstick::{Proxy, Stream, InputScope, Input, Prefixed}; fn main() { let root_proxy = Proxy::default(); - let sub = root_proxy.add_naming("sub"); + let sub = root_proxy.add_prefix("sub"); let count1 = root_proxy.counter("counter_a"); @@ -22,12 +22,12 @@ fn main() { count2.count(2); // route every metric from the root to stdout with prefix "root" - root_proxy.set_target(stdout.add_naming("root")); + root_proxy.set_target(stdout.add_prefix("root")); count1.count(3); count2.count(4); // route metrics from "sub" to stdout with prefix "mutant" - sub.set_target(stdout.add_naming("mutant")); + sub.set_target(stdout.add_prefix("mutant")); count1.count(5); count2.count(6); diff --git a/examples/raw_log.rs b/examples/raw_log.rs index 94467ea..9f9c084 100755 --- a/examples/raw_log.rs +++ b/examples/raw_log.rs @@ -17,7 +17,7 @@ pub fn raw_write() { // define and send metrics using raw channel API let counter = metrics_log.new_metric( "count_a".into(), - dipstick::Kind::Counter, + dipstick::InputKind::Counter, ); counter.write(1, labels![]); } diff --git a/examples/statsd_nosampling.rs b/examples/statsd_nosampling.rs index d6f9304..64ab1e6 100644 --- a/examples/statsd_nosampling.rs +++ b/examples/statsd_nosampling.rs @@ -1,6 +1,5 @@ //! A sample application sending ad-hoc counter values both to statsd _and_ to stdout. -//extern crate badlog; extern crate dipstick; use dipstick::*; @@ -11,7 +10,7 @@ fn main() { Statsd::send_to("localhost:8125") .expect("Connected") // .with_sampling(Sampling::Random(0.2)) - .add_naming("my_app") + .add_prefix("my_app") .input(); let counter = metrics.counter("counter_a"); diff --git a/examples/statsd_sampling.rs b/examples/statsd_sampling.rs index 03c408f..3946611 100755 --- a/examples/statsd_sampling.rs +++ b/examples/statsd_sampling.rs @@ -1,6 +1,5 @@ //! A sample application sending ad-hoc counter values both to statsd _and_ to stdout. -//extern crate badlog; extern crate dipstick; use dipstick::*; @@ -11,7 +10,7 @@ fn main() { Statsd::send_to("localhost:8125") .expect("Connected") .sampled(Sampling::Random(0.2)) - .add_naming("my_app") + .add_prefix("my_app") .input(); let counter = metrics.counter("counter_a"); diff --git a/examples/text_format_label.rs b/examples/text_format_label.rs index d96e5d7..b3956a2 100644 --- a/examples/text_format_label.rs +++ b/examples/text_format_label.rs @@ -1,18 +1,19 @@ -//! A sample application asynchronously printing metrics to stdout. +//! Print metrics to stderr with custom formatter including a label. extern crate dipstick; use std::thread::sleep; use std::time::Duration; use dipstick::{Stream, InputScope, Input, Formatting, AppLabel, - Name, Kind, LineTemplate, LineFormat, LineOp, LabelOp}; + MetricName, InputKind, LineTemplate, LineFormat, LineOp, LabelOp}; +/// Generates template like "$METRIC $value $label_value["abc"]\n" struct MyFormat; impl LineFormat for MyFormat { - fn template(&self, name: &Name, _kind: Kind) -> LineTemplate { + fn template(&self, name: &MetricName, _kind: InputKind) -> LineTemplate { vec![ - LineOp::Literal(format!("{} ", name.join(".")).into()), + LineOp::Literal(format!("{} ", name.join(".")).to_uppercase().into()), LineOp::ValueAsText, LineOp::Literal(" ".into()), LineOp::LabelExists("abc".into(), diff --git a/handbook/01_basics.md b/handbook/01_basics.md index 17c51c3..5e675c7 100755 --- a/handbook/01_basics.md +++ b/handbook/01_basics.md @@ -1,11 +1,19 @@ -# the dipstick handbook -**IN PROGRESS** +# The dipstick handbook -## table of contents +This handbook's purpose is to get you started instrumenting your apps with dipstick +and give an idea of what's possible. -## introduction +For more details, consult the [docs](https://docs.rs/dipstick/). -## static metrics macro +## Overview + +To achieve it's flexibility, Dipstick decouples the metrics _inputs_ from the metric _outputs_. +For example, incrementing a counter in the application may not result in immediate output to a file or to the network. +Conversely, it is also possible that an app will output metrics data even though no values were recorded. +While this makes things generally simpler, it requires the programmer to decide beforehand how metrics will be handled. + + +## Static metrics For speed and easier maintenance, metrics are usually defined statically: @@ -25,4 +33,4 @@ fn main() { } ``` -Metric definition macros are just `lazy_static!` wrappers. +(Metric definition macros are just `lazy_static!` wrappers.) diff --git a/handbook/02_inputs.md b/handbook/02_inputs.md index 7d899ae..2be5349 100755 --- a/handbook/02_inputs.md +++ b/handbook/02_inputs.md @@ -1,4 +1,28 @@ -# input +# Input + +Metrics input are the measurement instruments that are called from application code. +The inputs are high-level components that are assumed to be callable +from all contexts, regardless of threading, security, etc. + +Each metric input has a name and a kind. +A metric's name is a short alphanumeric identifier. +A metric's kind can be one of four kinds: +- Counter +- Marker +- Timer +- Gauge + +The actual flow of measured values varies depending on how the metrics backend has been configured. +Skip to the output section for more details on backend configuration. + +## Counters and Markers + +## Timers + +## Gauges + + + ## namespace diff --git a/src/aggregate/mod.rs b/src/aggregate/mod.rs deleted file mode 100755 index e4ffd80..0000000 --- a/src/aggregate/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod bucket; -pub mod scores; diff --git a/src/aggregate/scores.rs b/src/aggregate/scores.rs deleted file mode 100755 index 0e3ac72..0000000 --- a/src/aggregate/scores.rs +++ /dev/null @@ -1,174 +0,0 @@ -use core::input::Kind; -use core::Value; - -use std::mem; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::*; -use std::usize; - -use self::ScoreType::*; - -#[derive(Debug, Clone, Copy)] -/// Possibly aggregated scores. -pub enum ScoreType { - /// Number of times the metric was used. - Count(u64), - /// Sum of metric values reported. - Sum(u64), - /// Biggest value reported. - Max(u64), - /// Smallest value reported. - Min(u64), - /// Average value (hit count / sum, non-atomic) - Mean(f64), - /// Mean rate (hit count / period length in seconds, non-atomic) - Rate(f64), -} - -/// A metric that holds aggregated values. -/// Some fields are kept public to ease publishing. -#[derive(Debug)] -pub struct Scoreboard { - /// The kind of metric - kind: Kind, - /// The actual recorded metric scores - scores: [AtomicUsize; 4], -} - -impl Scoreboard { - /// Create a new Scoreboard to track summary values of a metric - pub fn new(kind: Kind) -> Self { - Scoreboard { - kind, - scores: unsafe { mem::transmute(Scoreboard::blank()) }, - } - } - - /// Returns the metric's kind. - pub fn metric_kind(&self) -> Kind { - self.kind - } - - #[inline] - fn blank() -> [usize; 4] { - [0, 0, usize::MIN, usize::MAX] - } - - /// Update scores with new value - pub fn update(&self, value: Value) -> () { - // TODO report any concurrent updates / resets for measurement of contention - let value = value as usize; - self.scores[0].fetch_add(1, AcqRel); - match self.kind { - Kind::Marker => {} - _ => { - // optimization - these fields are unused for Marker stats - self.scores[1].fetch_add(value, AcqRel); - swap_if(&self.scores[2], value, |new, current| new > current); - swap_if(&self.scores[3], value, |new, current| new < current); - } - } - } - - /// Reset scores to zero, return previous values - fn snapshot(&self, scores: &mut [usize; 4]) -> bool { - // NOTE copy timestamp, count AND sum _before_ testing for data to reduce concurrent discrepancies - scores[0] = self.scores[0].swap(0, AcqRel); - scores[1] = self.scores[1].swap(0, AcqRel); - - // if hit count is zero, then no values were recorded. - if scores[0] == 0 { - return false; - } - - scores[2] = self.scores[2].swap(usize::MIN, AcqRel); - scores[3] = self.scores[3].swap(usize::MAX, AcqRel); - true - } - - /// Map raw scores (if any) to applicable statistics - pub fn reset(&self, duration_seconds: f64) -> Option> { - let mut scores = Scoreboard::blank(); - if self.snapshot(&mut scores) { - - let mut snapshot = Vec::new(); - match self.kind { - Kind::Marker => { - snapshot.push(Count(scores[0] as u64)); - snapshot.push(Rate(scores[0] as f64 / duration_seconds)) - } - Kind::Gauge => { - snapshot.push(Max(scores[2] as u64)); - snapshot.push(Min(scores[3] as u64)); - snapshot.push(Mean(scores[1] as f64 / scores[0] as f64)); - } - Kind::Timer => { - snapshot.push(Count(scores[0] as u64)); - snapshot.push(Sum(scores[1] as u64)); - - snapshot.push(Max(scores[2] as u64)); - snapshot.push(Min(scores[3] as u64)); - snapshot.push(Mean(scores[1] as f64 / scores[0] as f64)); - // timer rate uses the COUNT of timer calls per second (not SUM) - snapshot.push(Rate(scores[0] as f64 / duration_seconds)) - } - Kind::Counter => { - snapshot.push(Count(scores[0] as u64)); - snapshot.push(Sum(scores[1] as u64)); - - snapshot.push(Max(scores[2] as u64)); - snapshot.push(Min(scores[3] as u64)); - snapshot.push(Mean(scores[1] as f64 / scores[0] as f64)); - // counter rate uses the SUM of values per second (e.g. to get bytes/s) - snapshot.push(Rate(scores[1] as f64 / duration_seconds)) - } - } - Some(snapshot) - } else { - None - } - } -} - -/// Spinlock until success or clear loss to concurrent update. -#[inline] -fn swap_if(counter: &AtomicUsize, new_value: usize, compare: fn(usize, usize) -> bool) { - let mut current = counter.load(Acquire); - while compare(new_value, current) { - if counter.compare_and_swap(current, new_value, Release) == new_value { - // update successful - break; - } - // race detected, retry - current = counter.load(Acquire); - } -} - -#[cfg(feature = "bench")] -mod bench { - - use core::input::Kind; - - use super::*; - use test; - - #[bench] - fn update_marker(b: &mut test::Bencher) { - let metric = Scoreboard::new(Kind::Marker); - b.iter(|| test::black_box(metric.update(1))); - } - - #[bench] - fn update_count(b: &mut test::Bencher) { - let metric = Scoreboard::new(Kind::Counter); - b.iter(|| test::black_box(metric.update(4))); - } - - #[bench] - fn empty_snapshot(b: &mut test::Bencher) { - let metric = Scoreboard::new(Kind::Counter); - let scores = &mut Scoreboard::blank(); - b.iter(|| test::black_box(metric.snapshot(scores))); - } - -} diff --git a/src/aggregate/bucket.rs b/src/bucket/atomic.rs similarity index 52% rename from src/aggregate/bucket.rs rename to src/bucket/atomic.rs index 96f5cab..7c224f2 100755 --- a/src/aggregate/bucket.rs +++ b/src/bucket/atomic.rs @@ -1,21 +1,26 @@ //! Maintain aggregated metrics for deferred reporting, -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::{Name}; -use core::input::{Kind, InputScope, InputMetric}; +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::{MetricName}; +use core::input::{InputKind, InputScope, InputMetric}; use core::output::{OutputDyn, OutputScope, OutputMetric, Output, output_none}; use core::clock::TimeHandle; -use core::{Value, Flush}; -use aggregate::scores::{Scoreboard, ScoreType}; +use core::{MetricValue, Flush}; +use bucket::{ScoreType, stats_summary}; +use bucket::ScoreType::*; use core::error; +use std::mem; +use std::usize; use std::collections::BTreeMap; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::*; use std::sync::{Arc, RwLock}; use std::fmt; use std::borrow::Borrow; /// A function type to transform aggregated scores into publishable statistics. -pub type StatsFn = Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static; +pub type StatsFn = Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static; fn initial_stats() -> &'static StatsFn { &stats_summary @@ -34,21 +39,21 @@ lazy_static! { /// Central aggregation structure. /// Maintains a list of metrics for enumeration when used as source. #[derive(Debug, Clone)] -pub struct Bucket { +pub struct AtomicBucket { attributes: Attributes, - inner: Arc>, + inner: Arc>, } -struct InnerBucket { - metrics: BTreeMap>, +struct InnerAtomicBucket { + metrics: BTreeMap>, period_start: TimeHandle, - stats: Option Option<(Kind, Name, Value)> + Send + Sync + 'static>>, + stats: Option Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static>>, output: Option>, publish_metadata: bool, } -impl fmt::Debug for InnerBucket { +impl fmt::Debug for InnerAtomicBucket { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "metrics: {:?}", self.metrics)?; write!(f, "period_start: {:?}", self.period_start) @@ -56,10 +61,10 @@ impl fmt::Debug for InnerBucket { } lazy_static! { - static ref PERIOD_LENGTH: Name = "_period_length".into(); + static ref PERIOD_LENGTH: MetricName = "_period_length".into(); } -impl InnerBucket { +impl InnerAtomicBucket { pub fn flush(&mut self) -> error::Result<()> { let stats_fn = match self.stats { @@ -90,13 +95,13 @@ impl InnerBucket { /// Take a snapshot of aggregated values and reset them. /// Compute stats on captured values using assigned or default stats function. /// Write stats to assigned or default output. - pub fn flush_to(&mut self, publish_scope: &OutputScope, stats_fn: &StatsFn) -> error::Result<()> { + pub fn flush_to(&mut self, target: &OutputScope, stats: &StatsFn) -> error::Result<()> { let now = TimeHandle::now(); let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0; self.period_start = now; - let mut snapshot: Vec<(&Name, Kind, Vec)> = self.metrics.iter() + let mut snapshot: Vec<(&MetricName, InputKind, Vec)> = self.metrics.iter() .flat_map(|(name, scores)| if let Some(values) = scores.reset(duration_seconds) { Some((name, scores.metric_kind(), values)) } else { @@ -112,39 +117,41 @@ impl InnerBucket { } else { // TODO add switch for metadata such as PERIOD_LENGTH if self.publish_metadata { - snapshot.push((&PERIOD_LENGTH, Kind::Timer, vec![ScoreType::Sum((duration_seconds * 1000.0) as u64)])); + snapshot.push((&PERIOD_LENGTH, InputKind::Timer, vec![Sum((duration_seconds * 1000.0) as u64)])); } for metric in snapshot { for score in metric.2 { - let filtered = (stats_fn)(metric.1, metric.0.clone(), score); + let filtered = stats(metric.1, metric.0.clone(), score); if let Some((kind, name, value)) = filtered { - let metric: OutputMetric = publish_scope.new_metric(name, kind); + let metric: OutputMetric = target.new_metric(name, kind); + // TODO provide some bucket context through labels? metric.write(value, labels![]) } } } - publish_scope.flush() + target.flush() } } } -impl> From for Bucket { - fn from(name: S) -> Bucket { - Bucket::new().add_naming(name.as_ref()) +impl> From for AtomicBucket { + fn from(name: S) -> AtomicBucket { + AtomicBucket::new().add_prefix(name.as_ref()) } } -impl Bucket { - /// Build a new metric aggregation - pub fn new() -> Bucket { - Bucket { +impl AtomicBucket { + /// Build a new atomic bucket. + pub fn new() -> AtomicBucket { + AtomicBucket { attributes: Attributes::default(), - inner: Arc::new(RwLock::new(InnerBucket { + inner: Arc::new(RwLock::new(InnerAtomicBucket { metrics: BTreeMap::new(), period_start: TimeHandle::now(), stats: None, output: None, + // TODO add API toggle for metadata publish publish_metadata: false, })) } @@ -153,7 +160,7 @@ impl Bucket { /// Set the default aggregated metrics statistics generator. pub fn set_default_stats(func: F) where - F: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static + F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static { *DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(func) } @@ -176,7 +183,7 @@ impl Bucket { /// Set the default aggregated metrics statistics generator. pub fn set_stats(&self, func: F) where - F: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static + F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static { self.inner.write().expect("Aggregator").stats = Some(Arc::new(func)) } @@ -187,7 +194,7 @@ impl Bucket { } /// Install a new receiver for all aggregated metrics, replacing any previous receiver. - pub fn set_target(&self, new_config: impl Output + Send + Sync + 'static) { + pub fn set_flush_target(&self, new_config: impl Output + Send + Sync + 'static) { self.inner.write().expect("Aggregator").output = Some(Arc::new(new_config)) } @@ -204,21 +211,21 @@ impl Bucket { } -impl InputScope for Bucket { - /// Lookup or create a scoreboard for the requested metric. - fn new_metric(&self, name: Name, kind: Kind) -> InputMetric { - let scoreb = self.inner +impl InputScope for AtomicBucket { + /// Lookup or create scores for the requested metric. + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { + let scores = self.inner .write() .expect("Aggregator") .metrics - .entry(self.naming_append(name)) - .or_insert_with(|| Arc::new(Scoreboard::new(kind))) + .entry(self.prefix_append(name)) + .or_insert_with(|| Arc::new(AtomicScores::new(kind))) .clone(); - InputMetric::new(move |value, _labels| scoreb.update(value)) + InputMetric::new(move |value, _labels| scores.update(value)) } } -impl Flush for Bucket { +impl Flush for AtomicBucket { /// Collect and reset aggregated data. /// Publish statistics fn flush(&self) -> error::Result<()> { @@ -227,64 +234,127 @@ impl Flush for Bucket { } } -impl WithAttributes for Bucket { +impl WithAttributes for AtomicBucket { fn get_attributes(&self) -> &Attributes { &self.attributes } fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } } -/// A predefined export strategy reporting all aggregated stats for all metric types. -/// Resulting stats are named by appending a short suffix to each metric's name. -#[allow(dead_code)] -pub fn stats_all(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> { - match score { - ScoreType::Count(hit) => Some((Kind::Counter, name.make_name("count"), hit)), - ScoreType::Sum(sum) => Some((kind, name.make_name("sum"), sum)), - ScoreType::Mean(mean) => Some((kind, name.make_name("mean"), mean.round() as Value)), - ScoreType::Max(max) => Some((Kind::Gauge, name.make_name("max"), max)), - ScoreType::Min(min) => Some((Kind::Gauge, name.make_name("min"), min)), - ScoreType::Rate(rate) => Some((Kind::Gauge, name.make_name("rate"), rate.round() as Value)), +/// A metric that holds aggregated values. +/// Some fields are kept public to ease publishing. +#[derive(Debug)] +struct AtomicScores { + /// The kind of metric + kind: InputKind, + /// The actual recorded metric scores + scores: [AtomicUsize; 4], +} + +impl AtomicScores { + /// Create new scores to track summary values of a metric + pub fn new(kind: InputKind) -> Self { + AtomicScores { + kind, + scores: unsafe { mem::transmute(AtomicScores::blank()) }, + } + } + + /// Returns the metric's kind. + pub fn metric_kind(&self) -> InputKind { + self.kind + } + + #[inline] + fn blank() -> [usize; 4] { + [0, 0, usize::MIN, usize::MAX] + } + + /// Update scores with new value + pub fn update(&self, value: MetricValue) -> () { + // TODO report any concurrent updates / resets for measurement of contention + let value = value as usize; + self.scores[0].fetch_add(1, AcqRel); + match self.kind { + InputKind::Marker => {} + _ => { + // optimization - these fields are unused for Marker stats + self.scores[1].fetch_add(value, AcqRel); + swap_if(&self.scores[2], value, |new, current| new > current); + swap_if(&self.scores[3], value, |new, current| new < current); + } + } + } + + /// Reset scores to zero, return previous values + fn snapshot(&self, scores: &mut [usize; 4]) -> bool { + // NOTE copy timestamp, count AND sum _before_ testing for data to reduce concurrent discrepancies + scores[0] = self.scores[0].swap(0, AcqRel); + scores[1] = self.scores[1].swap(0, AcqRel); + + // if hit count is zero, then no values were recorded. + if scores[0] == 0 { + return false; + } + + scores[2] = self.scores[2].swap(usize::MIN, AcqRel); + scores[3] = self.scores[3].swap(usize::MAX, AcqRel); + true + } + + /// Map raw scores (if any) to applicable statistics + pub fn reset(&self, duration_seconds: f64) -> Option> { + let mut scores = AtomicScores::blank(); + if self.snapshot(&mut scores) { + + let mut snapshot = Vec::new(); + match self.kind { + InputKind::Marker => { + snapshot.push(Count(scores[0] as u64)); + snapshot.push(Rate(scores[0] as f64 / duration_seconds)) + } + InputKind::Gauge => { + snapshot.push(Max(scores[2] as u64)); + snapshot.push(Min(scores[3] as u64)); + snapshot.push(Mean(scores[1] as f64 / scores[0] as f64)); + } + InputKind::Timer => { + snapshot.push(Count(scores[0] as u64)); + snapshot.push(Sum(scores[1] as u64)); + + snapshot.push(Max(scores[2] as u64)); + snapshot.push(Min(scores[3] as u64)); + snapshot.push(Mean(scores[1] as f64 / scores[0] as f64)); + // timer rate uses the COUNT of timer calls per second (not SUM) + snapshot.push(Rate(scores[0] as f64 / duration_seconds)) + } + InputKind::Counter => { + snapshot.push(Count(scores[0] as u64)); + snapshot.push(Sum(scores[1] as u64)); + + snapshot.push(Max(scores[2] as u64)); + snapshot.push(Min(scores[3] as u64)); + snapshot.push(Mean(scores[1] as f64 / scores[0] as f64)); + // counter rate uses the SUM of values per second (e.g. to get bytes/s) + snapshot.push(Rate(scores[1] as f64 / duration_seconds)) + } + } + Some(snapshot) + } else { + None + } } } -/// A predefined export strategy reporting the average value for every non-marker metric. -/// Marker metrics export their hit count instead. -/// Since there is only one stat per metric, there is no risk of collision -/// and so exported stats copy their metric's name. -#[allow(dead_code)] -pub fn stats_average(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> { - match kind { - Kind::Marker => match score { - ScoreType::Count(count) => Some((Kind::Counter, name, count)), - _ => None, - }, - _ => match score { - ScoreType::Mean(avg) => Some((Kind::Gauge, name, avg.round() as Value)), - _ => None, - }, - } -} - -/// A predefined single-stat-per-metric export strategy: -/// - Timers and Counters each export their sums -/// - Markers each export their hit count -/// - Gauges each export their average -/// Since there is only one stat per metric, there is no risk of collision -/// and so exported stats copy their metric's name. -#[allow(dead_code)] -pub fn stats_summary(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> { - match kind { - Kind::Marker => match score { - ScoreType::Count(count) => Some((Kind::Counter, name, count)), - _ => None, - }, - Kind::Counter | Kind::Timer => match score { - ScoreType::Sum(sum) => Some((kind, name, sum)), - _ => None, - }, - Kind::Gauge => match score { - ScoreType::Mean(mean) => Some((Kind::Gauge, name, mean.round() as Value)), - _ => None, - }, +/// Spinlock until success or clear loss to concurrent update. +#[inline] +fn swap_if(counter: &AtomicUsize, new_value: usize, compare: fn(usize, usize) -> bool) { + let mut current = counter.load(Acquire); + while compare(new_value, current) { + if counter.compare_and_swap(current, new_value, Release) == new_value { + // update successful + break; + } + // race detected, retry + current = counter.load(Acquire); } } @@ -294,17 +364,36 @@ mod bench { use test; use super::*; + #[bench] + fn update_marker(b: &mut test::Bencher) { + let metric = AtomicScores::new(InputKind::Marker); + b.iter(|| test::black_box(metric.update(1))); + } + + #[bench] + fn update_count(b: &mut test::Bencher) { + let metric = AtomicScores::new(InputKind::Counter); + b.iter(|| test::black_box(metric.update(4))); + } + + #[bench] + fn empty_snapshot(b: &mut test::Bencher) { + let metric = AtomicScores::new(InputKind::Counter); + let scores = &mut AtomicScores::blank(); + b.iter(|| test::black_box(metric.snapshot(scores))); + } + #[bench] fn aggregate_marker(b: &mut test::Bencher) { - let sink = Bucket::new(); - let metric = sink.new_metric("event_a".into(), Kind::Marker); + let sink = AtomicBucket::new(); + let metric = sink.new_metric("event_a".into(), InputKind::Marker); b.iter(|| test::black_box(metric.write(1, labels![]))); } #[bench] fn aggregate_counter(b: &mut test::Bencher) { - let sink = Bucket::new(); - let metric = sink.new_metric("count_a".into(), Kind::Counter); + let sink = AtomicBucket::new(); + let metric = sink.new_metric("count_a".into(), InputKind::Counter); b.iter(|| test::black_box(metric.write(1, labels![]))); } @@ -313,16 +402,18 @@ mod bench { #[cfg(test)] mod test { use super::*; + use bucket::{stats_all, stats_average, stats_summary}; + use core::clock::{mock_clock_advance, mock_clock_reset}; use output::map::StatsMap; use std::time::Duration; use std::collections::BTreeMap; - fn make_stats(stats_fn: &StatsFn) -> BTreeMap { + fn make_stats(stats_fn: &StatsFn) -> BTreeMap { mock_clock_reset(); - let metrics = Bucket::new().add_naming("test"); + let metrics = AtomicBucket::new().add_prefix("test"); let counter = metrics.counter("counter_a"); let timer = metrics.timer("timer_a"); diff --git a/src/bucket/mod.rs b/src/bucket/mod.rs new file mode 100755 index 0000000..140a84b --- /dev/null +++ b/src/bucket/mod.rs @@ -0,0 +1,84 @@ +pub mod atomic; + +use core::input::InputKind; +use core::MetricValue; +use core::name::{MetricName}; + +/// Possibly aggregated scores. +#[derive(Debug, Clone, Copy)] +pub enum ScoreType { + /// Number of times the metric was used. + Count(u64), + /// Sum of metric values reported. + Sum(u64), + /// Biggest value reported. + Max(u64), + /// Smallest value reported. + Min(u64), + /// Average value (hit count / sum, non-atomic) + Mean(f64), + /// Mean rate (hit count / period length in seconds, non-atomic) + Rate(f64), +} + +/// A predefined export strategy reporting all aggregated stats for all metric types. +/// Resulting stats are named by appending a short suffix to each metric's name. +#[allow(dead_code)] +pub fn stats_all(kind: InputKind, name: MetricName, score: ScoreType) + -> Option<(InputKind, MetricName, MetricValue)> +{ + match score { + ScoreType::Count(hit) => Some((InputKind::Counter, name.make_name("count"), hit)), + ScoreType::Sum(sum) => Some((kind, name.make_name("sum"), sum)), + ScoreType::Mean(mean) => Some((kind, name.make_name("mean"), mean.round() as MetricValue)), + ScoreType::Max(max) => Some((InputKind::Gauge, name.make_name("max"), max)), + ScoreType::Min(min) => Some((InputKind::Gauge, name.make_name("min"), min)), + ScoreType::Rate(rate) => Some((InputKind::Gauge, name.make_name("rate"), rate.round() as MetricValue)), + } +} + +/// A predefined export strategy reporting the average value for every non-marker metric. +/// Marker metrics export their hit count instead. +/// Since there is only one stat per metric, there is no risk of collision +/// and so exported stats copy their metric's name. +#[allow(dead_code)] +pub fn stats_average(kind: InputKind, name: MetricName, score: ScoreType) + -> Option<(InputKind, MetricName, MetricValue)> +{ + match kind { + InputKind::Marker => match score { + ScoreType::Count(count) => Some((InputKind::Counter, name, count)), + _ => None, + }, + _ => match score { + ScoreType::Mean(avg) => Some((InputKind::Gauge, name, avg.round() as MetricValue)), + _ => None, + }, + } +} + +/// A predefined single-stat-per-metric export strategy: +/// - Timers and Counters each export their sums +/// - Markers each export their hit count +/// - Gauges each export their average +/// Since there is only one stat per metric, there is no risk of collision +/// and so exported stats copy their metric's name. +#[allow(dead_code)] +pub fn stats_summary(kind: InputKind, name: MetricName, score: ScoreType) + -> Option<(InputKind, MetricName, MetricValue)> +{ + match kind { + InputKind::Marker => match score { + ScoreType::Count(count) => Some((InputKind::Counter, name, count)), + _ => None, + }, + InputKind::Counter | InputKind::Timer => match score { + ScoreType::Sum(sum) => Some((kind, name, sum)), + _ => None, + }, + InputKind::Gauge => match score { + ScoreType::Mean(mean) => Some((InputKind::Gauge, name, mean.round() as MetricValue)), + _ => None, + }, + } +} \ No newline at end of file diff --git a/src/cache/cache_in.rs b/src/cache/cache_in.rs index 44e9246..8ad32fc 100755 --- a/src/cache/cache_in.rs +++ b/src/cache/cache_in.rs @@ -1,9 +1,9 @@ //! Cache metric definitions. use core::Flush; -use core::input::{Kind, Input, InputScope, InputMetric, InputDyn}; -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::Name; +use core::input::{InputKind, Input, InputScope, InputMetric, InputDyn}; +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::MetricName; use cache::lru_cache as lru; use core::error; @@ -24,7 +24,7 @@ pub trait CachedInput: Input + Send + Sync + 'static + Sized { pub struct InputCache { attributes: Attributes, target: Arc, - cache: Arc>>, + cache: Arc>>, } impl InputCache { @@ -61,7 +61,7 @@ impl Input for InputCache { pub struct InputScopeCache { attributes: Attributes, target: Arc, - cache: Arc>>, + cache: Arc>>, } impl WithAttributes for InputScopeCache { @@ -70,8 +70,8 @@ impl WithAttributes for InputScopeCache { } impl InputScope for InputScopeCache { - fn new_metric(&self, name: Name, kind: Kind) -> InputMetric { - let name = self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { + let name = self.prefix_append(name); let lookup = { self.cache.write().expect("Cache Lock").get(&name).cloned() }; diff --git a/src/cache/cache_out.rs b/src/cache/cache_out.rs index 971d733..2deb4c0 100755 --- a/src/cache/cache_out.rs +++ b/src/cache/cache_out.rs @@ -1,10 +1,10 @@ //! Cache metric definitions. use core::Flush; -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::Name; +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::MetricName; use core::output::{Output, OutputMetric, OutputScope, OutputDyn}; -use core::input::Kind; +use core::input::InputKind; use cache::lru_cache as lru; use core::error; @@ -26,7 +26,7 @@ pub trait CachedOutput: Output + Send + Sync + 'static + Sized { pub struct OutputCache { attributes: Attributes, target: Arc, - cache: Arc>>, + cache: Arc>>, } impl OutputCache { @@ -63,7 +63,7 @@ impl Output for OutputCache { pub struct OutputScopeCache { attributes: Attributes, target: Rc, - cache: Arc>>, + cache: Arc>>, } impl WithAttributes for OutputScopeCache { @@ -72,8 +72,8 @@ impl WithAttributes for OutputScopeCache { } impl OutputScope for OutputScopeCache { - fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric { - let name = self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { + let name = self.prefix_append(name); let lookup = { self.cache.write().expect("Cache Lock").get(&name).cloned() }; diff --git a/src/cache/lru_cache.rs b/src/cache/lru_cache.rs index d87d8ec..c27799a 100755 --- a/src/cache/lru_cache.rs +++ b/src/cache/lru_cache.rs @@ -1,6 +1,6 @@ - //! A fixed-size cache with LRU expiration criteria. - //! Stored values will be held onto as long as there is space. - //! When space runs out, the oldest unused value will get evicted to make room for a new value. +//! A fixed-size cache with LRU expiration criteria. +//! Stored values will be held onto as long as there is space. +//! When space runs out, the oldest unused value will get evicted to make room for a new value. use std::hash::Hash; use std::collections::HashMap; diff --git a/src/core/attributes.rs b/src/core/attributes.rs index 7d1d8b1..e4059df 100755 --- a/src/core/attributes.rs +++ b/src/core/attributes.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::collections::{HashMap}; -use core::name::{NameParts, Name}; +use core::name::{NameParts, MetricName}; /// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method. #[derive(Debug, Clone, Copy)] @@ -43,7 +43,7 @@ pub trait WithAttributes: Clone { // TODO replace with fields-in-traits if ever stabilized (https://github.com/nikomatsakis/fields-in-traits-rfc) fn mut_attributes(&mut self) -> &mut Attributes; - /// Utility method. Clone a component and mutate its attributes at once. + /// Clone the component and mutate its attributes at once. fn with_attributes(&self, edit: F) -> Self { let mut cloned = self.clone(); (edit)(cloned.mut_attributes()); @@ -52,21 +52,21 @@ pub trait WithAttributes: Clone { } /// Name operations support. -pub trait Naming { +pub trait Prefixed { /// Returns namespace of component. - fn get_naming(&self) -> &NameParts; + fn get_prefixes(&self) -> &NameParts; - /// Join namespace and prepend in newly defined metrics. - fn add_naming>(&self, name: S) -> Self; + /// Extend the namespace metrics will be defined in. + fn add_prefix>(&self, name: S) -> Self; /// Append any name parts to the name's namespace. - fn naming_append>(&self, name: S) -> Name { - name.into().append(self.get_naming().clone()) + fn prefix_append>(&self, name: S) -> MetricName { + name.into().append(self.get_prefixes().clone()) } /// Prepend any name parts to the name's namespace. - fn naming_prepend>(&self, name: S) -> Name { - name.into().prepend(self.get_naming().clone()) + fn prefix_prepend>(&self, name: S) -> MetricName { + name.into().prepend(self.get_prefixes().clone()) } } @@ -80,16 +80,16 @@ pub trait Label { } -impl Naming for T { +impl Prefixed for T { /// Returns namespace of component. - fn get_naming(&self) -> &NameParts { + fn get_prefixes(&self) -> &NameParts { &self.get_attributes().naming } /// Adds a name part to any existing naming. /// Return a clone of the component with the updated naming. - fn add_naming>(&self, name: S) -> Self { + fn add_prefix>(&self, name: S) -> Self { let name = name.into(); self.with_attributes(|new_attr| new_attr.naming.push_back(name.clone())) } diff --git a/src/core/clock.rs b/src/core/clock.rs index 569ce20..2d51f83 100755 --- a/src/core/clock.rs +++ b/src/core/clock.rs @@ -6,7 +6,7 @@ use std::time::Duration; use std::time::Instant; -use core::Value; +use core::MetricValue; #[derive(Debug, Copy, Clone)] /// A handle to the start time of a counter. @@ -21,13 +21,13 @@ impl TimeHandle { } /// Get the elapsed time in microseconds since TimeHandle was obtained. - pub fn elapsed_us(self) -> Value { + pub fn elapsed_us(self) -> MetricValue { let duration = now() - self.0; - duration.as_secs() * 1_000_000 + duration.subsec_micros() as Value + duration.as_secs() * 1_000_000 + duration.subsec_micros() as MetricValue } /// Get the elapsed time in microseconds since TimeHandle was obtained. - pub fn elapsed_ms(self) -> Value { + pub fn elapsed_ms(self) -> MetricValue { self.elapsed_us() / 1000 } } diff --git a/src/core/input.rs b/src/core/input.rs index e7709af..f4a6011 100755 --- a/src/core/input.rs +++ b/src/core/input.rs @@ -1,7 +1,7 @@ use core::clock::TimeHandle; -use core::{Value, Flush}; -use core::name::Name; -use ::{Labels}; +use core::{MetricValue, Flush}; +use core::name::MetricName; +use core::label::Labels; use std::sync::Arc; use std::fmt; @@ -36,26 +36,26 @@ impl InputDyn for T { pub trait InputScope: Flush { /// Define a generic metric of the specified type. /// It is preferable to use counter() / marker() / timer() / gauge() methods. - fn new_metric(&self, name: Name, kind: Kind) -> InputMetric; + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric; /// Define a counter. fn counter(&self, name: &str) -> Counter { - self.new_metric(name.into(), Kind::Counter).into() + self.new_metric(name.into(), InputKind::Counter).into() } /// Define a marker. fn marker(&self, name: &str) -> Marker { - self.new_metric(name.into(), Kind::Marker).into() + self.new_metric(name.into(), InputKind::Marker).into() } /// Define a timer. fn timer(&self, name: &str) -> Timer { - self.new_metric(name.into(), Kind::Timer).into() + self.new_metric(name.into(), InputKind::Timer).into() } /// Define a gauge. fn gauge(&self, name: &str) -> Gauge { - self.new_metric(name.into(), Kind::Gauge).into() + self.new_metric(name.into(), InputKind::Gauge).into() } } @@ -63,7 +63,7 @@ pub trait InputScope: Flush { /// A metric is actually a function that knows to write a metric value to a metric output. #[derive(Clone)] pub struct InputMetric { - inner: Arc + inner: Arc } impl fmt::Debug for InputMetric { @@ -74,20 +74,20 @@ impl fmt::Debug for InputMetric { impl InputMetric { /// Utility constructor - pub fn new(metric: F) -> InputMetric { + pub fn new(metric: F) -> InputMetric { InputMetric { inner: Arc::new(metric) } } /// Collect a new value for this metric. #[inline] - pub fn write(&self, value: Value, labels: Labels) { + pub fn write(&self, value: MetricValue, labels: Labels) { (self.inner)(value, labels) } } /// Used to differentiate between metric kinds in the backend. #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -pub enum Kind { +pub enum InputKind { /// Handling one item at a time. Marker, /// Handling quantities or multiples. @@ -98,15 +98,15 @@ pub enum Kind { Timer, } -/// Used by the metrics! macro to obtain the Kind from the stringified type. -impl<'a> From<&'a str> for Kind { - fn from(s: &str) -> Kind { +/// Used by the metrics! macro to obtain the InputKind from the stringified type. +impl<'a> From<&'a str> for InputKind { + fn from(s: &str) -> InputKind { match s { - "Marker" => Kind::Marker, - "Counter" => Kind::Counter, - "Gauge" => Kind::Gauge, - "Timer" => Kind::Timer, - _ => panic!("No Kind '{}' defined", s) + "Marker" => InputKind::Marker, + "Counter" => InputKind::Counter, + "Gauge" => InputKind::Gauge, + "Timer" => InputKind::Timer, + _ => panic!("No InputKind '{}' defined", s) } } } diff --git a/src/core/metrics.rs b/src/core/metrics.rs index e958107..43cb699 100755 --- a/src/core/metrics.rs +++ b/src/core/metrics.rs @@ -3,7 +3,7 @@ //! This is also kept in a separate module because it is not to be exposed outside of the crate. use core::input::{Marker, InputScope, Counter}; -use core::attributes::Naming; +use core::attributes::Prefixed; use core::proxy::Proxy; metrics!{ diff --git a/src/core/mod.rs b/src/core/mod.rs index a00d543..52b58ee 100755 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -13,7 +13,7 @@ pub mod scheduler; pub mod metrics; /// Base type for recorded metric values. -pub type Value = u64; +pub type MetricValue = u64; /// Both InputScope and OutputScope share the ability to flush the recorded data. pub trait Flush { @@ -31,10 +31,9 @@ pub mod test { #[test] fn test_to_void() { let c = void::Void::metrics().input(); - let m = c.new_metric("test".into(), input::Kind::Marker); + let m = c.new_metric("test".into(), input::InputKind::Marker); m.write(33, labels![]); } - } #[cfg(feature = "bench")] @@ -42,7 +41,7 @@ pub mod bench { use super::input::*; use super::clock::*; - use super::super::aggregate::bucket::*; + use super::super::bucket::atomic::*; use test; #[bench] @@ -52,7 +51,7 @@ pub mod bench { #[bench] fn time_bench_direct_dispatch_event(b: &mut test::Bencher) { - let metrics = Bucket::new(); + let metrics = AtomicBucket::new(); let marker = metrics.marker("aaa"); b.iter(|| test::black_box(marker.mark())); } diff --git a/src/core/name.rs b/src/core/name.rs index e7bec81..d021dc7 100644 --- a/src/core/name.rs +++ b/src/core/name.rs @@ -28,15 +28,15 @@ impl NameParts { } /// Make a name in this namespace - pub fn make_name>(&self, leaf: S) -> Name { + pub fn make_name>(&self, leaf: S) -> MetricName { let mut nodes = self.clone(); nodes.push_back(leaf.into()); - Name { nodes } + MetricName { nodes } } /// Extract a copy of the last name part /// Panics if empty - pub fn short(&self) -> Name { + pub fn short(&self) -> MetricName { self.back().expect("Short metric name").clone().into() } } @@ -70,11 +70,11 @@ impl DerefMut for NameParts { /// The name of a metric, including the concatenated possible namespaces in which it was defined. #[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] -pub struct Name { +pub struct MetricName { nodes: NameParts, } -impl Name { +impl MetricName { /// Prepend to the existing namespace. pub fn prepend>(mut self, namespace: S) -> Self { @@ -101,20 +101,20 @@ impl Name { } } -impl> From for Name { +impl> From for MetricName { fn from(name: S) -> Self { - Name { nodes: NameParts::from(name) } + MetricName { nodes: NameParts::from(name) } } } -impl Deref for Name { +impl Deref for MetricName { type Target = NameParts; fn deref(&self) -> &Self::Target { &self.nodes } } -impl DerefMut for Name { +impl DerefMut for MetricName { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.nodes } diff --git a/src/core/out_lock.rs b/src/core/out_lock.rs index 3f5d6f5..5ee3038 100755 --- a/src/core/out_lock.rs +++ b/src/core/out_lock.rs @@ -1,7 +1,7 @@ -use core::input::{InputScope, InputMetric, Input, Kind}; +use core::input::{InputScope, InputMetric, Input, InputKind}; use core::output::{Output, OutputScope}; -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::Name; +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::MetricName; use core::Flush; use core::error; use std::rc::Rc; @@ -9,7 +9,7 @@ use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::ops; -/// Provide thread-safe locking to RawScope implementers. +/// Synchronous thread-safety for metric output using basic locking. #[derive(Clone)] pub struct LockingScopeBox { attributes: Attributes, @@ -23,8 +23,8 @@ impl WithAttributes for LockingScopeBox { impl InputScope for LockingScopeBox { - fn new_metric(&self, name: Name, kind: Kind) -> InputMetric { - let name = self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { + let name = self.prefix_append(name); let raw_metric = self.inner.lock().expect("RawScope Lock").new_metric(name, kind); let mutex = self.inner.clone(); InputMetric::new(move |value, labels| { diff --git a/src/core/output.rs b/src/core/output.rs index ca1a3fc..2d96dde 100755 --- a/src/core/output.rs +++ b/src/core/output.rs @@ -1,8 +1,8 @@ -use core::{Flush, Value}; -use core::input::Kind; -use core::name::Name; +use core::{Flush, MetricValue}; +use core::input::InputKind; +use core::name::MetricName; use core::void::Void; -use ::{Labels}; +use core::label::Labels; use std::rc::Rc; @@ -10,26 +10,26 @@ use std::rc::Rc; pub trait OutputScope: Flush { /// Define a raw metric of the specified type. - fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric; + fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric; } /// Output metrics are not thread safe. #[derive(Clone)] pub struct OutputMetric { - inner: Rc + inner: Rc } impl OutputMetric { /// Utility constructor - pub fn new(metric: F) -> OutputMetric { + pub fn new(metric: F) -> OutputMetric { OutputMetric { inner: Rc::new(metric) } } /// Some may prefer the `metric.write(value)` form to the `(metric)(value)` form. /// This shouldn't matter as metrics should be of type Counter, Marker, etc. #[inline] - pub fn write(&self, value: Value, labels: Labels) { + pub fn write(&self, value: MetricValue, labels: Labels) { (self.inner)(value, labels) } } diff --git a/src/core/proxy.rs b/src/core/proxy.rs index ab94d2b..81c3394 100755 --- a/src/core/proxy.rs +++ b/src/core/proxy.rs @@ -1,9 +1,9 @@ //! Decouple metric definition from configuration with trait objects. -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::{Name, NameParts}; +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::{MetricName, NameParts}; use core::Flush; -use core::input::{Kind, InputMetric, InputScope}; +use core::input::{InputKind, InputMetric, InputScope}; use core::void::VOID_INPUT; use core::error; @@ -26,7 +26,7 @@ lazy_static! { struct ProxyMetric { // basic info for this metric, needed to recreate new corresponding trait object if target changes name: NameParts, - kind: Kind, + kind: InputKind, // the metric trait object to proxy metric values to // the second part can be up to namespace.len() + 1 if this metric was individually targeted @@ -174,13 +174,13 @@ impl Proxy { /// Replace target for this proxy and it's children. pub fn set_target(&self, target: T) { let mut inner = self.inner.write().expect("Dispatch Lock"); - inner.set_target(self.get_naming(), Arc::new(target)); + inner.set_target(self.get_prefixes(), Arc::new(target)); } /// Replace target for this proxy and it's children. pub fn unset_target(&self) { let mut inner = self.inner.write().expect("Dispatch Lock"); - inner.unset_target(self.get_naming()); + inner.unset_target(self.get_prefixes()); } /// Replace target for this proxy and it's children. @@ -197,19 +197,19 @@ impl Proxy { impl> From for Proxy { fn from(name: S) -> Proxy { - Proxy::new().add_naming(name.as_ref()) + Proxy::new().add_prefix(name.as_ref()) } } impl InputScope for Proxy { /// Lookup or create a proxy stub for the requested metric. - fn new_metric(&self, name: Name, kind: Kind) -> InputMetric { - let name: Name = self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { + let name: MetricName = self.prefix_append(name); let mut inner = self.inner.write().expect("Dispatch Lock"); let proxy = inner .metrics .get(&name) - // TODO validate that Kind matches existing + // TODO validate that InputKind matches existing .and_then(|proxy_ref| Weak::upgrade(proxy_ref)) .unwrap_or_else(|| { let namespace = &*name; @@ -235,7 +235,7 @@ impl InputScope for Proxy { impl Flush for Proxy { fn flush(&self) -> error::Result<()> { - self.inner.write().expect("Dispatch Lock").flush(self.get_naming()) + self.inner.write().expect("Dispatch Lock").flush(self.get_prefixes()) } } @@ -249,11 +249,11 @@ mod bench { use super::*; use test; - use aggregate::bucket::Bucket; + use bucket::atomic::AtomicBucket; #[bench] fn proxy_marker_to_aggregate(b: &mut test::Bencher) { - ROOT_PROXY.set_target(Bucket::new()); + ROOT_PROXY.set_target(AtomicBucket::new()); let metric = ROOT_PROXY.marker("event_a"); b.iter(|| test::black_box(metric.mark())); } diff --git a/src/core/void.rs b/src/core/void.rs index 2785a95..705b959 100755 --- a/src/core/void.rs +++ b/src/core/void.rs @@ -1,6 +1,6 @@ use core::output::{Output, OutputScope, OutputMetric}; -use core::name::Name; -use core::input::{Kind, InputDyn, InputScope}; +use core::name::MetricName; +use core::input::{InputKind, InputDyn, InputScope}; use core::Flush; use std::sync::Arc; @@ -40,7 +40,7 @@ impl Output for Void { } impl OutputScope for VoidOutput { - fn new_metric(&self, _name: Name, _kind: Kind) -> OutputMetric { + fn new_metric(&self, _name: MetricName, _kind: InputKind) -> OutputMetric { OutputMetric::new(|_value, _labels| {}) } } diff --git a/src/lib.rs b/src/lib.rs index f32aee3..66b6139 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,10 +27,10 @@ mod macros; pub use macros::*; mod core; -pub use core::{Flush, Value}; -pub use core::attributes::{Naming, Sampling, Sampled, Buffered, Buffering}; -pub use core::name::{Name, NameParts}; -pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, Kind}; +pub use core::{Flush, MetricValue}; +pub use core::attributes::{Prefixed, Sampling, Sampled, Buffered, Buffering}; +pub use core::name::{MetricName, NameParts}; +pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, InputKind}; pub use core::output::{Output, OutputDyn, OutputScope, OutputMetric}; pub use core::scheduler::{ScheduleFlush, CancelHandle}; pub use core::out_lock::{LockingScopeBox}; @@ -51,9 +51,9 @@ pub use output::statsd::{Statsd, StatsdScope, StatsdMetric}; pub use output::map::{StatsMap}; pub use output::log::{Log, LogScope}; -mod aggregate; -pub use aggregate::bucket::{Bucket, stats_all, stats_average, stats_summary}; -pub use aggregate::scores::{ScoreType, Scoreboard}; +mod bucket; +pub use bucket::{ScoreType, stats_all, stats_average, stats_summary}; +pub use bucket::atomic::{AtomicBucket}; mod cache; pub use cache::cache_in::CachedInput; diff --git a/src/macros.rs b/src/macros.rs index 9f17c8b..c67f681 100755 --- a/src/macros.rs +++ b/src/macros.rs @@ -130,27 +130,27 @@ macro_rules! metrics { // SUB BRANCH NODE - public identifier (@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => { - lazy_static! { $(#[$attr])* pub static ref $IDENT = $WITH.add_naming($e); } + lazy_static! { $(#[$attr])* pub static ref $IDENT = $WITH.add_prefix($e); } metrics!( @internal $IDENT; $TY; $($BRANCH)*); metrics!( @internal $WITH; $TY; $($REST)*); }; // SUB BRANCH NODE - private identifier (@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => { - lazy_static! { $(#[$attr])* static ref $IDENT = $WITH.add_naming($e); } + lazy_static! { $(#[$attr])* static ref $IDENT = $WITH.add_prefix($e); } metrics!( @internal $IDENT; $TY; $($BRANCH)*); metrics!( @internal $WITH; $TY; $($REST)*); }; // SUB BRANCH NODE (not yet) (@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => { - metrics!( @internal $WITH.add_naming($e); $TY; $($BRANCH)*); + metrics!( @internal $WITH.add_prefix($e); $TY; $($BRANCH)*); metrics!( @internal $WITH; $TY; $($REST)*); }; // SUB BRANCH NODE (not yet) (@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => { - metrics!( @internal $WITH.add_naming($e); $TY; $($BRANCH)*); + metrics!( @internal $WITH.add_prefix($e); $TY; $($BRANCH)*); metrics!( @internal $WITH; $TY; $($REST)*); }; diff --git a/src/multi/multi_in.rs b/src/multi/multi_in.rs index ed8152a..4fdd6c4 100755 --- a/src/multi/multi_in.rs +++ b/src/multi/multi_in.rs @@ -1,9 +1,9 @@ //! Dispatch metrics to multiple sinks. use core::Flush; -use core::input::{Kind, Input, InputScope, InputMetric, InputDyn}; -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::Name; +use core::input::{InputKind, Input, InputScope, InputMetric, InputDyn}; +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::MetricName; use core::error; use std::sync::Arc; @@ -75,8 +75,8 @@ impl MultiInputScope { } impl InputScope for MultiInputScope { - fn new_metric(&self, name: Name, kind: Kind) -> InputMetric { - let name = &self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { + let name = &self.prefix_append(name); let metrics: Vec = self.scopes.iter() .map(move |scope| scope.new_metric(name.clone(), kind)) .collect(); diff --git a/src/multi/multi_out.rs b/src/multi/multi_out.rs index ffadc35..395405f 100755 --- a/src/multi/multi_out.rs +++ b/src/multi/multi_out.rs @@ -1,9 +1,9 @@ //! Dispatch metrics to multiple sinks. use core::Flush; -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::Name; -use core::input::Kind; +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::MetricName; +use core::input::InputKind; use core::output::{Output, OutputMetric, OutputScope, OutputDyn}; use core::error; @@ -76,8 +76,8 @@ impl MultiOutputScope { } impl OutputScope for MultiOutputScope { - fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric { - let name = &self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { + let name = &self.prefix_append(name); let metrics: Vec = self.scopes.iter() .map(move |scope| scope.new_metric(name.clone(), kind)) .collect(); diff --git a/src/output/format.rs b/src/output/format.rs index 176b356..3db5a78 100644 --- a/src/output/format.rs +++ b/src/output/format.rs @@ -1,6 +1,6 @@ -use core::name::Name; -use core::input::Kind; -use core::Value; +use core::name::MetricName; +use core::input::InputKind; +use core::MetricValue; use self::LineOp::*; use std::io; @@ -15,7 +15,7 @@ pub enum LineOp { /// Print metric value as text. ValueAsText, /// Print metric value, divided by the given scale, as text. - ScaledValueAsText(Value), + ScaledValueAsText(MetricValue), /// Print the newline character.labels.lookup(key) NewLine, } @@ -43,7 +43,7 @@ impl From> for LineTemplate { impl LineTemplate { /// Template execution applies commands in turn, writing to the output. - pub fn print(&self, output: &mut io::Write, value: Value, lookup: L) -> Result<(), io::Error> + pub fn print(&self, output: &mut io::Write, value: MetricValue, lookup: L) -> Result<(), io::Error> where L: Fn(&str) -> Option> { for cmd in &self.ops { @@ -85,7 +85,7 @@ pub trait Formatting { pub trait LineFormat: Send + Sync { /// Prepare a template for output of metric values. - fn template(&self, name: &Name, kind: Kind) -> LineTemplate; + fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate; } /// A simple metric output format of "MetricName {Value}" @@ -96,7 +96,7 @@ pub struct SimpleFormat { } impl LineFormat for SimpleFormat { - fn template(&self, name: &Name, _kind: Kind) -> LineTemplate { + fn template(&self, name: &MetricName, _kind: InputKind) -> LineTemplate { let mut header = name.join("."); header.push(' '); LineTemplate { @@ -112,12 +112,12 @@ impl LineFormat for SimpleFormat { #[cfg(test)] pub mod test { use super::*; - use ::Labels; + use core::label::Labels; pub struct TestFormat; impl LineFormat for TestFormat { - fn template(&self, name: &Name, kind: Kind) -> LineTemplate { + fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate { let mut header: String = format!("{:?}", kind); header.push('/'); header.push_str(&name.join(".")); @@ -143,9 +143,9 @@ pub mod test { fn print_label_exists() { let labels: Labels = labels!("test_key" => "456"); let format = TestFormat {}; - let mut name = Name::from("abc"); + let mut name = MetricName::from("abc"); name = name.prepend("xyz"); - let template = format.template(&name, Kind::Counter); + let template = format.template(&name, InputKind::Counter); let mut out = vec![]; template.print(&mut out, 123000, |key| labels.lookup(key)).unwrap(); assert_eq!("Counter/xyz.abc 123000 123 test_key=456\n", String::from_utf8(out).unwrap()); @@ -154,9 +154,9 @@ pub mod test { #[test] fn print_label_not_exists() { let format = TestFormat {}; - let mut name = Name::from("abc"); + let mut name = MetricName::from("abc"); name = name.prepend("xyz"); - let template = format.template(&name, Kind::Counter); + let template = format.template(&name, InputKind::Counter); let mut out = vec![]; template.print(&mut out, 123000, |_key| None).unwrap(); assert_eq!("Counter/xyz.abc 123000 123 \n", String::from_utf8(out).unwrap()); diff --git a/src/output/graphite.rs b/src/output/graphite.rs index e291347..e588cf0 100755 --- a/src/output/graphite.rs +++ b/src/output/graphite.rs @@ -1,9 +1,9 @@ //! Send metrics to a graphite server. -use core::attributes::{Buffered, Attributes, WithAttributes, Naming}; -use core::name::Name; -use core::{Flush, Value}; -use core::input::Kind; +use core::attributes::{Buffered, Attributes, WithAttributes, Prefixed}; +use core::name::MetricName; +use core::{Flush, MetricValue}; +use core::input::InputKind; use core::metrics; use core::output::{Output, OutputScope, OutputMetric}; use core::error; @@ -71,13 +71,13 @@ pub struct GraphiteScope { impl OutputScope for GraphiteScope { /// Define a metric of the specified type. - fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric { - let mut prefix = self.naming_prepend(name).join("."); + fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { + let mut prefix = self.prefix_prepend(name).join("."); prefix.push(' '); let scale = match kind { // timers are in µs, but we give graphite milliseconds - Kind::Timer => 1000, + InputKind::Timer => 1000, _ => 1, }; @@ -99,7 +99,7 @@ impl Flush for GraphiteScope { } impl GraphiteScope { - fn print(&self, metric: &GraphiteMetric, value: Value) { + fn print(&self, metric: &GraphiteMetric, value: MetricValue) { let scaled_value = value / metric.scale; let value_str = scaled_value.to_string(); @@ -195,7 +195,7 @@ mod bench { #[bench] pub fn immediate_graphite(b: &mut test::Bencher) { let sd = Graphite::send_to("localhost:2003").unwrap().input(); - let timer = sd.new_metric("timer".into(), Kind::Timer); + let timer = sd.new_metric("timer".into(), InputKind::Timer); b.iter(|| test::black_box(timer.write(2000, labels![]))); } @@ -204,7 +204,7 @@ mod bench { pub fn buffering_graphite(b: &mut test::Bencher) { let sd = Graphite::send_to("localhost:2003").unwrap() .buffered(Buffering::BufferSize(65465)).input(); - let timer = sd.new_metric("timer".into(), Kind::Timer); + let timer = sd.new_metric("timer".into(), InputKind::Timer); b.iter(|| test::black_box(timer.write(2000, labels![]))); } diff --git a/src/output/log.rs b/src/output/log.rs index 0b14475..61cf990 100755 --- a/src/output/log.rs +++ b/src/output/log.rs @@ -1,7 +1,7 @@ use core::{Flush}; -use core::input::{Kind, Input, InputScope, InputMetric}; -use core::attributes::{Attributes, WithAttributes, Buffered, Naming}; -use core::name::Name; +use core::input::{InputKind, Input, InputScope, InputMetric}; +use core::attributes::{Attributes, WithAttributes, Buffered, Prefixed}; +use core::name::MetricName; use core::error; use cache::cache_in; use queue::queue_in; @@ -75,8 +75,8 @@ impl queue_in::QueuedInput for Log {} impl cache_in::CachedInput for Log {} impl InputScope for LogScope { - fn new_metric(&self, name: Name, kind: Kind) -> InputMetric { - let name = self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { + let name = self.prefix_append(name); let template = self.output.format.template(&name, kind); @@ -136,7 +136,7 @@ mod test { #[test] fn test_to_log() { let c = super::Log::log_to().input(); - let m = c.new_metric("test".into(), Kind::Marker); + let m = c.new_metric("test".into(), InputKind::Marker); m.write(33, labels![]); } diff --git a/src/output/map.rs b/src/output/map.rs index 0be46f6..737572a 100755 --- a/src/output/map.rs +++ b/src/output/map.rs @@ -1,6 +1,6 @@ -use core::{Flush, Value}; -use core::input::Kind; -use core::name::Name; +use core::{Flush, MetricValue}; +use core::input::InputKind; +use core::name::MetricName; use core::output::{OutputMetric, OutputScope}; use std::rc::Rc; @@ -11,11 +11,11 @@ use std::collections::BTreeMap; /// Every received value for a metric replaces the previous one (if any). #[derive(Clone, Default)] pub struct StatsMap { - inner: Rc>>, + inner: Rc>>, } impl OutputScope for StatsMap { - fn new_metric(&self, name: Name, _kind: Kind) -> OutputMetric { + fn new_metric(&self, name: MetricName, _kind: InputKind) -> OutputMetric { let write_to = self.inner.clone(); let name: String = name.join("."); OutputMetric::new(move |value, _labels| { @@ -26,7 +26,7 @@ impl OutputScope for StatsMap { impl Flush for StatsMap {} -impl From for BTreeMap { +impl From for BTreeMap { fn from(map: StatsMap) -> Self { // FIXME this is is possibly a full map copy, for nothing. // into_inner() is what we'd really want here but would require some `unsafe`? don't know how to do this yet. diff --git a/src/output/prometheus.rs b/src/output/prometheus.rs index dad5117..abf5e16 100755 --- a/src/output/prometheus.rs +++ b/src/output/prometheus.rs @@ -5,10 +5,10 @@ //! - Serve metrics with basic HTTP server //! - Print metrics to a buffer provided by an HTTP framework. -use core::{Flush, Value}; -use core::input::{Kind, Input, InputScope, InputMetric}; -use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Naming}; -use core::name::Name; +use core::{Flush, MetricValue}; +use core::input::{InputKind, Input, InputScope, InputMetric}; +use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Prefixed}; +use core::name::MetricName; use core::output::{Output, OutputMetric, OutputScope}; use core::error; @@ -47,8 +47,8 @@ impl PrometheusScope { impl OutputScope for PrometheusScope { /// Define a metric of the specified type. - fn new_metric(&self, name: Name, _kind: Kind) -> OutputMetric { - let mut _prefix = self.naming_prepend(name).join("."); + fn new_metric(&self, name: MetricName, _kind: InputKind) -> OutputMetric { + let mut _prefix = self.prefix_prepend(name).join("."); OutputMetric::new(|_value, _labels| {}) } } diff --git a/src/output/statsd.rs b/src/output/statsd.rs index c4f5ae3..c9872f6 100755 --- a/src/output/statsd.rs +++ b/src/output/statsd.rs @@ -1,10 +1,10 @@ //! Send metrics to a statsd server. -use core::attributes::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Naming}; -use core::name::Name; +use core::attributes::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Prefixed}; +use core::name::MetricName; use core::pcg32; -use core::{Flush, Value}; -use core::input::Kind; +use core::{Flush, MetricValue}; +use core::input::InputKind; use core::metrics; use core::output::{Output, OutputScope, OutputMetric}; use core::error; @@ -78,21 +78,21 @@ impl Sampled for StatsdScope {} impl OutputScope for StatsdScope { /// Define a metric of the specified type. - fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric { - let mut prefix = self.naming_prepend(name).join("."); + fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { + let mut prefix = self.prefix_prepend(name).join("."); prefix.push(':'); let mut suffix = String::with_capacity(16); suffix.push('|'); suffix.push_str(match kind { - Kind::Marker | Kind::Counter => "c", - Kind::Gauge => "g", - Kind::Timer => "ms", + InputKind::Marker | InputKind::Counter => "c", + InputKind::Gauge => "g", + InputKind::Timer => "ms", }); let scale = match kind { // timers are in µs, statsd wants ms - Kind::Timer => 1000, + InputKind::Timer => 1000, _ => 1, }; @@ -127,7 +127,7 @@ impl Flush for StatsdScope { } impl StatsdScope { - fn print(&self, metric: &StatsdMetric, value: Value) { + fn print(&self, metric: &StatsdMetric, value: MetricValue) { let scaled_value = value / metric.scale; let value_str = scaled_value.to_string(); let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len(); @@ -212,16 +212,16 @@ impl Drop for StatsdScope { //pub struct StatsdFormat; // //impl Format for StatsdFormat { -// fn template(&self, name: &Name, kind: Kind) -> Template { +// fn template(&self, name: &Name, kind: InputKind) -> Template { // let mut before_value = name.join("."); // before_value.push(':'); // // let mut after_value = String::with_capacity(16); // after_value.push('|'); // after_value.push_str(match kind { -// Kind::Marker | Kind::Counter => "c", -// Kind::Gauge => "g", -// Kind::Timer => "ms", +// InputKind::Marker | InputKind::Counter => "c", +// InputKind::Gauge => "g", +// InputKind::Timer => "ms", // }); // // // specify sampling rate if any @@ -232,7 +232,7 @@ impl Drop for StatsdScope { // // scale timer values // let value_text = match kind { // // timers are in µs, statsd wants ms -// Kind::Timer => ScaledValueAsText(1000), +// InputKind::Timer => ScaledValueAsText(1000), // _ => ValueAsText, // }; // @@ -258,7 +258,7 @@ mod bench { #[bench] pub fn immediate_statsd(b: &mut test::Bencher) { let sd = Statsd::send_to("localhost:2003").unwrap().input(); - let timer = sd.new_metric("timer".into(), Kind::Timer); + let timer = sd.new_metric("timer".into(), InputKind::Timer); b.iter(|| test::black_box(timer.write(2000, labels![]))); } @@ -267,7 +267,7 @@ mod bench { pub fn buffering_statsd(b: &mut test::Bencher) { let sd = Statsd::send_to("localhost:2003").unwrap() .buffered(Buffering::BufferSize(65465)).input(); - let timer = sd.new_metric("timer".into(), Kind::Timer); + let timer = sd.new_metric("timer".into(), InputKind::Timer); b.iter(|| test::black_box(timer.write(2000, labels![]))); } diff --git a/src/output/stream.rs b/src/output/stream.rs index 76ec698..7990e8f 100755 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -3,9 +3,9 @@ // TODO parameterize templates use core::{Flush}; -use core::input::Kind; -use core::attributes::{Attributes, WithAttributes, Buffered, Naming}; -use core::name::Name; +use core::input::InputKind; +use core::attributes::{Attributes, WithAttributes, Buffered, Prefixed}; +use core::name::MetricName; use core::output::{Output, OutputMetric, OutputScope}; use core::error; @@ -118,8 +118,8 @@ impl WithAttributes for TextScope { impl Buffered for TextScope {} impl OutputScope for TextScope { - fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric { - let name = self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { + let name = self.prefix_append(name); let template = self.output.format.template(&name, kind); let entries = self.entries.clone(); @@ -180,13 +180,13 @@ impl Drop for TextScope { #[cfg(test)] mod test { use super::*; - use core::input::Kind; + use core::input::InputKind; use std::io; #[test] fn sink_print() { let c = super::Stream::write_to(io::stdout()).output(); - let m = c.new_metric("test".into(), Kind::Marker); + let m = c.new_metric("test".into(), InputKind::Marker); m.write(33, labels![]); } } diff --git a/src/queue/queue_in.rs b/src/queue/queue_in.rs index 0d2bae9..520ed10 100755 --- a/src/queue/queue_in.rs +++ b/src/queue/queue_in.rs @@ -2,14 +2,14 @@ //! Metrics definitions are still synchronous. //! If queue size is exceeded, calling code reverts to blocking. -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::Name; -use core::input::{Kind, Input, InputScope, InputDyn, InputMetric}; -use core::{Value, Flush}; +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::MetricName; +use core::input::{InputKind, Input, InputScope, InputDyn, InputMetric}; +use core::{MetricValue, Flush}; use core::metrics; use cache::cache_in::CachedInput; use core::error; -use ::{ Labels}; +use core::label::Labels; use std::sync::Arc; use std::sync::mpsc; @@ -90,7 +90,7 @@ impl Input for InputQueue { /// Async commands should be of no concerns to applications. pub enum InputQueueCmd { /// Send metric write - Write(InputMetric, Value, Labels), + Write(InputMetric, MetricValue, Labels), /// Send metric flush Flush(Arc), } @@ -121,8 +121,8 @@ impl WithAttributes for InputQueueScope { } impl InputScope for InputQueueScope { - fn new_metric(&self, name: Name, kind:Kind) -> InputMetric { - let name = self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { + let name = self.prefix_append(name); let target_metric = self.target.new_metric(name, kind); let sender = self.sender.clone(); InputMetric::new(move |value, mut labels| { diff --git a/src/queue/queue_out.rs b/src/queue/queue_out.rs index 2986ee4..8214bef 100755 --- a/src/queue/queue_out.rs +++ b/src/queue/queue_out.rs @@ -1,16 +1,16 @@ //! Queue metrics for write on a separate thread, //! RawMetrics definitions are still synchronous. //! If queue size is exceeded, calling code reverts to blocking. -//! -use core::attributes::{Attributes, WithAttributes, Naming}; -use core::name::Name; -use core::input::{Kind, Input, InputScope, InputMetric}; + +use core::attributes::{Attributes, WithAttributes, Prefixed}; +use core::name::MetricName; +use core::input::{InputKind, Input, InputScope, InputMetric}; use core::output::{OutputDyn, OutputScope, OutputMetric, Output}; -use core::{Value, Flush}; +use core::{MetricValue, Flush}; use core::metrics; use cache::cache_in; use core::error; -use ::{Labels}; +use core::label::Labels; use std::rc::Rc; use std::ops; @@ -95,7 +95,7 @@ impl Input for OutputQueue { /// Async commands should be of no concerns to applications. pub enum OutputQueueCmd { /// Send metric write - Write(Arc, Value, Labels), + Write(Arc, MetricValue, Labels), /// Send metric flush Flush(Arc), } @@ -115,8 +115,8 @@ impl WithAttributes for OutputQueueScope { } impl InputScope for OutputQueueScope { - fn new_metric(&self, name: Name, kind:Kind) -> InputMetric { - let name = self.naming_append(name); + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { + let name = self.prefix_append(name); let target_metric = Arc::new(self.target.new_metric(name, kind)); let sender = self.sender.clone(); InputMetric::new(move |value, mut labels| {