mirror of https://github.com/fralalonde/dipstick
parent
d6bc8e22b4
commit
1a6ac869bd
|
@ -25,17 +25,16 @@ lazy_static = "1.0"
|
|||
derivative = "1.0"
|
||||
atomic_refcell = "0.1"
|
||||
skeptic = { version = "0.13", optional = true }
|
||||
num = { version = "0.1", default-features = false }
|
||||
|
||||
[build-dependencies]
|
||||
skeptic = { version = "0.13", optional = true }
|
||||
|
||||
[dependencies.num]
|
||||
default-features = false
|
||||
version = "0.1"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
bench = []
|
||||
# enables the use of the mock metric clock outside of dipstick's own tests
|
||||
# this disables real-time metrics clock and should not be used outside of tests
|
||||
mock_clock = []
|
||||
|
||||
[package.metadata.release]
|
||||
#sign-commit = true
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
#!make
|
||||
|
||||
# SCCACHE can be 'local' or 'off' (default)
|
||||
# local will cache in ~/.cache/sccache
|
||||
SCCACHE ?= off
|
||||
SCCACHE_CMD ?= ~/.cargo/bin/sccache
|
||||
|
||||
CARGO_CMD ?= $(if $(filter off,$(SCCACHE)),,RUSTC_WRAPPER=$(SCCACHE_CMD) )cargo
|
||||
|
||||
# Default target
|
||||
all: test examples bench
|
||||
|
||||
CARGO_TEST_FLAGS ?=
|
||||
CARGO_BUILD_FLAGS ?=
|
||||
|
||||
# 'test' is a friendly alias for 'unit_test'
|
||||
test:
|
||||
$(CARGO_CMD) test
|
||||
|
||||
examples:
|
||||
$(CARGO_CMD) build --examples
|
||||
|
||||
bench:
|
||||
$(CARGO_CMD) +nightly bench --features "bench"
|
||||
|
||||
clean:
|
||||
$(CARGO_CMD) clean
|
||||
|
||||
publish: test examples bench
|
||||
cargo publish
|
||||
|
||||
.PHONY: all build clean test examples bench publish
|
||||
|
|
@ -8,7 +8,7 @@ use std::time::Duration;
|
|||
use dipstick::*;
|
||||
|
||||
fn main() {
|
||||
let metrics = metric_scope(to_stdout().with_async_queue(0));
|
||||
let metrics = to_stdout().with_async_queue(0).open_scope();
|
||||
|
||||
let counter = metrics.counter("counter_a");
|
||||
let timer = metrics.timer("timer_b");
|
||||
|
|
|
@ -9,11 +9,11 @@ use std::time::Duration;
|
|||
fn main() {
|
||||
// badlog::init(Some("info"));
|
||||
|
||||
let metrics = metric_scope(
|
||||
let metrics =
|
||||
to_graphite("localhost:2003")
|
||||
.expect("Connecting")
|
||||
.with_suffix("my_app"),
|
||||
);
|
||||
.with_suffix("my_app")
|
||||
.open_scope();
|
||||
|
||||
loop {
|
||||
metrics.counter("counter_a").count(123);
|
||||
|
|
|
@ -7,7 +7,7 @@ use dipstick::*;
|
|||
|
||||
fn main() {
|
||||
// print only 1 out of every 10000 metrics recorded
|
||||
let app_metrics: MetricScope<String> = metric_scope(to_stdout().with_sampling_rate(0.0001));
|
||||
let app_metrics = to_stdout().with_sampling_rate(0.0001).open_scope();
|
||||
|
||||
let marker = app_metrics.marker("marker_a");
|
||||
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
//! Maintain aggregated metrics for deferred reporting,
|
||||
//!
|
||||
use core::{command_fn, Kind, Sampling, Command, Value, Namespace};
|
||||
use clock::TimeHandle;
|
||||
use core::Kind::*;
|
||||
use output::{OpenScope, NO_METRIC_OUTPUT, MetricOutput};
|
||||
use scope::{self, MetricScope, MetricInput, Flush, ScheduleFlush, DefineMetric,};
|
||||
use scope::{MetricScope, MetricInput, Flush, ScheduleFlush, DefineMetric};
|
||||
|
||||
use scores::{ScoreType, Scoreboard};
|
||||
use scores::ScoreType::*;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Instant;
|
||||
|
||||
/// A function type to transform aggregated scores into publishable statistics.
|
||||
pub type StatsFn = Fn(Kind, Namespace, ScoreType) -> Option<(Kind, Namespace, Value)> + Send + Sync + 'static;
|
||||
|
@ -29,9 +29,6 @@ lazy_static! {
|
|||
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OpenScope + Sync + Send>> = RwLock::new(initial_output());
|
||||
}
|
||||
|
||||
/// 1024 Metrics per scoreboard should be enough?
|
||||
const DEFAULT_CAPACITY: usize = 1024;
|
||||
|
||||
impl From<MetricAggregator> for MetricScope<Aggregate> {
|
||||
fn from(agg: MetricAggregator) -> MetricScope<Aggregate> {
|
||||
agg.into_scope()
|
||||
|
@ -66,46 +63,47 @@ pub struct MetricAggregator {
|
|||
#[derive(Derivative)]
|
||||
#[derivative(Debug)]
|
||||
struct InnerAggregator {
|
||||
metrics: HashMap<Namespace, Arc<Scoreboard>>,
|
||||
period_start: Instant,
|
||||
metrics: BTreeMap<Namespace, Arc<Scoreboard>>,
|
||||
period_start: TimeHandle,
|
||||
#[derivative(Debug = "ignore")]
|
||||
stats: Option<Arc<Fn(Kind, Namespace, ScoreType)
|
||||
-> Option<(Kind, Namespace, Value)> + Send + Sync + 'static>>,
|
||||
output: Option<Arc<OpenScope + Sync + Send>>,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref PERIOD_LENGTH: Namespace = "_period_length".into();
|
||||
}
|
||||
|
||||
impl InnerAggregator {
|
||||
/// Take a snapshot of aggregated values and reset them.
|
||||
/// Compute stats on captured values using assigned or default stats function.
|
||||
/// Write stats to assigned or default output.
|
||||
pub fn flush_to(&mut self, publish_scope: &DefineMetric, stats_fn: Arc<StatsFn>) {
|
||||
pub fn flush_to(&mut self, publish_scope: &DefineMetric, stats_fn: &StatsFn) {
|
||||
|
||||
let now = Instant::now();
|
||||
let duration = now - self.period_start;
|
||||
let duration_seconds = (duration.subsec_nanos() / 1_000_000_000) as f64 + duration.as_secs() as f64;
|
||||
let now = TimeHandle::now();
|
||||
let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0;
|
||||
self.period_start = now;
|
||||
|
||||
let snapshot: Vec<(&Namespace, Kind, Vec<ScoreType>)> = self.metrics.iter()
|
||||
let mut snapshot: Vec<(&Namespace, Kind, Vec<ScoreType>)> = self.metrics.iter()
|
||||
.flat_map(|(name, scores)| if let Some(values) = scores.reset(duration_seconds) {
|
||||
Some((name, scores.metric_kind(), values))
|
||||
} else {
|
||||
None
|
||||
})
|
||||
.collect();
|
||||
// snapshot.push((Kind::Counter, "_duration_ms".to_string(), vec![ScoreType::Sum((duration_seconds * 1000.0) as u64)]));
|
||||
|
||||
if snapshot.is_empty() {
|
||||
// no data was collected for this period
|
||||
// TODO repeat previous frame min/max ?
|
||||
// TODO update some canary metric ?
|
||||
} else {
|
||||
snapshot.push((&PERIOD_LENGTH, Timer, vec![Sum((duration_seconds * 1000.0) as u64)]));
|
||||
for metric in snapshot {
|
||||
for score in metric.2 {
|
||||
let filtered = (stats_fn)(metric.1, metric.0.clone(), score);
|
||||
if let Some((kind, name, value)) = filtered {
|
||||
publish_scope
|
||||
.define_metric_object(&name, kind, 1.0)
|
||||
.write(value);
|
||||
publish_scope.define_metric_object(&name, kind, 1.0)(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -117,16 +115,11 @@ impl InnerAggregator {
|
|||
impl MetricAggregator {
|
||||
/// Build a new metric aggregation
|
||||
pub fn new() -> MetricAggregator {
|
||||
MetricAggregator::with_capacity(DEFAULT_CAPACITY)
|
||||
}
|
||||
|
||||
/// Build a new metric aggregation point with initial capacity of metrics to aggregate.
|
||||
pub fn with_capacity(size: usize) -> MetricAggregator {
|
||||
MetricAggregator {
|
||||
namespace: "".into(),
|
||||
inner: Arc::new(RwLock::new(InnerAggregator {
|
||||
metrics: HashMap::with_capacity(size),
|
||||
period_start: Instant::now(),
|
||||
metrics: BTreeMap::new(),
|
||||
period_start: TimeHandle::now(),
|
||||
stats: None,
|
||||
output: None,
|
||||
}))
|
||||
|
@ -201,6 +194,12 @@ impl MetricAggregator {
|
|||
)
|
||||
}
|
||||
|
||||
/// Flush the aggregator scores using the specified scope and stats.
|
||||
pub fn flush_to(&self, publish_scope: &DefineMetric, stats_fn: &StatsFn) {
|
||||
let mut inner = self.inner.write().expect("Aggregator");
|
||||
inner.flush_to(publish_scope, stats_fn);
|
||||
}
|
||||
|
||||
// /// Discard scores for ad-hoc metrics.
|
||||
// pub fn cleanup(&self) {
|
||||
// let orphans: Vec<Namespace> = self.inner.read().expect("Aggregator").metrics.iter()
|
||||
|
@ -220,25 +219,6 @@ impl MetricAggregator {
|
|||
}
|
||||
|
||||
impl MetricInput<Aggregate> for MetricAggregator {
|
||||
/// Define an event counter of the provided name.
|
||||
fn marker(&self, name: &str) -> scope::Marker {
|
||||
self.into_scope().marker(name)
|
||||
}
|
||||
|
||||
/// Define a counter of the provided name.
|
||||
fn counter(&self, name: &str) -> scope::Counter {
|
||||
self.into_scope().counter(name)
|
||||
}
|
||||
|
||||
/// Define a timer of the provided name.
|
||||
fn timer(&self, name: &str) -> scope::Timer {
|
||||
self.into_scope().timer(name)
|
||||
}
|
||||
|
||||
/// Define a gauge of the provided name.
|
||||
fn gauge(&self, name: &str) -> scope::Gauge {
|
||||
self.into_scope().gauge(name)
|
||||
}
|
||||
|
||||
/// Lookup or create a scoreboard for the requested metric.
|
||||
fn define_metric(&self, name: &Namespace, kind: Kind, _rate: Sampling) -> Aggregate {
|
||||
|
@ -291,7 +271,7 @@ impl Flush for MetricAggregator {
|
|||
&None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().open_scope_object(),
|
||||
};
|
||||
|
||||
inner.flush_to(pub_scope.as_ref(), stats_fn);
|
||||
inner.flush_to(pub_scope.as_ref(), stats_fn.as_ref());
|
||||
|
||||
// TODO parameterize whether to keep ad-hoc metrics after publish
|
||||
// source.cleanup();
|
||||
|
|
|
@ -77,17 +77,6 @@ impl<M: Send + Sync + Clone + 'static> WithAsyncQueue for MetricOutput<M> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Enqueue collected metrics for dispatch on background thread.
|
||||
#[deprecated(since = "0.5.0", note = "Use `with_async_queue` instead.")]
|
||||
pub fn async<M, IC>(queue_size: usize, chain: IC) -> MetricOutput<M>
|
||||
where
|
||||
M: Clone + Send + Sync + 'static,
|
||||
IC: Into<MetricOutput<M>>,
|
||||
{
|
||||
let chain = chain.into();
|
||||
chain.with_async_queue(queue_size)
|
||||
}
|
||||
|
||||
/// Carry the scope command over the queue, from the sender, to be executed by the receiver.
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Debug)]
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
use std::time::Instant;
|
||||
use core::Value;
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
/// A handle to the start time of a counter.
|
||||
/// Wrapped so it may be changed safely later.
|
||||
pub struct TimeHandle(Instant);
|
||||
|
||||
impl TimeHandle {
|
||||
/// Get a handle on current time.
|
||||
/// Used by the TimerMetric start_time() method.
|
||||
pub fn now() -> TimeHandle {
|
||||
TimeHandle(self::inner::now())
|
||||
}
|
||||
|
||||
/// Get the elapsed time in microseconds since TimeHanduule was obtained.
|
||||
pub fn elapsed_us(self) -> Value {
|
||||
let duration = self::inner::now() - self.0;
|
||||
duration.as_secs() * 1000000 + (duration.subsec_nanos() / 1000) as Value
|
||||
}
|
||||
|
||||
/// Get the elapsed time in microseconds since TimeHandle was obtained.
|
||||
pub fn elapsed_ms(self) -> Value {
|
||||
self.elapsed_us() / 1000
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(mock_clock, test)))]
|
||||
mod inner {
|
||||
use std::time::Instant;
|
||||
|
||||
pub fn now() -> Instant {
|
||||
Instant::now()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(mock_clock, test))]
|
||||
pub mod inner {
|
||||
use std::ops::Add;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::sync::RwLock;
|
||||
|
||||
lazy_static!{
|
||||
static ref MOCK_CLOCK: RwLock<Instant> = RwLock::new(Instant::now());
|
||||
}
|
||||
|
||||
|
||||
/// Metrics mock_clock enabled!
|
||||
/// thread::sleep will have no effect on metrics.
|
||||
/// Use advance_time() to simulate passing time.
|
||||
pub fn now() -> Instant {
|
||||
MOCK_CLOCK.read().unwrap().clone()
|
||||
}
|
||||
|
||||
/// Advance the mock clock by a certain amount of time.
|
||||
/// Enables writing reproducible metrics tests.
|
||||
pub fn advance_time(period: Duration) {
|
||||
let mut now = MOCK_CLOCK.write().unwrap();
|
||||
let new_now = now.add(period);
|
||||
*now = new_now;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use Value;
|
||||
use clock::inner;
|
||||
|
||||
#[test]
|
||||
fn aggregate_all_stats() {
|
||||
use std::time::Duration;
|
||||
use std::collections::BTreeMap;
|
||||
use aggregate::{MetricAggregator, all_stats};
|
||||
use scope::MetricInput;
|
||||
use local::StatsMap;
|
||||
|
||||
let metrics = MetricAggregator::new().with_suffix("test");
|
||||
|
||||
let counter = metrics.counter("counter_a");
|
||||
let timer = metrics.timer("timer_a");
|
||||
let gauge = metrics.gauge("gauge_a");
|
||||
let marker = metrics.marker("marker_a");
|
||||
|
||||
marker.mark();
|
||||
marker.mark();
|
||||
marker.mark();
|
||||
|
||||
counter.count(10);
|
||||
counter.count(20);
|
||||
|
||||
timer.interval_us(10_000_000);
|
||||
timer.interval_us(20_000_000);
|
||||
|
||||
gauge.value(10);
|
||||
gauge.value(20);
|
||||
|
||||
inner::advance_time(Duration::from_secs(3));
|
||||
|
||||
// TODO expose & use flush_to()
|
||||
let stats = StatsMap::new();
|
||||
metrics.flush_to(&stats, &all_stats);
|
||||
let map: BTreeMap<String, Value> = stats.into();
|
||||
|
||||
assert_eq!(map["test.counter_a.count"], 2);
|
||||
assert_eq!(map["test.counter_a.sum"], 30);
|
||||
assert_eq!(map["test.counter_a.mean"], 15);
|
||||
assert_eq!(map["test.counter_a.rate"], 10);
|
||||
|
||||
assert_eq!(map["test.timer_a.count"], 2);
|
||||
assert_eq!(map["test.timer_a.sum"], 30_000_000);
|
||||
assert_eq!(map["test.timer_a.min"], 10_000_000);
|
||||
assert_eq!(map["test.timer_a.max"], 20_000_000);
|
||||
assert_eq!(map["test.timer_a.mean"], 15_000_000);
|
||||
assert_eq!(map["test.timer_a.rate"], 1);
|
||||
|
||||
assert_eq!(map["test.gauge_a.mean"], 15);
|
||||
assert_eq!(map["test.gauge_a.min"], 10);
|
||||
assert_eq!(map["test.gauge_a.max"], 20);
|
||||
|
||||
assert_eq!(map["test.marker_a.count"], 3);
|
||||
assert_eq!(map["test.marker_a.rate"], 1);
|
||||
}
|
||||
}
|
|
@ -5,7 +5,6 @@
|
|||
use self::Command::*;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
// TODO define an 'AsValue' trait + impl for supported number types, then drop 'num' crate
|
||||
pub use num::ToPrimitive;
|
||||
|
@ -14,32 +13,6 @@ pub use num::ToPrimitive;
|
|||
// TODO should this be f64? f32?
|
||||
pub type Value = u64;
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
/// A handle to the start time of a counter.
|
||||
/// Wrapped so it may be changed safely later.
|
||||
pub struct TimeHandle(Instant);
|
||||
|
||||
impl TimeHandle {
|
||||
|
||||
/// Get a handle on current time.
|
||||
/// Used by the TimerMetric start_time() method.
|
||||
pub fn now() -> TimeHandle {
|
||||
TimeHandle(Instant::now())
|
||||
}
|
||||
|
||||
/// Get the elapsed time in microseconds since TimeHanduule was obtained.
|
||||
pub fn elapsed_us(self) -> Value {
|
||||
let duration = Instant::now() - self.0;
|
||||
duration.as_secs() * 1000000 + (duration.subsec_nanos() / 1000) as Value
|
||||
}
|
||||
|
||||
/// Get the elapsed time in microseconds since TimeHandle was obtained.
|
||||
pub fn elapsed_ms(self) -> Value {
|
||||
let duration = Instant::now() - self.0;
|
||||
duration.as_secs() * 1000 + (duration.subsec_nanos() / 1000000) as Value
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Base type for sampling rate.
|
||||
/// - 1.0 records everything
|
||||
|
@ -234,7 +207,7 @@ impl<M> CommandFn<M> {
|
|||
#[cfg(feature = "bench")]
|
||||
mod bench {
|
||||
|
||||
use super::*;
|
||||
use clock::TimeHandle;
|
||||
use test;
|
||||
|
||||
#[bench]
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
use scope::{Marker, Counter, Gauge, Timer, MetricScope};
|
||||
use output::MetricOutput;
|
||||
use core::Sampling;
|
||||
|
||||
use async_queue::WithAsyncQueue;
|
||||
use sample::WithSamplingRate;
|
||||
|
||||
/// Enqueue collected metrics for dispatch on background thread.
|
||||
#[deprecated(since = "0.5.0", note = "Use `with_async_queue` instead.")]
|
||||
pub fn async<M, IC>(queue_size: usize, chain: IC) -> MetricOutput<M>
|
||||
where
|
||||
M: Clone + Send + Sync + 'static,
|
||||
IC: Into<MetricOutput<M>>,
|
||||
{
|
||||
let chain = chain.into();
|
||||
chain.with_async_queue(queue_size)
|
||||
}
|
||||
|
||||
/// Perform random sampling of values according to the specified rate.
|
||||
#[deprecated(since = "0.5.0", note = "Use `with_sampling_rate` instead.")]
|
||||
pub fn sample<M, IC>(sampling_rate: Sampling, chain: IC) -> MetricOutput<M>
|
||||
where
|
||||
M: Clone + Send + Sync + 'static,
|
||||
IC: Into<MetricOutput<M>>,
|
||||
{
|
||||
let chain = chain.into();
|
||||
chain.with_sampling_rate(sampling_rate)
|
||||
}
|
||||
|
||||
/// Wrap the metrics backend to provide an application-friendly interface.
|
||||
/// Open a metric scope to share across the application.
|
||||
#[deprecated(since = "0.7.0", note = "Use into() instead")]
|
||||
pub fn app_metrics<M, AM>(scope: AM) -> MetricScope<M>
|
||||
where
|
||||
M: Clone + Send + Sync + 'static,
|
||||
AM: Into<MetricScope<M>>,
|
||||
{
|
||||
scope.into()
|
||||
}
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Metrics instead")]
|
||||
pub type AppMetrics<M> = MetricScope<M>;
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Marker instead")]
|
||||
pub type AppMarker = Marker;
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Counter instead")]
|
||||
pub type AppCounter = Counter;
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Gauge instead")]
|
||||
pub type AppGauge = Gauge;
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Timer instead")]
|
||||
pub type AppTimer = Timer;
|
||||
|
||||
/// Define application-scoped metrics.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_metrics {
|
||||
($type_param: ty, $metric_id: ident = ($($SCOPE: expr),+ $(,)*)) => {
|
||||
lazy_static! {
|
||||
pub static ref $metric_id: MetricScope<$type_param> = metric_scope(($($SCOPE),*));
|
||||
}
|
||||
};
|
||||
($type_param: ty, $metric_id: ident = [$($SCOPE: expr),+ $(,)*]) => {
|
||||
lazy_static! {
|
||||
pub static ref $metric_id: MetricScope<$type_param> = metric_scope(&[$($SCOPE),*][..],);
|
||||
}
|
||||
};
|
||||
($type_param: ty, $metric_id: ident = $SCOPE: expr) => {
|
||||
lazy_static! {
|
||||
pub static ref $metric_id: MetricScope<$type_param> = $SCOPE.into();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Define application-scoped markers.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_marker {
|
||||
(<$type_param: ty> $SCOPE: expr => { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! {
|
||||
$(pub static ref $metric_id: Marker = $SCOPE.marker( $m_exp );)*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Define application-scoped counters.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_counter {
|
||||
(<$type_param: ty> $SCOPE: expr => { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! {
|
||||
$(pub static ref $metric_id: Counter = $SCOPE.counter( $m_exp );)*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Define application-scoped gauges.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_gauge {
|
||||
(<$type_param: ty> $SCOPE: expr => { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! {
|
||||
$(pub static ref $metric_id: Gauge = $SCOPE.gauge( $m_exp );)*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Define application-scoped timers.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_timer {
|
||||
(<$type_param: ty> $SCOPE: expr => { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! {
|
||||
$(pub static ref $metric_id: Timer = $SCOPE.timer( $m_exp );)*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/////////////
|
||||
// MOD SCOPE
|
||||
|
||||
/// Define module-scoped metrics.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_metrics {
|
||||
($type_param: ty, $metric_id: ident = ($($SCOPE: expr),+ $(,)*)) => {
|
||||
lazy_static! {
|
||||
static ref $metric_id: MetricScope<$type_param> = metric_scope(($($SCOPE),*));
|
||||
}
|
||||
};
|
||||
($type_param: ty, $metric_id: ident = [$($SCOPE: expr),+ $(,)*]) => {
|
||||
lazy_static! { static ref $metric_id: MetricScope<$type_param> =
|
||||
metric_scope(&[$($SCOPE),*][..],); }
|
||||
};
|
||||
($type_param: ty, $metric_id: ident = $mod_metrics: expr) => {
|
||||
lazy_static! { static ref $metric_id: MetricScope<$type_param> =
|
||||
$mod_metrics.into(); }
|
||||
};
|
||||
}
|
||||
|
||||
/// Define module-scoped markers.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_marker {
|
||||
($type_param: ty, $mod_metrics: expr, { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! { $(static ref $metric_id: Marker =
|
||||
$mod_metrics.marker( $m_exp );)* }
|
||||
};
|
||||
}
|
||||
|
||||
/// Define module-scoped counters.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_counter {
|
||||
($type_param: ty, $mod_metrics: expr, { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! { $(static ref $metric_id: Counter =
|
||||
$mod_metrics.counter( $m_exp );)* }
|
||||
};
|
||||
}
|
||||
|
||||
/// Define module-scoped gauges.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_gauge {
|
||||
($type_param: ty, $mod_metrics: expr, { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! { $(static ref $metric_id: Gauge =
|
||||
$mod_metrics.gauge( $m_exp );)* }
|
||||
};
|
||||
($type_param: ty, $mod_metrics: expr, $metric_id: ident: $m_exp: expr) => {
|
||||
lazy_static! { static ref $metric_id: Gauge =
|
||||
$mod_metrics.gauge( $m_exp ); }
|
||||
}
|
||||
}
|
||||
|
||||
/// Define module-scoped timers.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_timer {
|
||||
($type_param: ty, $mod_metrics: expr, { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! { $(static ref $metric_id: Timer =
|
||||
$mod_metrics.timer( $m_exp );)* }
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod legacy_test {
|
||||
use self_metrics::*;
|
||||
|
||||
metrics!(<Aggregate> TEST_METRICS = DIPSTICK_METRICS.with_suffix("test_prefix"));
|
||||
|
||||
app_marker!(<Aggregate> TEST_METRICS => {
|
||||
M1: "failed",
|
||||
M2: "success",
|
||||
});
|
||||
|
||||
app_counter!(<Aggregate> TEST_METRICS => {
|
||||
C1: "failed",
|
||||
C2: "success",
|
||||
});
|
||||
|
||||
app_gauge!(<Aggregate> TEST_METRICS => {
|
||||
G1: "failed",
|
||||
G2: "success",
|
||||
});
|
||||
|
||||
app_timer!(<Aggregate> TEST_METRICS => {
|
||||
T1: "failed",
|
||||
T2: "success",
|
||||
});
|
||||
|
||||
#[test]
|
||||
fn call_old_macro_defined_metrics() {
|
||||
M1.mark();
|
||||
M2.mark();
|
||||
|
||||
C1.count(1);
|
||||
C2.count(2);
|
||||
|
||||
G1.value(1);
|
||||
G2.value(2);
|
||||
|
||||
T1.interval_us(1);
|
||||
T2.interval_us(2);
|
||||
}
|
||||
}
|
|
@ -1,7 +1,7 @@
|
|||
//! Decouple metric definition from configuration with trait objects.
|
||||
|
||||
use core::*;
|
||||
use scope::{self, DefineMetric, MetricScope, WriteMetric, MetricInput, Flush, ScheduleFlush, NO_METRIC_SCOPE};
|
||||
use scope::{DefineMetric, MetricScope, MetricInput, Flush, ScheduleFlush, NO_METRIC_SCOPE};
|
||||
|
||||
use std::collections::{HashMap, BTreeMap};
|
||||
use std::sync::{Arc, RwLock, Weak};
|
||||
|
@ -34,7 +34,7 @@ pub struct DispatchMetric {
|
|||
// the second part can be up to namespace.len() + 1 if this metric was individually targeted
|
||||
// 0 if no target assigned
|
||||
#[derivative(Debug = "ignore")]
|
||||
write_metric: (AtomicRefCell<(Box<WriteMetric + Send + Sync>, usize)>),
|
||||
write_metric: (AtomicRefCell<(WriteFn, usize)>),
|
||||
|
||||
// a reference to the the parent dispatcher to remove the metric from when it is dropped
|
||||
#[derivative(Debug = "ignore")]
|
||||
|
@ -202,7 +202,7 @@ impl MetricDispatch {
|
|||
command_fn(move |cmd| match cmd {
|
||||
Command::Write(metric, value) => {
|
||||
let dispatch: &Arc<DispatchMetric> = metric;
|
||||
dispatch.write_metric.borrow().0.write(value);
|
||||
dispatch.write_metric.borrow().0(value);
|
||||
}
|
||||
Command::Flush => disp_1.inner.write().expect("Dispatch Lock").flush(&disp_1.namespace),
|
||||
}),
|
||||
|
@ -212,25 +212,6 @@ impl MetricDispatch {
|
|||
}
|
||||
|
||||
impl MetricInput<Dispatch> for MetricDispatch {
|
||||
/// Define an event counter of the provided name.
|
||||
fn marker(&self, name: &str) -> scope::Marker {
|
||||
self.into_scope().marker(name)
|
||||
}
|
||||
|
||||
/// Define a counter of the provided name.
|
||||
fn counter(&self, name: &str) -> scope::Counter {
|
||||
self.into_scope().counter(name)
|
||||
}
|
||||
|
||||
/// Define a timer of the provided name.
|
||||
fn timer(&self, name: &str) -> scope::Timer {
|
||||
self.into_scope().timer(name)
|
||||
}
|
||||
|
||||
/// Define a gauge of the provided name.
|
||||
fn gauge(&self, name: &str) -> scope::Gauge {
|
||||
self.into_scope().gauge(name)
|
||||
}
|
||||
|
||||
/// Lookup or create a dispatch stub for the requested metric.
|
||||
fn define_metric(&self, name: &Namespace, kind: Kind, rate: Sampling) -> Dispatch {
|
||||
|
@ -262,7 +243,7 @@ impl MetricInput<Dispatch> for MetricDispatch {
|
|||
|
||||
#[inline]
|
||||
fn write(&self, metric: &Dispatch, value: Value) {
|
||||
metric.write_metric.borrow().0.write(value);
|
||||
metric.write_metric.borrow().0(value);
|
||||
}
|
||||
|
||||
fn with_suffix(&self, name: &str) -> Self {
|
||||
|
|
|
@ -193,11 +193,19 @@ mod bench {
|
|||
use test;
|
||||
|
||||
#[bench]
|
||||
pub fn timer_graphite(b: &mut test::Bencher) {
|
||||
pub fn unbufferd_graphite(b: &mut test::Bencher) {
|
||||
let sd = to_graphite("localhost:8125").unwrap().open_scope();
|
||||
let timer = sd.define_metric(&"timer".into(), Kind::Timer, 1000000.0);
|
||||
|
||||
b.iter(|| test::black_box(sd.write(&timer, 2000)));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
pub fn buffered_graphite(b: &mut test::Bencher) {
|
||||
let sd = to_buffered_graphite("localhost:8125").unwrap().open_scope();
|
||||
let timer = sd.define_metric(&"timer".into(), Kind::Timer, 1000000.0);
|
||||
|
||||
b.iter(|| test::black_box(sd.write(&timer, 2000)));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
39
src/lib.rs
39
src/lib.rs
|
@ -25,50 +25,59 @@ pub use error::{Error, Result};
|
|||
pub mod macros;
|
||||
|
||||
pub mod core;
|
||||
pub use core::{Value, Sampling, FULL_SAMPLING_RATE, TimeHandle, Kind, ROOT_NS, Namespace};
|
||||
pub use core::{Value, Sampling, FULL_SAMPLING_RATE, Kind, ROOT_NS, Namespace};
|
||||
|
||||
pub mod output;
|
||||
pub use output::{MetricOutput, NO_METRIC_OUTPUT, OpenScope};
|
||||
|
||||
#[macro_use]
|
||||
pub mod dispatch;
|
||||
pub use dispatch::{MetricDispatch, Dispatch, metric_dispatch};
|
||||
|
||||
#[macro_use]
|
||||
mod aggregate;
|
||||
pub use aggregate::{MetricAggregator, Aggregate};
|
||||
pub use aggregate::{MetricAggregator, Aggregate, summary, all_stats, average};
|
||||
|
||||
mod local;
|
||||
pub use local::*;
|
||||
pub use local::{StatsMap, to_buffered_log, to_buffered_stdout, to_log, to_stdout, to_void};
|
||||
|
||||
mod scope;
|
||||
pub use scope::*;
|
||||
pub use scope::{Marker, Timer, Counter, Gauge, MetricInput, MetricScope, Flush, ScheduleFlush, DefineMetric, metric_scope};
|
||||
|
||||
mod sample;
|
||||
pub use sample::*;
|
||||
pub use sample::WithSamplingRate;
|
||||
|
||||
mod scores;
|
||||
pub use scores::*;
|
||||
pub use scores::ScoreType;
|
||||
|
||||
mod statsd;
|
||||
pub use statsd::*;
|
||||
pub use statsd::{Statsd, to_statsd};
|
||||
|
||||
mod graphite;
|
||||
pub use graphite::*;
|
||||
pub use graphite::{Graphite, to_graphite, to_buffered_graphite};
|
||||
|
||||
mod socket;
|
||||
pub use socket::*;
|
||||
pub use socket::RetrySocket;
|
||||
|
||||
mod cache;
|
||||
pub use cache::*;
|
||||
pub use cache::{add_cache, WithCache};
|
||||
|
||||
mod multi;
|
||||
pub use multi::*;
|
||||
|
||||
mod async_queue;
|
||||
pub use async_queue::*;
|
||||
pub use async_queue::WithAsyncQueue;
|
||||
|
||||
mod schedule;
|
||||
pub use schedule::*;
|
||||
mod scheduler;
|
||||
pub use scheduler::{set_schedule, CancelHandle};
|
||||
|
||||
mod self_metrics;
|
||||
pub use self_metrics::DIPSTICK_METRICS;
|
||||
|
||||
mod clock;
|
||||
pub use clock::TimeHandle;
|
||||
#[cfg(mock_clock)]
|
||||
pub use clock::inner::advance_time;
|
||||
|
||||
// FIXME using * to prevent "use of deprecated" warnings. #[allow(dead_code)] doesnt work?
|
||||
#[macro_use]
|
||||
mod deprecated;
|
||||
pub use deprecated::*;
|
||||
|
|
78
src/local.rs
78
src/local.rs
|
@ -2,9 +2,83 @@
|
|||
|
||||
// TODO parameterize templates
|
||||
// TODO define backing structs that can flush() on Drop
|
||||
use core::*;
|
||||
use output::*;
|
||||
use core::{ROOT_NS, Namespace, Sampling, Value, WriteFn, Kind, command_fn, Command};
|
||||
use output::{MetricOutput, metric_output};
|
||||
use scope::{MetricInput, DefineMetric, Flush};
|
||||
use std::sync::RwLock;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// A HashMap wrapper to receive metrics or stats values.
|
||||
/// Every received value for a metric replaces the previous one (if any).
|
||||
#[derive(Clone)]
|
||||
pub struct StatsMap {
|
||||
namespace: Namespace,
|
||||
map: Arc<RwLock<BTreeMap<Namespace, Value>>>,
|
||||
}
|
||||
|
||||
impl StatsMap {
|
||||
/// Create a new StatsMap.
|
||||
pub fn new() -> Self {
|
||||
StatsMap { namespace: ROOT_NS.clone(), map: Arc::new(RwLock::new(BTreeMap::new())) }
|
||||
}
|
||||
|
||||
/// Get the latest published value for the named stat (if it exists).
|
||||
pub fn get(&self, key: &Namespace) -> Option<Value> {
|
||||
self.map.read().expect("StatsMap").get(key).map(|v| *v)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StatsMap> for BTreeMap<String, Value> {
|
||||
fn from(map: StatsMap) -> Self {
|
||||
let inner = map.map.write().expect("StatsMap");
|
||||
inner.clone().into_iter()
|
||||
.map(|(key, value)| (key.join("."), value))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl DefineMetric for StatsMap {
|
||||
fn define_metric_object(&self, name: &Namespace, kind: Kind, rate: Sampling) -> WriteFn {
|
||||
let target_metric = self.define_metric(name, kind, rate);
|
||||
let write_to = self.clone();
|
||||
Arc::new(move |value| write_to.write(&target_metric, value))
|
||||
}
|
||||
}
|
||||
|
||||
impl Flush for StatsMap {}
|
||||
|
||||
|
||||
impl MetricInput<Namespace> for StatsMap {
|
||||
|
||||
fn define_metric(&self, name: &Namespace, _kind: Kind, _rate: Sampling) -> Namespace {
|
||||
name.clone()
|
||||
}
|
||||
|
||||
fn write(&self, metric: &Namespace, value: Value) {
|
||||
self.map.write().expect("StatsMap").insert(metric.clone(), value);
|
||||
}
|
||||
|
||||
fn with_suffix(&self, name: &str) -> Self {
|
||||
Self {
|
||||
namespace: self.namespace.with_suffix(name),
|
||||
map: self.map.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//pub fn to_map(map: StatsMap) -> MetricScope<Namespace> {
|
||||
// let mut map: StatsMap = map;
|
||||
// MetricScope::new(
|
||||
// ().into(),
|
||||
// Arc::new(|name, _kind, _rate| name.clone()),
|
||||
// Arc::new(|cmd|
|
||||
// if let Command::Write(m, v) = cmd {
|
||||
// map.insert(m.clone(), v);
|
||||
// },
|
||||
// )
|
||||
//}
|
||||
|
||||
/// Write metric values to stdout using `println!`.
|
||||
pub fn to_stdout() -> MetricOutput<String> {
|
||||
|
|
206
src/macros.rs
206
src/macros.rs
|
@ -22,21 +22,21 @@ macro_rules! time {
|
|||
macro_rules! metrics {
|
||||
// TYPED
|
||||
// typed, public, no metrics
|
||||
(<$METRIC_TYPE:ty> pub $METRIC_ID:ident = $e:expr $(;)*) => {
|
||||
lazy_static! { pub static ref $METRIC_ID: MetricScope<$METRIC_TYPE> = $e.into(); }
|
||||
($(#[$attr:meta])* <$METRIC_TYPE:ty> pub $METRIC_ID:ident = $e:expr $(;)*) => {
|
||||
lazy_static! { $(#[$attr])* pub static ref $METRIC_ID: MetricScope<$METRIC_TYPE> = $e.into(); }
|
||||
};
|
||||
// typed, public, some metrics
|
||||
(<$METRIC_TYPE:ty> pub $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
|
||||
lazy_static! { pub static ref $METRIC_ID: MetricScope<$METRIC_TYPE> = $e.into(); }
|
||||
($(#[$attr:meta])* <$METRIC_TYPE:ty> pub $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
|
||||
lazy_static! { $(#[$attr])* pub static ref $METRIC_ID: MetricScope<$METRIC_TYPE> = $e.into(); }
|
||||
__metrics_block!($METRIC_ID; $($REMAINING)*);
|
||||
};
|
||||
// typed, module, no metrics
|
||||
(<$METRIC_TYPE:ty> $METRIC_ID:ident = $e:expr $(;)*) => {
|
||||
lazy_static! { static ref $METRIC_ID: MetricScope<$METRIC_TYPE> = $e.into(); }
|
||||
($(#[$attr:meta])* <$METRIC_TYPE:ty> $METRIC_ID:ident = $e:expr $(;)*) => {
|
||||
lazy_static! { $(#[$attr])* static ref $METRIC_ID: MetricScope<$METRIC_TYPE> = $e.into(); }
|
||||
};
|
||||
// typed, module, some metrics
|
||||
(<$METRIC_TYPE:ty> $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
|
||||
lazy_static! { static ref $METRIC_ID: MetricScope<$METRIC_TYPE> = $e.into(); }
|
||||
($(#[$attr:meta])* <$METRIC_TYPE:ty> $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
|
||||
lazy_static! { $(#[$attr])* static ref $METRIC_ID: MetricScope<$METRIC_TYPE> = $e.into(); }
|
||||
__metrics_block!($METRIC_ID; $($REMAINING)*);
|
||||
};
|
||||
// typed, reuse predeclared
|
||||
|
@ -54,19 +54,19 @@ macro_rules! metrics {
|
|||
__metrics_block!(ROOT_METRICS; $($REMAINING)*);
|
||||
};
|
||||
|
||||
(pub $METRIC_ID:ident = $e:expr $(;)*) => {
|
||||
metrics! {<Dispatch> pub $METRIC_ID = $e; }
|
||||
($(#[$attr:meta])* pub $METRIC_ID:ident = $e:expr $(;)*) => {
|
||||
metrics! {$(#[$attr])* <Dispatch> pub $METRIC_ID = $e; }
|
||||
};
|
||||
(pub $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
|
||||
metrics! {<Dispatch> pub $METRIC_ID = $e => { $($REMAINING)* } }
|
||||
($(#[$attr:meta])* pub $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
|
||||
metrics! {$(#[$attr])* <Dispatch> pub $METRIC_ID = $e => { $($REMAINING)* } }
|
||||
};
|
||||
($METRIC_ID:ident = $e:expr $(;)*) => {
|
||||
metrics! {<Dispatch> $METRIC_ID = $e; }
|
||||
($(#[$attr:meta])* $METRIC_ID:ident = $e:expr $(;)*) => {
|
||||
metrics! {$(#[$attr])* <Dispatch> $METRIC_ID = $e; }
|
||||
};
|
||||
($METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
|
||||
metrics! {<Dispatch> $METRIC_ID = $e => { $($REMAINING)* } }
|
||||
($(#[$attr:meta])* $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
|
||||
metrics! {$(#[$attr])* <Dispatch> $METRIC_ID = $e => { $($REMAINING)* } }
|
||||
};
|
||||
($METRIC_ID:ident => { $($REMAINING:tt)+ }) => {
|
||||
($(#[$attr:meta])* $METRIC_ID:ident => { $($REMAINING:tt)+ }) => {
|
||||
metrics! {<Dispatch> $METRIC_ID => { $($REMAINING)* } }
|
||||
};
|
||||
($e:expr => { $($REMAINING:tt)+ }) => {
|
||||
|
@ -130,136 +130,6 @@ macro_rules! __metrics_block {
|
|||
($METRIC_ID:ident;) => ()
|
||||
}
|
||||
|
||||
/// Define application-scoped metrics.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_metrics {
|
||||
($type_param: ty, $metric_id: ident = ($($SCOPE: expr),+ $(,)*)) => {
|
||||
lazy_static! {
|
||||
pub static ref $metric_id: MetricScope<$type_param> = metric_scope(($($SCOPE),*));
|
||||
}
|
||||
};
|
||||
($type_param: ty, $metric_id: ident = [$($SCOPE: expr),+ $(,)*]) => {
|
||||
lazy_static! {
|
||||
pub static ref $metric_id: MetricScope<$type_param> = metric_scope(&[$($SCOPE),*][..],);
|
||||
}
|
||||
};
|
||||
($type_param: ty, $metric_id: ident = $SCOPE: expr) => {
|
||||
lazy_static! {
|
||||
pub static ref $metric_id: MetricScope<$type_param> = $SCOPE.into();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Define application-scoped markers.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_marker {
|
||||
(<$type_param: ty> $SCOPE: expr => { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! {
|
||||
$(pub static ref $metric_id: Marker = $SCOPE.marker( $m_exp );)*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Define application-scoped counters.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_counter {
|
||||
(<$type_param: ty> $SCOPE: expr => { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! {
|
||||
$(pub static ref $metric_id: Counter = $SCOPE.counter( $m_exp );)*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Define application-scoped gauges.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_gauge {
|
||||
(<$type_param: ty> $SCOPE: expr => { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! {
|
||||
$(pub static ref $metric_id: Gauge = $SCOPE.gauge( $m_exp );)*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Define application-scoped timers.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! app_timer {
|
||||
(<$type_param: ty> $SCOPE: expr => { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! {
|
||||
$(pub static ref $metric_id: Timer = $SCOPE.timer( $m_exp );)*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/////////////
|
||||
// MOD SCOPE
|
||||
|
||||
/// Define module-scoped metrics.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_metrics {
|
||||
($type_param: ty, $metric_id: ident = ($($SCOPE: expr),+ $(,)*)) => {
|
||||
lazy_static! {
|
||||
static ref $metric_id: MetricScope<$type_param> = metric_scope(($($SCOPE),*));
|
||||
}
|
||||
};
|
||||
($type_param: ty, $metric_id: ident = [$($SCOPE: expr),+ $(,)*]) => {
|
||||
lazy_static! { static ref $metric_id: MetricScope<$type_param> =
|
||||
metric_scope(&[$($SCOPE),*][..],); }
|
||||
};
|
||||
($type_param: ty, $metric_id: ident = $mod_metrics: expr) => {
|
||||
lazy_static! { static ref $metric_id: MetricScope<$type_param> =
|
||||
$mod_metrics.into(); }
|
||||
};
|
||||
}
|
||||
|
||||
/// Define module-scoped markers.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_marker {
|
||||
($type_param: ty, $mod_metrics: expr, { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! { $(static ref $metric_id: Marker =
|
||||
$mod_metrics.marker( $m_exp );)* }
|
||||
};
|
||||
}
|
||||
|
||||
/// Define module-scoped counters.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_counter {
|
||||
($type_param: ty, $mod_metrics: expr, { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! { $(static ref $metric_id: Counter =
|
||||
$mod_metrics.counter( $m_exp );)* }
|
||||
};
|
||||
}
|
||||
|
||||
/// Define module-scoped gauges.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_gauge {
|
||||
($type_param: ty, $mod_metrics: expr, { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! { $(static ref $metric_id: Gauge =
|
||||
$mod_metrics.gauge( $m_exp );)* }
|
||||
};
|
||||
($type_param: ty, $mod_metrics: expr, $metric_id: ident: $m_exp: expr) => {
|
||||
lazy_static! { static ref $metric_id: Gauge =
|
||||
$mod_metrics.gauge( $m_exp ); }
|
||||
}
|
||||
}
|
||||
|
||||
/// Define module-scoped timers.
|
||||
#[macro_export]
|
||||
#[deprecated(since = "0.7.0", note = "Use metrics!() instead")]
|
||||
macro_rules! mod_timer {
|
||||
($type_param: ty, $mod_metrics: expr, { $($metric_id: ident: $m_exp: expr),* $(,)* } ) => {
|
||||
lazy_static! { $(static ref $metric_id: Timer =
|
||||
$mod_metrics.timer( $m_exp );)* }
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
@ -291,45 +161,3 @@ mod test {
|
|||
T2.interval_us(2);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod legacy_test {
|
||||
use self_metrics::*;
|
||||
|
||||
metrics!(<Aggregate> TEST_METRICS = DIPSTICK_METRICS.with_suffix("test_prefix"));
|
||||
|
||||
app_marker!(<Aggregate> TEST_METRICS => {
|
||||
M1: "failed",
|
||||
M2: "success",
|
||||
});
|
||||
|
||||
app_counter!(<Aggregate> TEST_METRICS => {
|
||||
C1: "failed",
|
||||
C2: "success",
|
||||
});
|
||||
|
||||
app_gauge!(<Aggregate> TEST_METRICS => {
|
||||
G1: "failed",
|
||||
G2: "success",
|
||||
});
|
||||
|
||||
app_timer!(<Aggregate> TEST_METRICS => {
|
||||
T1: "failed",
|
||||
T2: "success",
|
||||
});
|
||||
|
||||
#[test]
|
||||
fn call_old_macro_defined_metrics() {
|
||||
M1.mark();
|
||||
M2.mark();
|
||||
|
||||
C1.count(1);
|
||||
C2.count(2);
|
||||
|
||||
G1.value(1);
|
||||
G2.value(2);
|
||||
|
||||
T1.interval_us(1);
|
||||
T2.interval_us(2);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,17 +46,6 @@ impl<M: Send + Sync + 'static + Clone> WithSamplingRate for MetricOutput<M> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Perform random sampling of values according to the specified rate.
|
||||
#[deprecated(since = "0.5.0", note = "Use `with_sampling_rate` instead.")]
|
||||
pub fn sample<M, IC>(sampling_rate: Sampling, chain: IC) -> MetricOutput<M>
|
||||
where
|
||||
M: Clone + Send + Sync + 'static,
|
||||
IC: Into<MetricOutput<M>>,
|
||||
{
|
||||
let chain = chain.into();
|
||||
chain.with_sampling_rate(sampling_rate)
|
||||
}
|
||||
|
||||
mod pcg32 {
|
||||
//! PCG32 random number generation for fast sampling
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ impl CancelHandle {
|
|||
|
||||
/// Schedule a task to run periodically.
|
||||
/// Starts a new thread for every task.
|
||||
pub fn schedule<F>(every: Duration, operation: F) -> CancelHandle
|
||||
pub fn set_schedule<F>(every: Duration, operation: F) -> CancelHandle
|
||||
where
|
||||
F: Fn() -> () + Send + 'static,
|
||||
{
|
126
src/scope.rs
126
src/scope.rs
|
@ -7,10 +7,11 @@
|
|||
//!
|
||||
//! If multiple [AppMetrics] are defined, they'll each have their scope.
|
||||
//!
|
||||
use core::*;
|
||||
use core::{Value, Sampling, WriteFn, Namespace, Kind, DefineMetricFn, CommandFn};
|
||||
use core::Kind::*;
|
||||
use cache::*;
|
||||
use schedule::{schedule, CancelHandle};
|
||||
use clock::TimeHandle;
|
||||
use cache::{add_cache, WithCache};
|
||||
use scheduler::{set_schedule, CancelHandle};
|
||||
use output;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
@ -30,28 +31,16 @@ pub trait DefineMetric: Flush {
|
|||
/// Register a new metric.
|
||||
/// Only one metric of a certain name will be defined.
|
||||
/// Observer must return a MetricHandle that uniquely identifies the metric.
|
||||
fn define_metric_object(&self, namespace: &Namespace, kind: Kind, rate: Sampling)
|
||||
-> Box<WriteMetric + Send + Sync>;
|
||||
fn define_metric_object(&self, namespace: &Namespace, kind: Kind, rate: Sampling) -> WriteFn;
|
||||
}
|
||||
|
||||
/// Dynamic counterpart of the `DispatcherMetric`.
|
||||
/// Adapter to a metric of unknown type.
|
||||
pub trait WriteMetric {
|
||||
/// Write metric value to a scope.
|
||||
/// Observers only receive previously registered handles.
|
||||
fn write(&self, value: Value);
|
||||
}
|
||||
|
||||
/// Wrap the metrics backend to provide an application-friendly interface.
|
||||
/// Open a metric scope to share across the application.
|
||||
#[deprecated(since = "0.7.0", note = "Use into() instead")]
|
||||
pub fn app_metrics<M, AM>(scope: AM) -> MetricScope<M>
|
||||
where
|
||||
M: Clone + Send + Sync + 'static,
|
||||
AM: Into<MetricScope<M>>,
|
||||
{
|
||||
scope.into()
|
||||
}
|
||||
///// Dynamic counterpart of the `DispatcherMetric`.
|
||||
///// Adapter to a metric of unknown type.
|
||||
//pub trait WriteMetric {
|
||||
// /// Write metric value to a scope.
|
||||
// /// Observers only receive previously registered handles.
|
||||
// fn write(&self, value: Value);
|
||||
//}
|
||||
|
||||
/// Wrap the metrics backend to provide an application-friendly interface.
|
||||
/// Open a metric scope to share across the application.
|
||||
|
@ -162,26 +151,6 @@ impl Timer {
|
|||
}
|
||||
}
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Metrics instead")]
|
||||
pub type AppMetrics<M> = MetricScope<M>;
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Marker instead")]
|
||||
pub type AppMarker = Marker;
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Counter instead")]
|
||||
pub type AppCounter = Counter;
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Gauge instead")]
|
||||
pub type AppGauge = Gauge;
|
||||
|
||||
/// Help transition to new syntax
|
||||
#[deprecated(since = "0.7.0", note = "Use Timer instead")]
|
||||
pub type AppTimer = Timer;
|
||||
|
||||
/// Variations of this should also provide control of the metric recording scope.
|
||||
#[derive(Derivative, Clone)]
|
||||
pub struct MetricScope<M> {
|
||||
|
@ -214,21 +183,29 @@ fn scope_write_fn<M, D>(scope: &D, kind: Kind, name: &str) -> WriteFn
|
|||
}
|
||||
|
||||
/// Define metrics, write values and flush them.
|
||||
pub trait MetricInput<M>: Clone + Flush + Send + Sync + 'static
|
||||
pub trait MetricInput<M>: Clone + Send + Sync + 'static + Flush
|
||||
where
|
||||
M: Clone + Send + Sync + 'static,
|
||||
{
|
||||
/// Define an event counter of the provided name.
|
||||
fn marker(&self, name: &str) -> Marker;
|
||||
fn marker(&self, name: &str) -> Marker {
|
||||
Marker { write: scope_write_fn(self, Marker, name) }
|
||||
}
|
||||
|
||||
/// Define a counter of the provided name.
|
||||
fn counter(&self, name: &str) -> Counter;
|
||||
fn counter(&self, name: &str) -> Counter {
|
||||
Counter { write: scope_write_fn(self, Counter, name) }
|
||||
}
|
||||
|
||||
/// Define a timer of the provided name.
|
||||
fn timer(&self, name: &str) -> Timer;
|
||||
fn timer(&self, name: &str) -> Timer {
|
||||
Timer { write: scope_write_fn(self, Timer, name) }
|
||||
}
|
||||
|
||||
/// Define a gauge of the provided name.
|
||||
fn gauge(&self, name: &str) -> Gauge;
|
||||
fn gauge(&self, name: &str) -> Gauge {
|
||||
Gauge { write: scope_write_fn(self, Gauge, name) }
|
||||
}
|
||||
|
||||
/// Define a metric of the specified type.
|
||||
fn define_metric(&self, namespace: &Namespace, kind: Kind, rate: Sampling) -> M;
|
||||
|
@ -251,7 +228,8 @@ pub trait MetricInput<M>: Clone + Flush + Send + Sync + 'static
|
|||
pub trait Flush {
|
||||
/// Flushes any recorded metric value.
|
||||
/// Has no effect on unbuffered metrics.
|
||||
fn flush(&self);
|
||||
/// Default impl does nothing
|
||||
fn flush(&self) {}
|
||||
}
|
||||
|
||||
impl<M> MetricInput<M> for MetricScope<M>
|
||||
|
@ -259,26 +237,6 @@ where
|
|||
M: Clone + Send + Sync + 'static,
|
||||
{
|
||||
|
||||
/// Define an event counter of the provided name.
|
||||
fn marker(&self, name: &str) -> Marker {
|
||||
Marker { write: scope_write_fn(self, Marker, name) }
|
||||
}
|
||||
|
||||
/// Define a counter of the provided name.
|
||||
fn counter(&self, name: &str) -> Counter {
|
||||
Counter { write: scope_write_fn(self, Counter, name) }
|
||||
}
|
||||
|
||||
/// Define a timer of the provided name.
|
||||
fn timer(&self, name: &str) -> Timer {
|
||||
Timer { write: scope_write_fn(self, Timer, name) }
|
||||
}
|
||||
|
||||
/// Define a gauge of the provided name.
|
||||
fn gauge(&self, name: &str) -> Gauge {
|
||||
Gauge { write: scope_write_fn(self, Gauge, name) }
|
||||
}
|
||||
|
||||
fn define_metric(&self, namespace: &Namespace, kind: Kind, rate: Sampling) -> M {
|
||||
(self.define_fn)(namespace, kind, rate)
|
||||
}
|
||||
|
@ -314,7 +272,7 @@ pub trait ScheduleFlush: Flush + Clone + Send + 'static {
|
|||
/// Start a thread dedicated to flushing this scope at regular intervals.
|
||||
fn flush_every(&self, period: Duration) -> CancelHandle {
|
||||
let scope = self.clone();
|
||||
schedule(period, move || scope.flush())
|
||||
set_schedule(period, move || scope.flush())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -322,27 +280,25 @@ impl<M: Clone + Send + 'static> ScheduleFlush for MetricScope<M> {}
|
|||
|
||||
//// Dispatch / Receiver impl
|
||||
|
||||
struct MetricWriter<M> {
|
||||
define_fn: M,
|
||||
command_fn: CommandFn<M>,
|
||||
}
|
||||
//pub struct MetricWriter<M> {
|
||||
// target_metric: M,
|
||||
// command_fn: CommandFn<M>,
|
||||
//}
|
||||
|
||||
impl<M: Send + Sync + Clone + 'static> DefineMetric for MetricScope<M> {
|
||||
fn define_metric_object(&self, namespace: &Namespace, kind: Kind, rate: Sampling)
|
||||
-> Box<WriteMetric + Send + Sync>
|
||||
fn define_metric_object(&self, namespace: &Namespace, kind: Kind, rate: Sampling) -> WriteFn
|
||||
{
|
||||
Box::new(MetricWriter {
|
||||
define_fn: self.define_metric(namespace, kind, rate),
|
||||
command_fn: self.command_fn.clone(),
|
||||
})
|
||||
let target_metric = self.define_metric(namespace, kind, rate);
|
||||
let write_to = self.clone();
|
||||
Arc::new(move |value| write_to.write(&target_metric, value))
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> WriteMetric for MetricWriter<M> {
|
||||
fn write(&self, value: Value) {
|
||||
self.command_fn.write(&self.define_fn, value);
|
||||
}
|
||||
}
|
||||
//impl<M> WriteMetric for MetricWriter<M> {
|
||||
// fn write(&self, value: Value) {
|
||||
// self.command_fn.write(&self.target_metric, value);
|
||||
// }
|
||||
//}
|
||||
|
||||
//// Mutators impl
|
||||
|
||||
|
|
|
@ -7,4 +7,7 @@ pub use core::*;
|
|||
pub use scope::*;
|
||||
pub use aggregate::*;
|
||||
|
||||
metrics!(<Aggregate> pub DIPSTICK_METRICS = "dipstick");
|
||||
metrics!(
|
||||
/// Aggregator of dipstick's own internal metrics.
|
||||
<Aggregate> pub DIPSTICK_METRICS = "dipstick"
|
||||
);
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
//! A sample application continuously aggregating metrics,
|
||||
//! printing the summary stats every three seconds
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
#[cfg(mock_clock)]
|
||||
mod test {
|
||||
use std::time::Duration;
|
||||
use std::collections::BTreeMap;
|
||||
use dipstick::*;
|
||||
|
||||
#[test]
|
||||
fn external_aggregate_all_stats() {
|
||||
let metrics = MetricAggregator::new().with_suffix("test");
|
||||
|
||||
let counter = metrics.counter("counter_a");
|
||||
let timer = metrics.timer("timer_a");
|
||||
let gauge = metrics.gauge("gauge_a");
|
||||
let marker = metrics.marker("marker_a");
|
||||
|
||||
marker.mark();
|
||||
marker.mark();
|
||||
marker.mark();
|
||||
|
||||
counter.count(10);
|
||||
counter.count(20);
|
||||
|
||||
timer.interval_us(10_000_000);
|
||||
timer.interval_us(20_000_000);
|
||||
|
||||
gauge.value(10);
|
||||
gauge.value(20);
|
||||
|
||||
advance_time(Duration::from_secs(3));
|
||||
|
||||
// TODO expose & use flush_to()
|
||||
let stats = StatsMap::new();
|
||||
metrics.flush_to(&stats, &all_stats);
|
||||
let map: BTreeMap<String, Value> = stats.into();
|
||||
|
||||
assert_eq!(map["test.counter_a.count"], 2);
|
||||
assert_eq!(map["test.counter_a.sum"], 30);
|
||||
assert_eq!(map["test.counter_a.mean"], 15);
|
||||
assert_eq!(map["test.counter_a.rate"], 10);
|
||||
|
||||
assert_eq!(map["test.timer_a.count"], 2);
|
||||
assert_eq!(map["test.timer_a.sum"], 30_000_000);
|
||||
assert_eq!(map["test.timer_a.min"], 10_000_000);
|
||||
assert_eq!(map["test.timer_a.max"], 20_000_000);
|
||||
assert_eq!(map["test.timer_a.mean"], 15_000_000);
|
||||
assert_eq!(map["test.timer_a.rate"], 1);
|
||||
|
||||
assert_eq!(map["test.gauge_a.mean"], 15);
|
||||
assert_eq!(map["test.gauge_a.min"], 10);
|
||||
assert_eq!(map["test.gauge_a.max"], 20);
|
||||
|
||||
assert_eq!(map["test.marker_a.count"], 3);
|
||||
assert_eq!(map["test.marker_a.rate"], 1);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue