diff --git a/Cargo.toml b/Cargo.toml index abdae28..e4391a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "examples/counter_timer_gauge/", "examples/multi_outs/", "examples/aggregate_print/", + "examples/summary_print/", "examples/async_print/", "examples/custom_publish/", "examples/raw_log/", diff --git a/README.md b/README.md index 926ac2b..564f1d4 100644 --- a/README.md +++ b/README.md @@ -92,29 +92,31 @@ let _app_metrics = app_metrics(to_stdout()).with_async_queue(64); The async queue uses a Rust channel and a standalone thread. The current behavior is to block when full. -Metric definitions can be cached to make using _ad-hoc metrics_ faster: -```rust,skt-run -let app_metrics = app_metrics(to_log().with_cache(512)); -app_metrics.gauge(format!("my_gauge_{}", 34)).value(44); -``` - -The preferred way is to _predefine metrics_, -possibly in a [lazy_static!](https://crates.io/crates/lazy_static) block: +For better performance and easy maintenance, metrics should usually be predefined: ```rust,skt-plain +#[macro_use] extern crate dipstick; #[macro_use] extern crate lazy_static; -extern crate dipstick; - use dipstick::*; -lazy_static! { - pub static ref METRICS: AppMetrics = app_metrics(to_stdout()); - pub static ref COUNTER_A: AppCounter = METRICS.counter("counter_a"); -} +app_metric!(String, APP_METRICS, app_metrics(to_stdout())); +app_counter!(String, APP_METRICS, { + COUNTER_A: "counter_a", +}); fn main() { COUNTER_A.count(11); } ``` +Metric definition macros are just `lazy_static!` wrappers. + + +Where necessary, metrics can be defined _ad-hoc_: +```rust,skt-run +let user_name = "john_day"; +let app_metrics = app_metrics(to_log().with_cache(512)); +app_metrics.gauge(format!("gauge_for_user_{}", user_name)).value(44); +``` +Defining a cache is optional but will speed up re-definition of common ad-hoc metrics. Timers can be used multiple ways: ```rust,skt-run diff --git a/examples/aggregate_print/src/main.rs b/examples/aggregate_print/src/main.rs index 99f2e7a..6009cd4 100644 --- a/examples/aggregate_print/src/main.rs +++ b/examples/aggregate_print/src/main.rs @@ -7,7 +7,7 @@ use std::time::Duration; use dipstick::*; fn main() { - let to_aggregate = aggregate(summary, to_stdout()); + let to_aggregate = aggregate(all_stats, to_stdout()); let app_metrics = app_metrics(to_aggregate); diff --git a/examples/async_print/src/main.rs b/examples/async_print/src/main.rs index 0155d39..832d212 100644 --- a/examples/async_print/src/main.rs +++ b/examples/async_print/src/main.rs @@ -13,7 +13,7 @@ fn main() { let counter = metrics.counter("counter_a"); let timer = metrics.timer("timer_b"); - let subsystem_metrics = metrics.with_prefix("subsystem."); + let subsystem_metrics = metrics.with_name("subsystem"); let event = subsystem_metrics.marker("event_c"); let gauge = subsystem_metrics.gauge("gauge_d"); diff --git a/examples/counter_timer_gauge/src/main.rs b/examples/counter_timer_gauge/src/main.rs index 7eb809b..f9e4600 100644 --- a/examples/counter_timer_gauge/src/main.rs +++ b/examples/counter_timer_gauge/src/main.rs @@ -19,7 +19,7 @@ fn main() { app_metrics.counter("just_once").count(4); // metric names can be prepended with a common prefix - let prefixed_metrics = app_metrics.with_prefix("subsystem."); + let prefixed_metrics = app_metrics.with_name("subsystem".to_string()); let event = prefixed_metrics.marker("event_c"); let gauge = prefixed_metrics.gauge("gauge_d"); diff --git a/examples/graphite/src/main.rs b/examples/graphite/src/main.rs index 904a361..563baf5 100644 --- a/examples/graphite/src/main.rs +++ b/examples/graphite/src/main.rs @@ -11,7 +11,7 @@ fn main() { let metrics = app_metrics( to_graphite("localhost:2003").expect("Connecting") - .with_namespace(&["my", "app"])); + .with_namespace(&["my", "app"][..])); loop { metrics.counter("counter_a").count(123); diff --git a/examples/multi_outs/src/main.rs b/examples/multi_outs/src/main.rs index de28ace..3c92cd7 100644 --- a/examples/multi_outs/src/main.rs +++ b/examples/multi_outs/src/main.rs @@ -15,8 +15,8 @@ fn main() { let same_type_metrics = app_metrics( &[ // use slices to combine multiple metrics of the same type - to_stdout().with_prefix("yeah"), - to_stdout().with_prefix("ouch"), + to_stdout().with_name("yeah"), + to_stdout().with_name("ouch"), to_stdout().with_sampling_rate(0.5), ][..], ); diff --git a/examples/summary_print/Cargo.toml b/examples/summary_print/Cargo.toml new file mode 100644 index 0000000..b1447e9 --- /dev/null +++ b/examples/summary_print/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "summary_print" +version = "0.0.0" +workspace = "../../" + +[dependencies] +dipstick = { path = '../../' } diff --git a/examples/summary_print/src/main.rs b/examples/summary_print/src/main.rs new file mode 100644 index 0000000..99f2e7a --- /dev/null +++ b/examples/summary_print/src/main.rs @@ -0,0 +1,37 @@ +//! A sample application continuously aggregating metrics, +//! printing the summary stats every three seconds + +extern crate dipstick; + +use std::time::Duration; +use dipstick::*; + +fn main() { + let to_aggregate = aggregate(summary, to_stdout()); + + let app_metrics = app_metrics(to_aggregate); + + app_metrics.flush_every(Duration::from_secs(3)); + + let counter = app_metrics.counter("counter_a"); + let timer = app_metrics.timer("timer_a"); + let gauge = app_metrics.gauge("gauge_a"); + let marker = app_metrics.marker("marker_a"); + + loop { + // add counts forever, non-stop + counter.count(11); + counter.count(12); + counter.count(13); + + timer.interval_us(11_000_000); + timer.interval_us(12_000_000); + timer.interval_us(13_000_000); + + gauge.value(11); + gauge.value(12); + gauge.value(13); + + marker.mark(); + } +} diff --git a/src/app_metrics.rs b/src/app_metrics.rs index 68a6a06..fc45707 100644 --- a/src/app_metrics.rs +++ b/src/app_metrics.rs @@ -220,9 +220,9 @@ where } impl WithNamespace for AppMetrics { - fn with_namespace(&self, nspace: &[&str]) -> Self { + fn with_name>(&self, names: IN) -> Self { AppMetrics { - chain: Arc::new(self.chain.with_namespace(nspace)), + chain: Arc::new(self.chain.with_name(names)), scope: self.scope.clone(), } } diff --git a/src/async_queue.rs b/src/async_queue.rs index 8debb39..322f224 100644 --- a/src/async_queue.rs +++ b/src/async_queue.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use std::sync::mpsc; use std::thread; -mod_metric!(Aggregate, QUEUE_METRICS = DIPSTICK_METRICS.with_prefix("async_queue")); +mod_metrics!(Aggregate, QUEUE_METRICS = DIPSTICK_METRICS.with_prefix("async_queue")); mod_marker!(Aggregate, QUEUE_METRICS, { SEND_FAILED: "send_failed" }); /// Enqueue collected metrics for dispatch on background thread. diff --git a/src/core.rs b/src/core.rs index 98fedb6..1dea59a 100644 --- a/src/core.rs +++ b/src/core.rs @@ -209,9 +209,9 @@ impl Chain { /// Create a new metric chain with the provided metric definition and scope creation functions. pub fn new(make_metric: MF, make_scope: WF) -> Self - where - MF: Fn(Kind, &str, Rate) -> M + Send + Sync + 'static, - WF: Fn(bool) -> ControlScopeFn + Send + Sync + 'static, + where + MF: Fn(Kind, &str, Rate) -> M + Send + Sync + 'static, + WF: Fn(bool) -> ControlScopeFn + Send + Sync + 'static, { Chain { // capture the provided closures in Arc to provide cheap clones @@ -246,8 +246,8 @@ impl Chain { /// Intercept metric definition without changing the metric type. pub fn mod_metric(&self, mod_fn: MF) -> Chain - where - MF: Fn(DefineMetricFn) -> DefineMetricFn, + where + MF: Fn(DefineMetricFn) -> DefineMetricFn, { Chain { define_metric_fn: mod_fn(self.define_metric_fn.clone()), @@ -257,9 +257,9 @@ impl Chain { /// Intercept both metric definition and scope creation, possibly changing the metric type. pub fn mod_both(&self, mod_fn: MF) -> Chain - where - MF: Fn(DefineMetricFn, OpenScopeFn) -> (DefineMetricFn, OpenScopeFn), - N: Clone + Send + Sync, + where + MF: Fn(DefineMetricFn, OpenScopeFn) -> (DefineMetricFn, OpenScopeFn), + N: Clone + Send + Sync, { let (metric_fn, scope_fn) = mod_fn(self.define_metric_fn.clone(), self.scope_metric_fn.clone()); @@ -271,8 +271,8 @@ impl Chain { /// Intercept scope creation. pub fn mod_scope(&self, mod_fn: MF) -> Self - where - MF: Fn(OpenScopeFn) -> OpenScopeFn, + where + MF: Fn(OpenScopeFn) -> OpenScopeFn, { Chain { define_metric_fn: self.define_metric_fn.clone(), @@ -391,4 +391,3 @@ impl ScopeTimer { value } } - diff --git a/src/dispatch.rs b/src/dispatch.rs new file mode 100644 index 0000000..0eedeb7 --- /dev/null +++ b/src/dispatch.rs @@ -0,0 +1,102 @@ +use core::*; +use chain::*; +use std::collections::{HashMap, LinkedList}; + +pub struct MetricHandle (usize); + +pub struct ScopeHandle (usize); + +pub trait Observer { + /// Register a new metric. + /// Only one metric of a certain name will be defined. + /// Observer must return a MetricHandle that uniquely identifies the metric. + fn metric_create(&self, kind: Kind, name: &str, rate: Rate) -> MetricHandle; + + /// Drop a previously registered metric. + /// Drop is called once per handle. + /// Dropped handle will never be used again. + /// Drop is only called with previously registered handles. + fn metric_drop(&self, metric: MetricHandle); + + /// Open a new scope. + /// Observer must return a new ScopeHandle that uniquely identifies the scope. + fn scope_open(&self, buffered: bool) -> ScopeHandle; + + /// Write metric value to a scope. + /// Observers only receive previously registered handles. + fn scope_write(&self, scope: ScopeHandle, metric: MetricHandle, value:Value); + + /// Flush a scope. + /// Observers only receive previously registered handles. + fn scope_flush(&self, scope: ScopeHandle); + + /// Drop a previously registered scope. + /// Drop is called once per handle. + /// Dropped handle will never be used again. + /// Drop is only called with previously registered handles. + fn scope_close(&self, scope: ScopeHandle); +} + +pub struct ChainObserver { + chain: Chain +} + +impl Observer for ChainObserver { + fn metric_create(&self, kind: Kind, name: &str, rate: Rate) -> MetricHandle { + self.chain.define_metric(kind, name, rate) + } + + fn metric_drop(&self, metric: MetricHandle) {} + + fn scope_open(&self, buffered: bool) -> ScopeHandle {} + fn scope_write(&self, scope: ScopeHandle, metric: MetricHandle, value:Value) {} + fn scope_flush(&self, scope: ScopeHandle) {} + fn scope_close(&self, scope: ScopeHandle) {} +} + +pub struct Dispatcher { + active_observers: usize, + metrics: HashMap, + observers: RwLock> +} + +/// Aggregate metrics in memory. +/// Depending on the type of metric, count, sum, minimum and maximum of values will be tracked. +/// Needs to be connected to a publish to be useful. +/// ``` +/// use dipstick::*; +/// let sink = aggregate(4, summary, to_stdout()); +/// let metrics = global_metrics(sink); +/// metrics.marker("my_event").mark(); +/// metrics.marker("my_event").mark(); +/// ``` +pub fn dispatch(stat_fn: E, to_chain: Chain) -> Chain + where + E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static, + M: Clone + Send + Sync + Debug + 'static, +{ + let metrics = Arc::new(RwLock::new(HashMap::new())); + let metrics0 = metrics.clone(); + + let publish = Arc::new(Publisher::new(stat_fn, to_chain)); + + Chain::new( + move |kind, name, _rate| { + // add metric + }, + move |_buffered| { + // open scope + ControlScopeFn::new(move |cmd| match cmd { + ScopeCmd::Write(metric, value) => { + let metric: &Aggregate = metric; + metric.update(value) + }, + ScopeCmd::Flush => { + let metrics = metrics.read().expect("Locking metrics scoreboards"); + let snapshot = metrics.values().flat_map(|score| score.reset()).collect(); + publish.publish(snapshot); + } + }) + }, + ) +} \ No newline at end of file diff --git a/src/graphite.rs b/src/graphite.rs index 8524385..86f2351 100644 --- a/src/graphite.rs +++ b/src/graphite.rs @@ -13,7 +13,7 @@ use std::fmt::Debug; use socket::RetrySocket; -mod_metric!(Aggregate, GRAPHITE_METRICS = DIPSTICK_METRICS.with_prefix("graphite")); +mod_metrics!(Aggregate, GRAPHITE_METRICS = DIPSTICK_METRICS.with_prefix("graphite")); mod_marker!(Aggregate, GRAPHITE_METRICS, { SEND_ERR: "send_failed", TRESHOLD_EXCEEDED: "bufsize_exceeded", diff --git a/src/lib.rs b/src/lib.rs index 02da545..5b03f3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,11 +33,14 @@ mod lru_cache; pub mod error; +#[macro_use] +pub mod macros; + pub mod core; pub use core::*; -#[macro_use] -pub mod macros; +//pub mod dispatch; +//pub use dispatch::*; mod output; pub use output::*; diff --git a/src/macros.rs b/src/macros.rs index 9074862..39e5b50 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -22,7 +22,7 @@ macro_rules! time { /// Define application-scoped metrics. #[macro_export] -macro_rules! app_metric { +macro_rules! app_metrics { ($type_param: ty, $metric_id: ident = $app_metrics: expr) => { lazy_static! { pub static ref $metric_id: AppMetrics<$type_param> = $app_metrics; } }; @@ -66,7 +66,7 @@ macro_rules! app_timer { /// Define module-scoped metrics. #[macro_export] -macro_rules! mod_metric { +macro_rules! mod_metrics { ($type_param: ty, $metric_id: ident = $mod_metrics: expr) => { lazy_static! { static ref $metric_id: AppMetrics<$type_param> = $mod_metrics; } }; @@ -111,7 +111,7 @@ macro_rules! mod_timer { mod test_app { use self_metrics::*; - app_metric!(Aggregate, TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix")); + app_metrics!(Aggregate, TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix")); app_marker!(Aggregate, TEST_METRICS, { M1: "failed", @@ -154,7 +154,7 @@ mod test_app { mod test_mod { use self_metrics::*; - mod_metric!(Aggregate, TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix")); + mod_metrics!(Aggregate, TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix")); mod_marker!(Aggregate, TEST_METRICS, { M1: "failed", diff --git a/src/namespace.rs b/src/namespace.rs index 49ae798..f729279 100644 --- a/src/namespace.rs +++ b/src/namespace.rs @@ -3,26 +3,72 @@ use core::*; use std::sync::Arc; +const DEFAULT_SEPARATOR: &'static str = "."; + +/// A list of parts of a metric's name. +#[derive(Debug, Clone)] +pub struct Namespace (Vec); + +impl Namespace { + /// Make this namespace a subspace of the parent. + pub fn subspace_of(self, parent: &Namespace) -> Self { + Namespace([parent.0.clone(), self.0].concat()) + } + + /// Combine name parts into a string. + pub fn join(&self, separator: &str) -> String { + self.0.join(separator) + } +} + +impl<'a> From<&'a str> for Namespace { + fn from(name: &'a str) -> Namespace { + Namespace(vec![name.to_string()]) + } +} + +impl From for Namespace { + fn from(name: String) -> Namespace { + Namespace(vec![name]) + } +} + +impl<'a, 'b: 'a> From<&'b [&'a str]> for Namespace { + fn from(names: &'a [&'a str]) -> Namespace { + Namespace(names.iter().map(|n| n.to_string()).collect()) + } +} + + /// Prepend metric names with custom prefix. pub trait WithNamespace where Self: Sized, { /// Insert prefix in newly defined metrics. +// #[deprecated(since = "0.6.3", note = "Use `with_name` instead.")] fn with_prefix>(&self, prefix: AS) -> Self { self.with_namespace(&[prefix.as_ref()]) } /// Join namespace and prepend in newly defined metrics. - fn with_namespace(&self, names: &[&str]) -> Self; +// #[deprecated(since = "0.6.3", note = "Use `with_name` instead.")] + fn with_namespace(&self, names: &[&str]) -> Self { + self.with_name(names) + } + + /// Join namespace and prepend in newly defined metrics. + fn with_name>(&self, names: IN) -> Self; + } impl WithNamespace for Chain { - fn with_namespace(&self, names: &[&str]) -> Self { + fn with_name>(&self, names: IN) -> Self { + let ninto = names.into(); self.mod_metric(|next| { - let nspace = names.join("."); + let nspace = ninto.join(DEFAULT_SEPARATOR); Arc::new(move |kind, name, rate| { - let name = [nspace.as_ref(), name].join("."); + let name = [nspace.as_ref(), name].join(DEFAULT_SEPARATOR); (next)(kind, name.as_ref(), rate) }) }) @@ -31,7 +77,7 @@ impl WithNamespace for Chain { /// deprecated, use with_prefix() omitting any previously supplied separator #[deprecated(since = "0.5.0", - note = "Use `with_prefix` instead, omitting any previously supplied separator.")] + note = "Use `with_name` instead, omitting any previously supplied separator.")] pub fn prefix(prefix: &str, chain: IC) -> Chain where M: Clone + Send + Sync + 'static, diff --git a/src/scores.rs b/src/scores.rs index e3e7f52..5a264e5 100644 --- a/src/scores.rs +++ b/src/scores.rs @@ -99,11 +99,14 @@ impl Scoreboard { if self.snapshot(now, &mut scores) { let duration_seconds = (now - scores[0]) as f64 / 1_000_000_000.0; + println!("duration {}s", duration_seconds); + println!("rate {}/s", scores[2] as f64 / duration_seconds); + let mut snapshot = Vec::new(); match self.kind { Marker => { snapshot.push(Count(scores[1] as u64)); - snapshot.push(Rate(scores[2] as f64 / duration_seconds)) + snapshot.push(Rate(scores[1] as f64 / duration_seconds)) } Gauge => { snapshot.push(Max(scores[3] as u64)); diff --git a/src/self_metrics.rs b/src/self_metrics.rs index 7ef2b3f..0350714 100644 --- a/src/self_metrics.rs +++ b/src/self_metrics.rs @@ -28,4 +28,4 @@ fn build_self_metrics() -> AppMetrics { lazy_static! { static ref AGGREGATOR: Chain = build_aggregator(); } /// Application metrics are collected to the aggregator -app_metric!(Aggregate, DIPSTICK_METRICS = build_self_metrics()); +app_metrics!(Aggregate, DIPSTICK_METRICS = build_self_metrics()); diff --git a/src/statsd.rs b/src/statsd.rs index 5af05e0..61f2169 100644 --- a/src/statsd.rs +++ b/src/statsd.rs @@ -9,7 +9,7 @@ use std::sync::{Arc, RwLock}; pub use std::net::ToSocketAddrs; -mod_metric!(Aggregate, STATSD_METRICS = DIPSTICK_METRICS.with_prefix("statsd")); +mod_metrics!(Aggregate, STATSD_METRICS = DIPSTICK_METRICS.with_prefix("statsd")); mod_marker!(Aggregate, STATSD_METRICS, { SEND_ERR: "send_failed" }); mod_counter!(Aggregate, STATSD_METRICS, { SENT_BYTES: "sent_bytes" });