Added namespaces

This commit is contained in:
Francis Lalonde 2018-05-01 09:41:31 -04:00
parent c9208c415e
commit 6d1f93e6a0
31 changed files with 630 additions and 245 deletions

View File

@ -26,9 +26,6 @@ derivative = "1.0"
atomic_refcell = "0.1"
chrono = "^0.3"
#[build-dependencies]
#skeptic = "0.13"
[dev-dependencies]
skeptic = "0.13"

View File

@ -135,7 +135,7 @@ timer.interval_us(123_456);
Related metrics can share a namespace:
```rust,skt-run
let app_metrics = metric_scope(to_stdout());
let db_metrics = app_metrics.with_prefix("database");
let db_metrics = app_metrics.with_suffix("database");
let _db_timer = db_metrics.timer("db_timer");
let _db_counter = db_metrics.counter("db_counter");
```

View File

@ -7,9 +7,10 @@ use std::time::Duration;
use dipstick::*;
fn main() {
let metrics = new_aggregate();
let metrics = MetricAggregator::new();
set_aggregate_default_output(to_stdout());
// MetricAggregator::set_default_output(to_stdout());
metrics.set_output(to_stdout());
metrics.flush_every(Duration::from_secs(3));

View File

@ -13,7 +13,7 @@ fn main() {
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_b");
let subsystem_metrics = metrics.with_name("subsystem");
let subsystem_metrics = metrics.with_suffix("subsystem");
let event = subsystem_metrics.marker("event_c");
let gauge = subsystem_metrics.gauge("gauge_d");

View File

@ -19,7 +19,7 @@ fn main() {
app_metrics.counter("just_once").count(4);
// metric names can be prepended with a common prefix
let prefixed_metrics = app_metrics.with_name("subsystem".to_string());
let prefixed_metrics = app_metrics.with_suffix("subsystem");
let event = prefixed_metrics.marker("event_c");
let gauge = prefixed_metrics.gauge("gauge_d");

View File

@ -35,10 +35,10 @@ fn main() {
}
// send application metrics to aggregator
set_aggregate_default_output(to_stdout());
set_aggregate_default_stats(custom_statistics);
MetricAggregator::set_default_output(to_stdout());
MetricAggregator::set_default_stats(custom_statistics);
let app_metrics = new_aggregate();
let app_metrics = MetricAggregator::new();
// schedule aggregated metrics to be printed every 3 seconds
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -12,7 +12,7 @@ fn main() {
let metrics = metric_scope(
to_graphite("localhost:2003")
.expect("Connecting")
.with_namespace(&["my", "app"][..]),
.with_suffix("my_app"),
);
loop {

View File

@ -18,14 +18,14 @@ metrics!(<Aggregate> pub AGGREGATE = () => {
});
metrics!(<Aggregate> AGGREGATE.with_prefix("module_prefix") => {
metrics!(<Aggregate> AGGREGATE.with_suffix("module_prefix") => {
// create counter "module_prefix.module_counter"
Counter MOD_COUNTER: "module_counter";
});
fn main() {
// print aggregated metrics to the console
set_aggregate_default_output(to_stdout());
MetricAggregator::set_default_output(to_stdout());
// enable autoflush...
AGGREGATE.flush_every(Duration::from_millis(4000));

View File

@ -36,7 +36,7 @@ metrics!(LIB_METRICS => {
});
fn main() {
dispatch().set_target(to_stdout());
metric_dispatch().set_target(to_stdout());
loop {
ROOT_COUNTER.count(123);

View File

@ -22,8 +22,8 @@ app_metrics!(
Vec<String>,
SAME_TYPE = [
// combine multiple outputs of the same type by using an array
to_stdout().with_prefix("yeah"),
to_stdout().with_prefix("ouch"),
to_stdout().with_suffix("yeah"),
to_stdout().with_suffix("ouch"),
to_stdout().with_sampling_rate(0.5),
]
);
@ -31,7 +31,7 @@ app_metrics!(
#[ignore(deprecated)]
app_metrics!(
Vec<String>,
MUTANT_CHILD = SAME_TYPE.with_prefix("super").with_prefix("duper")
MUTANT_CHILD = SAME_TYPE.with_suffix("super").with_suffix("duper")
);
fn main() {

View File

@ -15,12 +15,12 @@ metrics!(<(Statsd, String)> DIFFERENT_TYPES = (
metrics!(<Vec<String>> SAME_TYPE = [
// combine multiple outputs of the same type by using an array
to_stdout().with_prefix("yeah"),
to_stdout().with_prefix("ouch"),
to_stdout().with_suffix("yeah"),
to_stdout().with_suffix("ouch"),
to_stdout().with_sampling_rate(0.5),
][..]);
metrics!(<Vec<String>> MUTANT_CHILD = SAME_TYPE.with_prefix("super").with_prefix("duper"));
metrics!(<Vec<String>> MUTANT_CHILD = SAME_TYPE.with_suffix("super").with_suffix("duper"));
fn main() {
loop {

View File

@ -17,8 +17,8 @@ fn main() {
let same_type_metrics = metric_scope(
&[
// use slices to combine multiple metrics of the same type
to_stdout().with_name("yeah"),
to_stdout().with_name("ouch"),
to_stdout().with_suffix("yeah"),
to_stdout().with_suffix("ouch"),
to_stdout().with_sampling_rate(0.5),
][..],
);

View File

@ -3,7 +3,7 @@
extern crate dipstick;
use dipstick::MetricInput;
use dipstick::{MetricInput, ROOT_NS};
fn main() {
raw_write()
@ -15,6 +15,7 @@ pub fn raw_write() {
// define and send metrics using raw channel API
let counter = metrics_log.define_metric(
&ROOT_NS,
dipstick::Kind::Counter,
"count_a",
dipstick::FULL_SAMPLING_RATE,

View File

@ -7,9 +7,9 @@ use std::time::Duration;
use dipstick::*;
fn main() {
set_aggregate_default_output(to_stdout());
let app_metrics = new_aggregate();
let app_metrics = MetricAggregator::new();
app_metrics.set_output(to_stdout());
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -1,10 +1,9 @@
//! Maintain aggregated metrics for deferred reporting,
//!
use core::{command_fn, Kind, Sampling, Command, Value};
use core::{command_fn, Kind, Sampling, Command, Value, Namespace};
use core::Kind::*;
use output::{OpenScope, NO_METRIC_OUTPUT, MetricOutput};
use scope::{self, MetricScope, MetricInput, Flush, ScheduleFlush, DefineMetric,};
use namespace::WithNamespace;
use scores::{ScoreSnapshot, ScoreType, Scoreboard};
use scores::ScoreType::*;
@ -16,51 +15,34 @@ use std::sync::{Arc, RwLock};
pub type StatsFn = Fn(Kind, &str, ScoreType)
-> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static;
fn initial_stats() -> &'static StatsFn {
&summary
}
fn initial_output() -> Arc<OpenScope + Sync + Send> {
NO_METRIC_OUTPUT.clone()
}
lazy_static! {
static ref DEFAULT_AGGREGATE_STATS: RwLock<Arc<StatsFn>> = RwLock::new(Arc::new(summary));
static ref DEFAULT_AGGREGATE_STATS: RwLock<Arc<StatsFn>> = RwLock::new(Arc::new(initial_stats()));
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OpenScope + Sync + Send>> =
RwLock::new(NO_METRIC_OUTPUT.clone());
}
/// Set the default aggregated metrics statistics generator.
pub fn set_aggregate_default_stats<F>(func: F)
where
F: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static,
{
*DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(func)
}
/// Install a new receiver for all aggregateed metrics, replacing any previous receiver.
pub fn set_aggregate_default_output<IS: Into<MetricOutput<T>>, T: Send + Sync + Clone + 'static>
(new_config: IS)
{
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = Arc::new(new_config.into());
}
fn get_aggregate_default() -> Arc<OpenScope> {
DEFAULT_AGGREGATE_OUTPUT.read().unwrap().clone()
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OpenScope + Sync + Send>> = RwLock::new(initial_output());
}
/// 1024 Metrics per scoreboard should be enough?
const DEFAULT_CAPACITY: usize = 1024;
/// Get the default aggregate point.
pub fn new_aggregate() -> MetricAggregate {
MetricAggregate::with_capacity(DEFAULT_CAPACITY)
}
impl From<MetricAggregate> for MetricScope<Aggregate> {
fn from(agg: MetricAggregate) -> MetricScope<Aggregate> {
impl From<MetricAggregator> for MetricScope<Aggregate> {
fn from(agg: MetricAggregator) -> MetricScope<Aggregate> {
agg.into_scope()
}
}
impl From<&'static str> for MetricScope<Aggregate> {
fn from(prefix: &'static str) -> MetricScope<Aggregate> {
let scope: MetricScope<Aggregate> = new_aggregate().into();
let scope: MetricScope<Aggregate> = MetricAggregator::new().into();
if !prefix.is_empty() {
scope.with_prefix(prefix)
scope.with_suffix(prefix)
} else {
scope
}
@ -69,30 +51,104 @@ impl From<&'static str> for MetricScope<Aggregate> {
impl From<()> for MetricScope<Aggregate> {
fn from(_: ()) -> MetricScope<Aggregate> {
new_aggregate().into_scope()
MetricAggregator::new().into_scope()
}
}
/// Central aggregation structure.
/// Maintains a list of metrics for enumeration when used as source.
#[derive(Debug, Clone)]
pub struct MetricAggregate {
metrics: Arc<RwLock<HashMap<String, Arc<Scoreboard>>>>,
pub struct MetricAggregator {
namespace: Namespace,
inner: Arc<RwLock<InnerAggregator>>,
}
impl MetricAggregate {
#[derive(Derivative)]
#[derivative(Debug)]
struct InnerAggregator {
metrics: HashMap<String, Arc<Scoreboard>>,
#[derivative(Debug = "ignore")]
stats: Option<Arc<Fn(Kind, &str, ScoreType)
-> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static>>,
output: Option<Arc<OpenScope + Sync + Send>>,
}
impl MetricAggregator {
/// Build a new metric aggregation
pub fn new() -> MetricAggregator {
MetricAggregator::with_capacity(DEFAULT_CAPACITY)
}
/// Build a new metric aggregation point with initial capacity of metrics to aggregate.
pub fn with_capacity(size: usize) -> MetricAggregate {
MetricAggregate {
metrics: Arc::new(RwLock::new(HashMap::with_capacity(size))),
pub fn with_capacity(size: usize) -> MetricAggregator {
MetricAggregator {
namespace: "".into(),
inner: Arc::new(RwLock::new(InnerAggregator {
metrics: HashMap::with_capacity(size),
stats: None,
output: None,
}))
}
}
/// Set the default aggregated metrics statistics generator.
pub fn set_default_stats<F>(func: F)
where
F: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static
{
*DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(func)
}
/// Remove any global customization of the default aggregation statistics.
pub fn unset_default_stats() {
*DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(initial_stats())
}
/// Install a new receiver for all aggregateed metrics, replacing any previous receiver.
pub fn set_default_output<IS, T>(new_config: IS)
where IS: Into<MetricOutput<T>>,
T: Send + Sync + Clone + 'static
{
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = Arc::new(new_config.into());
}
/// Install a new receiver for all aggregateed metrics, replacing any previous receiver.
pub fn unset_default_output() {
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = initial_output()
}
/// Set the default aggregated metrics statistics generator.
pub fn set_stats<F>(&self, func: F)
where
F: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static
{
self.inner.write().expect("Lock Aggregator").stats = Some(Arc::new(func))
}
/// Set the default aggregated metrics statistics generator.
pub fn unset_stats<F>(&self) {
self.inner.write().expect("Lock Aggregator").stats = None
}
/// Install a new receiver for all aggregated metrics, replacing any previous receiver.
pub fn set_output<IS, T>(&self, new_config: IS)
where IS: Into<MetricOutput<T>>,
T: Send + Sync + Clone + 'static
{
self.inner.write().expect("Lock Aggregator").output = Some(Arc::new(new_config.into()))
}
/// Install a new receiver for all aggregated metrics, replacing any previous receiver.
pub fn unset_output(&self) {
self.inner.write().expect("Lock Aggregator").output = None
}
fn into_scope(&self) -> MetricScope<Aggregate> {
let agg_0 = self.clone();
let agg_1 = self.clone();
MetricScope::new(
Arc::new(move |kind, name, rate| agg_0.define_metric(kind, name, rate)),
self.namespace.clone(),
Arc::new(move |ns, kind, name, rate| agg_0.define_metric(ns, kind, name, rate)),
command_fn(move |cmd| match cmd {
Command::Write(metric, value) => {
let metric: &Aggregate = metric;
@ -105,26 +161,28 @@ impl MetricAggregate {
/// Discard scores for ad-hoc metrics.
pub fn cleanup(&self) {
let orphans: Vec<String> = self.metrics.read().unwrap().iter()
let orphans: Vec<String> = self.inner.read().expect("Lock Aggregator").metrics.iter()
// is aggregator now the sole owner?
// TODO use weak ref + impl Drop to mark abandoned metrics (see dispatch)
.filter(|&(_k, v)| Arc::strong_count(v) == 1)
.map(|(k, _v)| k.to_string())
.collect();
if !orphans.is_empty() {
let mut remover = self.metrics.write().unwrap();
let remover = &mut self.inner.write().unwrap().metrics;
orphans.iter().for_each(|k| {
remover.remove(k);
});
}
}
///
/// Take a snapshot of aggregated values and reset them.
/// Compute stats on captured values using assigned or default stats function.
/// Write stats to assigned or default output.
pub fn flush_to(&self, publish_scope: &DefineMetric, stats_fn: Arc<StatsFn>) {
let snapshot: Vec<ScoreSnapshot> = {
let metrics = self.metrics.read().expect("Aggregate Lock");
metrics.values().flat_map(|score| score.reset()).collect()
};
let snapshot: Vec<ScoreSnapshot> = self.inner.read().expect("Lock Aggregator")
.metrics.values()
.flat_map(|score| score.reset())
.collect();
if snapshot.is_empty() {
// no data was collected for this period
@ -135,7 +193,7 @@ impl MetricAggregate {
for score in metric.2 {
if let Some(ex) = (stats_fn)(metric.0, metric.1.as_ref(), score) {
publish_scope
.define_metric_object(ex.0, &ex.1.concat(), 1.0)
.define_metric_object(&self.namespace, ex.0, &ex.1.concat(), 1.0)
.write(ex.2);
}
}
@ -143,9 +201,10 @@ impl MetricAggregate {
}
}
}
impl MetricInput<Aggregate> for MetricAggregate {
impl MetricInput<Aggregate> for MetricAggregator {
/// Define an event counter of the provided name.
fn marker(&self, name: &str) -> scope::Marker {
self.into_scope().marker(name)
@ -167,12 +226,14 @@ impl MetricInput<Aggregate> for MetricAggregate {
}
/// Lookup or create a scoreboard for the requested metric.
fn define_metric(&self, kind: Kind, name: &str, _rate: Sampling) -> Aggregate {
self.metrics
fn define_metric(&self, source_ns: &Namespace, kind: Kind, name: &str, _rate: Sampling) -> Aggregate {
let ns = self.namespace.extend(source_ns);
self.inner
.write()
.expect("Locking aggregator")
.metrics
.entry(name.to_string())
.or_insert_with(|| Arc::new(Scoreboard::new(kind, name.to_string())))
.or_insert_with(|| Arc::new(Scoreboard::new(ns, kind, name.to_string())))
.clone()
}
@ -180,28 +241,53 @@ impl MetricInput<Aggregate> for MetricAggregate {
fn write(&self, metric: &Aggregate, value: Value) {
metric.update(value)
}
fn with_suffix(&self, name: &str) -> Self {
MetricAggregator {
namespace: self.namespace.with_suffix(name),
inner: self.inner.clone(),
}
}
}
impl Flush for MetricAggregate {
//impl<'a> Index<&'a str> for MetricAggregator {
// type Output = Self;
//
// fn index(&self, index: &'a str) -> &Self::Output {
// &self.push(index)
// }
//}
impl Flush for MetricAggregator {
/// Collect and reset aggregated data.
/// Publish statistics
fn flush(&self) {
let default_publish_fn = DEFAULT_AGGREGATE_STATS.read().unwrap().clone();
let publish_scope = get_aggregate_default().open_scope_object();
let inner = self.inner.read().expect("Lock Aggregator");
self.flush_to(publish_scope.as_ref(), default_publish_fn);
let stats_fn = match &inner.stats {
&Some(ref stats_fn) => stats_fn.clone(),
&None => DEFAULT_AGGREGATE_STATS.read().unwrap().clone(),
};
let pub_scope = match &inner.output {
&Some(ref out) => out.open_scope_object(),
&None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().open_scope_object(),
};
self.flush_to(pub_scope.as_ref(), stats_fn);
// TODO parameterize whether to keep ad-hoc metrics after publish
// source.cleanup();
publish_scope.flush()
pub_scope.flush()
}
}
impl ScheduleFlush for MetricAggregate {}
impl ScheduleFlush for MetricAggregator {}
impl From<MetricAggregate> for Arc<DefineMetric + Send + Sync + 'static> {
fn from(metrics: MetricAggregate) -> Arc<DefineMetric + Send + Sync + 'static> {
metrics.into()
impl From<MetricAggregator> for Arc<DefineMetric + Send + Sync + 'static> {
fn from(metrics: MetricAggregator) -> Arc<DefineMetric + Send + Sync + 'static> {
Arc::new(metrics.into_scope())
}
}
@ -265,33 +351,36 @@ pub fn summary(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&s
mod bench {
use test;
use core::ROOT_NS;
use core::Kind::{Counter, Marker};
use aggregate::new_aggregate;
use aggregate::MetricAggregator;
use scope::MetricInput;
#[bench]
fn aggregate_marker(b: &mut test::Bencher) {
let sink = new_aggregate();
let metric = sink.define_metric(Marker, "event_a", 1.0);
let sink = MetricAggregator::new();
let metric = sink.define_metric(&ROOT_NS, Marker, "event_a", 1.0);
b.iter(|| test::black_box(sink.write(&metric, 1)));
}
#[bench]
fn aggregate_counter(b: &mut test::Bencher) {
let sink = new_aggregate();
let metric = sink.define_metric(Counter, "count_a", 1.0);
let sink = MetricAggregator::new();
let metric = sink.define_metric(&ROOT_NS, Counter, "count_a", 1.0);
b.iter(|| test::black_box(sink.write(&metric, 1)));
}
#[bench]
fn reset_marker(b: &mut test::Bencher) {
let metric = new_aggregate().define_metric(Marker, "marker", 1.0);
let sink = MetricAggregator::new();
let metric = sink.define_metric(&ROOT_NS, Marker, "marker", 1.0);
b.iter(|| test::black_box(metric.reset()));
}
#[bench]
fn reset_counter(b: &mut test::Bencher) {
let metric = new_aggregate().define_metric(Counter, "count_a", 1.0);
let sink = MetricAggregator::new();
let metric = sink.define_metric(&ROOT_NS, Counter, "count_a", 1.0);
b.iter(|| test::black_box(metric.reset()));
}

View File

@ -11,7 +11,7 @@ use std::sync::mpsc;
use std::thread;
metrics!{
<Aggregate> DIPSTICK_METRICS.with_prefix("async_queue") => {
<Aggregate> DIPSTICK_METRICS.with_suffix("async_queue") => {
/// Maybe queue was full?
Marker SEND_FAILED: "send_failed";
}

View File

@ -23,7 +23,7 @@ where
{
let cache: RwLock<lru::LRUCache<String, M>> =
RwLock::new(lru::LRUCache::with_capacity(cache_size));
Arc::new(move |kind, name, rate| {
Arc::new(move |ns, kind, name, rate| {
let mut cache = cache.write().expect("Locking metric cache");
let name_str = String::from(name);
@ -32,7 +32,7 @@ where
return value.clone();
}
let new_value = (next)(kind, name, rate).clone();
let new_value = (next)(ns, kind, name, rate).clone();
cache.insert(name_str, new_value.clone());
new_value
})

View File

@ -8,6 +8,8 @@ use std::sync::Arc;
use chrono::{Local, DateTime};
use time;
// TODO define an 'AsValue' trait + impl for supported number types, then drop 'num' crate
pub use num::ToPrimitive;
@ -20,19 +22,31 @@ pub type Value = u64;
/// Wrapped so it may be changed safely later.
pub struct TimeHandle(i64);
fn now_micros() -> i64 {
/// takes 250ns but works every time
pub fn accurate_clock_micros() -> i64 {
let local: DateTime<Local> = Local::now();
let mut micros = local.timestamp() * 1_000_000;
micros += local.timestamp_subsec_micros() as i64;
micros
}
/// takes 25ns but fails to advance time on occasion
pub fn fast_clock_micros() -> i64 {
(time::precise_time_ns() / 1000) as i64
}
// another quick way
//fn now_micros() -> i64 {
// let t = time::get_time();
// (t.sec * 1_000_000) + (t.nsec as i64 / 1000)
//}
impl TimeHandle {
/// Get a handle on current time.
/// Used by the TimerMetric start_time() method.
pub fn now() -> TimeHandle {
TimeHandle(now_micros())
TimeHandle(fast_clock_micros())
}
/// Get the elapsed time in microseconds since TimeHandle was obtained.
@ -46,17 +60,17 @@ impl TimeHandle {
}
}
impl From<usize> for TimeHandle {
fn from(s: usize) -> TimeHandle {
TimeHandle(s as i64)
}
}
impl From<TimeHandle> for usize {
fn from(s: TimeHandle) -> usize {
s.0 as usize
}
}
//impl From<usize> for TimeHandle {
// fn from(s: usize) -> TimeHandle {
// TimeHandle(s as i64)
// }
//}
//
//impl From<TimeHandle> for usize {
// fn from(s: TimeHandle) -> usize {
// s.0 as usize
// }
//}
/// Base type for sampling rate.
/// - 1.0 records everything
@ -81,12 +95,98 @@ pub enum Kind {
Timer,
}
/// A namespace for metrics.
/// Does _not_ include the metric's "short" name itself.
/// Can be empty.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct Namespace {
inner: Vec<String>
}
lazy_static! {
/// Root namespace contains no string parts.
pub static ref ROOT_NS: Namespace = Namespace { inner: vec![] };
}
//impl<'a> Index<&'a str> for Namespace {
// type Output = Self;
//
// /// Returns a copy of this namespace with the "index" appended to it.
// /// Returned reference should be dereferenceable:
// ///
// /// ```
// /// let sub_ns = *ROOT_NS["sub_ns"];
// /// ```
// ///
// fn index(&self, index: &'a str) -> &Self::Output {
// let mut clone = self.inner.clone();
// if !index.is_empty() {
// clone.push(index.into());
// }
// &Namespace{ inner: clone }
// }
//}
impl Namespace {
/// Append name to the namespace, returning a modified copy.
pub fn with_suffix(&self, name: &str) -> Self {
let mut new = self.inner.clone();
new.push(name.into());
Namespace { inner: new }
}
/// Returns a copy of this namespace with the second namespace appended.
/// Both original namespaces stay untouched.
pub fn extend(&self, names: &Namespace) -> Self {
Namespace {
inner: {
let mut new = self.inner.clone();
new.extend_from_slice(&names.inner);
new
}
}
}
/// Combine name parts into a string.
pub fn join(&self, name: &str, separator: &str) -> String {
if self.inner.is_empty() {
return name.into()
}
let mut buf = String::with_capacity(64);
for n in &self.inner {
buf.push_str(n.as_ref());
buf.push_str(separator);
}
buf.push_str(name);
buf
}
}
impl From<()> for Namespace {
fn from(_name: ()) -> Namespace {
ROOT_NS.clone()
}
}
impl<'a> From<&'a str> for Namespace {
fn from(name: &'a str) -> Namespace {
ROOT_NS.with_suffix(name.as_ref())
}
}
impl From<String> for Namespace {
fn from(name: String) -> Namespace {
ROOT_NS.with_suffix(name.as_ref())
}
}
/// Dynamic metric definition function.
/// Metrics can be defined from any thread, concurrently (Fn is Sync).
/// The resulting metrics themselves can be also be safely shared across threads (<M> is Send + Sync).
/// Concurrent usage of a metric is done using threaded scopes.
/// Shared concurrent scopes may be provided by some backends (aggregate).
pub type DefineMetricFn<M> = Arc<Fn(Kind, &str, Sampling) -> M + Send + Sync>;
pub type DefineMetricFn<M> = Arc<Fn(&Namespace, Kind, &str, Sampling) -> M + Send + Sync>;
/// A function trait that opens a new metric capture scope.
pub type OpenScopeFn<M> = Arc<Fn() -> CommandFn<M> + Send + Sync>;
@ -136,3 +236,79 @@ impl<M> CommandFn<M> {
(self.inner)(Flush)
}
}
//#[cfg(test)]
//mod test {
// use core::*;
// use test;
// use std::f64;
//
// const ITER: i64 = 5_000;
// const LOOP: i64 = 50000;
//
// // a retarded, dirty and generally incorrect tentative at jitter measurement
// fn jitter(clock: fn() -> i64) {
// let mut first = 0;
// let mut last = 0;
// let mut min = 999_000_000;
// let mut max = -8888888;
// let mut delta_sum = 0;
// let mut dev2_sum = 0;
//
// for i in 1..ITER {
// let ts = clock();
// test::black_box(for _j in 0..LOOP {});
// last = clock();
// let delta = last - ts;
//
// delta_sum += delta;
// let mean = delta_sum / i;
//
// let dev2 = (delta - mean) ^ 2;
// dev2_sum += dev2;
//
// if delta > max {
// max = delta
// }
// if delta < min {
// min = delta
// }
// }
//
// println!("runt {}", last - first);
// println!("mean {}", delta_sum / ITER);
// println!("dev2 {}", (dev2_sum as f64).sqrt() / ITER as f64);
// println!("min {}", min);
// println!("max {}", max);
// }
//
//
// #[test]
// fn jitter_local_now() {
// jitter(|| super::slow_clock_micros())
// }
//
// #[test]
// fn jitter_precise_time_ns() {
// jitter(|| super::imprecise_clock_micros())
// }
//
//}
#[cfg(feature = "bench")]
mod bench {
use super::*;
use test;
#[bench]
fn get_slow_time(b: &mut test::Bencher) {
b.iter(|| test::black_box(accurate_clock_micros()));
}
#[bench]
fn get_imprecise_time(b: &mut test::Bencher) {
b.iter(|| test::black_box(fast_clock_micros()));
}
}

View File

@ -1,28 +1,21 @@
//! Decouple metric definition from configuration with trait objects.
use core::*;
use namespace::*;
use scope::{self, DefineMetric, MetricScope, WriteMetric, MetricInput, Flush, ScheduleFlush, NO_METRIC_SCOPE};
use std::collections::HashMap;
use std::sync::{Arc, RwLock, Weak};
//use std::ops::Index;
use atomic_refcell::*;
lazy_static! {
static ref ROOT_DISPATCH: Arc<RwLock<InnerDispatch>> = Arc::new(RwLock::new(
InnerDispatch {
target: None,
parent: None,
metrics: HashMap::new(),
children: HashMap::new(),
}
));
static ref ROOT_DISPATCH: Arc<RwLock<InnerDispatch>> =
Arc::new(RwLock::new(InnerDispatch::with_parent(None, ROOT_NS.clone())));
}
/// Get the default dispatch point.
pub fn dispatch() -> MetricDispatch {
/// Get the root dispatch point.
pub fn metric_dispatch() -> MetricDispatch {
MetricDispatch { inner: ROOT_DISPATCH.clone() }
}
@ -60,6 +53,7 @@ pub struct MetricDispatch {
}
struct InnerDispatch {
namespace: Namespace,
target: Option<Arc<DefineMetric + Send + Sync>>,
metrics: HashMap<String, Weak<DispatchMetric>>,
parent: Option<Arc<RwLock<InnerDispatch>>>,
@ -69,14 +63,14 @@ struct InnerDispatch {
/// Allow turning a 'static str into a Delegate, where str is the prefix.
impl From<&'static str> for MetricScope<Dispatch> {
fn from(name: &'static str) -> MetricScope<Dispatch> {
dispatch().into_scope().with_prefix(name)
metric_dispatch().into_scope().with_suffix(name)
}
}
/// Allow turning a 'static str into a Delegate, where str is the prefix.
impl From<()> for MetricScope<Dispatch> {
fn from(_: ()) -> MetricScope<Dispatch> {
dispatch().into()
metric_dispatch().into()
}
}
@ -87,11 +81,22 @@ impl From<MetricDispatch> for MetricScope<Dispatch> {
}
impl InnerDispatch {
fn switch_scope(&mut self, target_scope: Arc<DefineMetric + Send + Sync + 'static>) {
fn with_parent(parent: Option<Arc<RwLock<InnerDispatch>>>, namespace: Namespace,) -> Self {
InnerDispatch {
namespace,
target: None,
parent,
metrics: HashMap::new(),
children: HashMap::new(),
}
}
fn set_new_scope(&mut self, target_scope: Arc<DefineMetric + Send + Sync + 'static>) {
for mut metric in self.metrics.values() {
if let Some(metric) = metric.upgrade() {
let target_metric = target_scope
.define_metric_object(metric.kind, metric.name.as_ref(), metric.rate);
.define_metric_object(&self.namespace, metric.kind, metric.name.as_ref(), metric.rate);
*metric.write_metric.borrow_mut() = target_metric;
}
}
@ -107,14 +112,14 @@ impl InnerDispatch {
fn set_target(&mut self, target: Option<Arc<DefineMetric + Send + Sync + 'static>>) {
let new_scope = target.clone().unwrap_or_else(|| self.get_parent_target().unwrap_or(NO_METRIC_SCOPE.clone()));
self.switch_scope(new_scope);
self.set_new_scope(new_scope);
self.target = target
}
fn parent_set_target(&mut self, target: Arc<DefineMetric + Send + Sync + 'static>) {
if self.target.is_none() {
// overriding target from this point downward
self.switch_scope(target)
self.set_new_scope(target)
}
}
@ -122,13 +127,25 @@ impl InnerDispatch {
}
impl MetricDispatch {
/// Create a new "private" metric dispatch root. This is usually not what you want.
/// Since this dispatch will not be part of the standard dispatch tree,
/// it will need to be configured independently and since downstream code may not know about
/// its existence this may never happen and metrics will not be dispatched anywhere.
/// If you want to use the standard dispatch tree, use #metric_dispatch() instead.
pub fn new() -> Self {
MetricDispatch {
inner: Arc::new(RwLock::new(InnerDispatch::with_parent(None, ROOT_NS.clone())))
}
}
/// Replace target for this dispatch and it's children.
pub fn set_target<IS: Into<Arc<DefineMetric + Send + Sync + 'static>>>(&self, target: IS) {
let mut inner = self.inner.write().expect("Dispatch Lock");
inner.set_target(Some(target.into()));
}
/// Remove target.
/// Replace target for this dispatch and it's children.
pub fn unset_target(&self) {
let mut inner = self.inner.write().expect("Dispatch Lock");
inner.set_target(None);
@ -138,8 +155,9 @@ impl MetricDispatch {
let disp_0 = self.clone();
let disp_1 = self.clone();
MetricScope::new(
self.inner.read().expect("Dispatch Lock").namespace.clone(),
// define metric
Arc::new(move |kind, name, rate| disp_0.define_metric(kind, name, rate)),
Arc::new(move |ns, kind, name, rate| disp_0.define_metric(ns, kind, name, rate)),
// write / flush metric
command_fn(move |cmd| match cmd {
Command::Write(metric, value) => {
@ -182,8 +200,8 @@ impl MetricInput<Dispatch> for MetricDispatch {
self.into_scope().gauge(name)
}
/// Lookup or create a scoreboard for the requested metric.
fn define_metric(&self, kind: Kind, name: &str, rate: Sampling) -> Dispatch {
/// Lookup or create a dispatch stub for the requested metric.
fn define_metric(&self, source_ns: &Namespace, kind: Kind, name: &str, rate: Sampling) -> Dispatch {
let mut inner = self.inner.write().expect("Dispatch Lock");
let target_scope = inner.target.clone().unwrap_or(NO_METRIC_SCOPE.clone());
inner
@ -191,7 +209,7 @@ impl MetricInput<Dispatch> for MetricDispatch {
.get(name)
.and_then(|metric_ref| Weak::upgrade(metric_ref))
.unwrap_or_else(|| {
let metric_object = target_scope.define_metric_object(kind, name, rate);
let metric_object = target_scope.define_metric_object(source_ns, kind, name, rate);
let define_metric = Arc::new(DispatchMetric {
kind,
name: name.to_string(),
@ -210,8 +228,31 @@ impl MetricInput<Dispatch> for MetricDispatch {
fn write(&self, metric: &Dispatch, value: Value) {
metric.write_metric.borrow().write(value);
}
fn with_suffix(&self, name: &str) -> Self {
if name.is_empty() {
return self.clone()
}
let mut inner = self.inner.write().expect("Dispatch Lock");
// FIXME namespace should be built only if required
let namespace = inner.namespace.with_suffix(name);
let child = inner.children.entry(name.to_string())
.or_insert_with(|| Arc::new(RwLock::new(InnerDispatch::with_parent(
Some(self.inner.clone()),
namespace
)))).clone();
MetricDispatch {inner: child}
}
}
//impl<'a> Index<&'a str> for MetricDispatch {
// type Output = Self;
//
// fn index(&self, index: &'a str) -> &Self::Output {
// &self.push(index)
// }
//}
impl Flush for MetricDispatch {
fn flush(&self) {
if let Some(ref target) = self.inner.write().expect("Dispatch Lock").target {
@ -225,22 +266,23 @@ impl ScheduleFlush for MetricDispatch {}
#[cfg(feature = "bench")]
mod bench {
use dispatch::dispatch;
use dispatch::metric_dispatch;
use test;
use aggregate::new_aggregate;
use aggregate::MetricAggregator;
use scope::MetricInput;
#[bench]
fn dispatch_marker_to_aggregate(b: &mut test::Bencher) {
dispatch().set_target(new_aggregate());
let metric = dispatch().marker("event_a");
println!("wewrwerwe");
metric_dispatch().set_target(MetricAggregator::new());
println!("sdfsdfsd");
let metric = metric_dispatch().marker("event_a");
b.iter(|| test::black_box(metric.mark()));
}
#[bench]
fn dispatch_marker_to_void(b: &mut test::Bencher) {
let metric = dispatch().marker("event_a");
let metric = metric_dispatch().marker("event_a");
b.iter(|| test::black_box(metric.mark()));
}

View File

@ -15,7 +15,7 @@ use std::fmt::Debug;
use socket::RetrySocket;
metrics!{
<Aggregate> DIPSTICK_METRICS.with_prefix("graphite") => {
<Aggregate> DIPSTICK_METRICS.with_suffix("graphite") => {
Marker SEND_ERR: "send_failed";
Marker TRESHOLD_EXCEEDED: "bufsize_exceeded";
Counter SENT_BYTES: "sent_bytes";
@ -46,7 +46,7 @@ where
let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?));
Ok(metric_output(
move |kind, name, rate| graphite_metric(kind, name, rate),
move |ns, kind, name, rate| graphite_metric(ns, kind, name, rate),
move || graphite_scope(&socket, false),
))
}
@ -60,14 +60,13 @@ where
let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?));
Ok(metric_output(
move |kind, name, rate| graphite_metric(kind, name, rate),
move |ns, kind, name, rate| graphite_metric(ns, kind, name, rate),
move || graphite_scope(&socket, true),
))
}
fn graphite_metric(kind: Kind, name: &str, rate: Sampling) -> Graphite {
let mut prefix = String::with_capacity(32);
prefix.push_str(name);
fn graphite_metric(namespace: &Namespace, kind: Kind, name: &str, rate: Sampling) -> Graphite {
let mut prefix = namespace.join(name, ".");
prefix.push(' ');
let mut scale = match kind {
@ -196,7 +195,7 @@ mod bench {
#[bench]
pub fn timer_graphite(b: &mut test::Bencher) {
let sd = to_graphite("localhost:8125").unwrap().open_scope();
let timer = sd.define_metric(Kind::Timer, "timer", 1000000.0);
let timer = sd.define_metric(&ROOT_NS, Kind::Timer, "timer", 1000000.0);
b.iter(|| test::black_box(sd.write(&timer, 2000)));
}

View File

@ -20,24 +20,24 @@ extern crate time;
extern crate chrono;
pub mod error;
pub use error::*;
pub use error::{Error, Result};
#[macro_use]
pub mod macros;
pub mod core;
pub use core::*;
pub use core::{Value, Sampling, FULL_SAMPLING_RATE, TimeHandle, Kind, ROOT_NS};
pub mod output;
pub use output::*;
pub use output::{MetricOutput, NO_METRIC_OUTPUT, OpenScope};
#[macro_use]
pub mod dispatch;
pub use dispatch::*;
pub use dispatch::{MetricDispatch, Dispatch, metric_dispatch};
#[macro_use]
mod aggregate;
pub use aggregate::*;
pub use aggregate::{MetricAggregator, Aggregate};
mod local;
pub use local::*;
@ -54,9 +54,6 @@ pub use scores::*;
mod statsd;
pub use statsd::*;
mod namespace;
pub use namespace::*;
mod graphite;
pub use graphite::*;

View File

@ -7,7 +7,7 @@ use std::sync::RwLock;
/// Write metric values to stdout using `println!`.
pub fn to_stdout() -> MetricOutput<String> {
metric_output(
|_kind, name, _rate| String::from(name),
|ns, _kind, name, _rate| ns.join(name, "."),
|| {
command_fn(|cmd| {
if let Command::Write(m, v) = cmd {
@ -24,7 +24,7 @@ pub fn to_stdout() -> MetricOutput<String> {
/// If thread latency is a concern you may wish to also use #with_async_queue.
pub fn to_buffered_stdout() -> MetricOutput<String> {
metric_output(
|_kind, name, _rate| String::from(name),
|ns, _kind, name, _rate| ns.join(name, "."),
|| {
let buf = RwLock::new(String::new());
command_fn(move |cmd| {
@ -47,7 +47,7 @@ pub fn to_buffered_stdout() -> MetricOutput<String> {
// TODO parameterize log level
pub fn to_log() -> MetricOutput<String> {
metric_output(
|_kind, name, _rate| String::from(name),
|ns, _kind, name, _rate| ns.join(name, "."),
|| {
command_fn(|cmd| {
if let Command::Write(m, v) = cmd {
@ -65,7 +65,7 @@ pub fn to_log() -> MetricOutput<String> {
// TODO parameterize log level
pub fn to_buffered_log() -> MetricOutput<String> {
metric_output(
|_kind, name, _rate| String::from(name),
|ns, _kind, name, _rate| ns.join(name, "."),
|| {
let buf = RwLock::new(String::new());
command_fn(move |cmd| {
@ -86,7 +86,7 @@ pub fn to_buffered_log() -> MetricOutput<String> {
/// Discard all metric values sent to it.
pub fn to_void() -> MetricOutput<()> {
metric_output(move |_kind, _name, _rate| (), || command_fn(|_cmd| {}))
metric_output(move |_ns, _kind, _name, _rate| (), || command_fn(|_cmd| {}))
}
#[cfg(test)]
@ -97,21 +97,21 @@ mod test {
#[test]
fn sink_print() {
let c = super::to_stdout().open_scope();
let m = c.define_metric(Kind::Marker, "test", 1.0);
let m = c.define_metric(&ROOT_NS, Kind::Marker, "test", 1.0);
c.write(&m, 33);
}
#[test]
fn test_to_log() {
let c = super::to_log().open_scope();
let m = c.define_metric(Kind::Marker, "test", 1.0);
let m = c.define_metric(&ROOT_NS, Kind::Marker, "test", 1.0);
c.write(&m, 33);
}
#[test]
fn test_to_void() {
let c = super::to_void().open_scope();
let m = c.define_metric(Kind::Marker, "test", 1.0);
let m = c.define_metric(&ROOT_NS, Kind::Marker, "test", 1.0);
c.write(&m, 33);
}

View File

@ -261,7 +261,7 @@ macro_rules! mod_timer {
mod test_app {
use self_metrics::*;
metrics!(<Aggregate> TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix"););
metrics!(<Aggregate> TEST_METRICS = DIPSTICK_METRICS.with_suffix("test_prefix"););
app_marker!(<Aggregate> TEST_METRICS => {
M1: "failed",

View File

@ -22,10 +22,11 @@ where
let scope1a = scope1.clone();
MetricScope::new(
Arc::new(move |kind, name, rate| {
ROOT_NS.clone(),
Arc::new(move |ns, kind, name, rate| {
(
scope0.define_metric(kind, name, rate),
scope1.define_metric(kind, name, rate),
scope0.define_metric(ns, kind, name, rate),
scope1.define_metric(ns, kind, name, rate),
)
}),
command_fn(move |cmd| match cmd {
@ -52,10 +53,11 @@ where
let scopes2 = scopes.clone();
MetricScope::new(
Arc::new(move |kind, name, rate| {
ROOT_NS.clone(),
Arc::new(move |ns, kind, name, rate| {
scopes
.iter()
.map(|m| m.define_metric(kind, name, rate))
.map(|m| m.define_metric(ns, kind, name, rate))
.collect()
}),
command_fn(move |cmd| match cmd {

View File

@ -2,44 +2,75 @@
use core::*;
use std::sync::Arc;
use std::collections::HashMap;
const DEFAULT_SEPARATOR: &'static str = ".";
/// A list of parts of a metric's name.
/// A namespace for metrics.
/// Does _not_ include the metric's "short" name itself.
/// Can be empty.
#[derive(Debug, Clone)]
pub struct Namespace(Vec<String>);
pub struct Namespace {
inner: Vec<String>
}
impl Namespace {
/// Make this namespace a subspace of the parent.
pub fn subspace_of(self, parent: &Namespace) -> Self {
Namespace([parent.0.clone(), self.0].concat())
pub fn split_first(&self) -> Option<(&String, &[String])> {
self.inner.split_first()
}
pub fn with_suffix(&self, names: &Namespace) -> Self {
Namespace { inner: self.inner.clone().extend(names) }
}
/// Combine name parts into a string.
pub fn join(&self, separator: &str) -> String {
self.0.join(separator)
self.inner.join(separator)
}
}
impl<'a> From<&'a str> for Namespace {
fn from(name: &'a str) -> Namespace {
Namespace(vec![name.to_string()])
Namespace { inner: vec![name.to_string()] }
}
}
impl From<String> for Namespace {
fn from(name: String) -> Namespace {
Namespace(vec![name])
Namespace { inner: vec![name] }
}
}
impl<'a, 'b: 'a> From<&'b [&'a str]> for Namespace {
fn from(names: &'a [&'a str]) -> Namespace {
Namespace(names.iter().map(|n| n.to_string()).collect())
}
pub trait Registry {
fn with_prefix(&self, prefix: &str) -> Self;
// fn parent(&self) -> Option<&Registry>;
//
// fn namespace(&self) -> &Namespace;
//
// fn children(&mut self) -> &mut HashMap<String, T>;
//
// fn create_children(parent: R, name: String) -> Self;
// fn with_names(&mut self, namespace: Namespace) -> Self {
//
// let namespace = &names.into();
// let (first, rest) = namespace.split_first();
// // recursively find or create children for every namespace component
// first.map(|f| {
// f.with_pre
// Self::make_new(self.children().entry(*first)
// .or_insert_with(|| InnerDispatch::with_parent(Some(self.inner.clone())))
// .clone()
// ).with_name(rest)
// }).unwrap_or_else(self.clone())
//
// }
}
/// Prepend metric names with custom prefix.
///// Prepend metric names with custom prefix.
pub trait WithNamespace
where
Self: Sized,

View File

@ -3,8 +3,8 @@
use core::*;
use scope::MetricScope;
use namespace::{add_namespace, Namespace, WithNamespace};
use std::sync::Arc;
use std::fmt::Debug;
use scope::DefineMetric;
use local;
@ -15,7 +15,7 @@ lazy_static! {
}
/// Wrap a MetricConfig in a non-generic trait.
pub trait OpenScope {
pub trait OpenScope: Debug {
/// Open a new metrics scope
fn open_scope_object(&self) -> Arc<DefineMetric + Send + Sync + 'static>;
}
@ -25,6 +25,8 @@ pub trait OpenScope {
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct MetricOutput<M> {
namespace: Namespace,
#[derivative(Debug = "ignore")]
define_metric_fn: DefineMetricFn<M>,
@ -44,17 +46,18 @@ impl<M> MetricOutput<M> {
/// ```
///
pub fn open_scope(&self) -> MetricScope<M> {
MetricScope::new(self.define_metric_fn.clone(), (self.open_scope_fn)())
MetricScope::new(self.namespace.clone(), self.define_metric_fn.clone(), (self.open_scope_fn)())
}
}
/// Create a new metric chain with the provided metric definition and scope creation functions.
pub fn metric_output<MF, WF, M>(define_fn: MF, open_scope_fn: WF) -> MetricOutput<M>
where
MF: Fn(Kind, &str, Sampling) -> M + Send + Sync + 'static,
MF: Fn(&Namespace, Kind, &str, Sampling) -> M + Send + Sync + 'static,
WF: Fn() -> CommandFn<M> + Send + Sync + 'static,
{
MetricOutput {
namespace: ().into(),
define_metric_fn: Arc::new(define_fn),
open_scope_fn: Arc::new(open_scope_fn),
}
@ -70,6 +73,7 @@ impl<M: Send + Sync + Clone + 'static> MetricOutput<M> {
let (define_metric_fn, open_scope_fn) =
mod_fn(self.define_metric_fn.clone(), self.open_scope_fn.clone());
MetricOutput {
namespace: self.namespace.clone(),
define_metric_fn,
open_scope_fn,
}
@ -81,12 +85,31 @@ impl<M: Send + Sync + Clone + 'static> MetricOutput<M> {
MF: Fn(OpenScopeFn<M>) -> OpenScopeFn<M>,
{
MetricOutput {
namespace: self.namespace.clone(),
define_metric_fn: self.define_metric_fn.clone(),
open_scope_fn: mod_fn(self.open_scope_fn.clone()),
}
}
/// Return a copy of this output with the specified name appended to the namespace.
pub fn with_suffix(&self, name: &str) -> Self {
MetricOutput {
namespace: self.namespace.with_suffix(name),
define_metric_fn: self.define_metric_fn.clone(),
open_scope_fn: self.open_scope_fn.clone(),
}
}
}
//impl<'a, M: Send + Sync + Clone + 'static> Index<&'a str> for MetricOutput<M> {
// type Output = Self;
//
// fn index(&self, index: &'a str) -> &Self::Output {
// &self.push(index)
// }
//}
impl<M: Send + Sync + Clone + 'static> OpenScope for MetricOutput<M> {
fn open_scope_object(&self) -> Arc<DefineMetric + Send + Sync + 'static> {
Arc::new(self.open_scope())
@ -105,12 +128,3 @@ impl<M: Send + Sync + Clone + 'static> From<MetricOutput<M>> for Arc<DefineMetri
}
}
impl<M: Send + Sync + Clone + 'static> WithNamespace for MetricOutput<M> {
fn with_name<IN: Into<Namespace>>(&self, names: IN) -> Self {
let ref ninto = names.into();
MetricOutput {
define_metric_fn: add_namespace(ninto, self.define_metric_fn.clone()),
open_scope_fn: self.open_scope_fn.clone(),
}
}
}

View File

@ -20,7 +20,7 @@ impl<M: Send + Sync + 'static + Clone> WithSamplingRate for MetricOutput<M> {
self.wrap_all(|metric_fn, scope_fn| {
(
Arc::new(move |kind, name, rate| {
Arc::new(move |ns, kind, name, rate| {
// TODO override only if FULL_SAMPLING else warn!()
if rate != FULL_SAMPLING_RATE {
info!(
@ -30,7 +30,7 @@ impl<M: Send + Sync + 'static + Clone> WithSamplingRate for MetricOutput<M> {
}
let new_rate = sampling_rate * rate;
metric_fn(kind, name, new_rate)
metric_fn(ns, kind, name, new_rate)
}),
Arc::new(move || {
let next_scope = scope_fn();

View File

@ -9,7 +9,6 @@
//!
use core::*;
use core::Kind::*;
use namespace::*;
use cache::*;
use schedule::{schedule, CancelHandle};
use output;
@ -31,7 +30,7 @@ pub trait DefineMetric: Flush {
/// Register a new metric.
/// Only one metric of a certain name will be defined.
/// Observer must return a MetricHandle that uniquely identifies the metric.
fn define_metric_object(&self, kind: Kind, name: &str, rate: Sampling)
fn define_metric_object(&self, namespace: &Namespace, kind: Kind, name: &str, rate: Sampling)
-> Box<WriteMetric + Send + Sync>;
}
@ -186,6 +185,7 @@ pub type AppTimer = Timer;
/// Variations of this should also provide control of the metric recording scope.
#[derive(Derivative, Clone)]
pub struct MetricScope<M> {
namespace: Namespace,
flush_on_drop: bool,
#[derivative(Debug = "ignore")]
define_fn: DefineMetricFn<M>,
@ -195,8 +195,9 @@ pub struct MetricScope<M> {
impl<M> MetricScope<M> {
/// Create new application metrics instance.
pub fn new(define_metric_fn: DefineMetricFn<M>, scope: CommandFn<M>) -> Self {
pub fn new(namespace: Namespace, define_metric_fn: DefineMetricFn<M>, scope: CommandFn<M>) -> Self {
MetricScope {
namespace,
flush_on_drop: true,
define_fn: define_metric_fn,
command_fn: scope,
@ -204,12 +205,12 @@ impl<M> MetricScope<M> {
}
}
fn write_fn<M, D: MetricInput<M> + Clone + Send + Sync + 'static>(scope: &D, kind: Kind, name: &str) -> WriteFn
fn scope_write_fn<M, D: MetricInput<M> + Clone + Send + Sync + 'static>(scope: &D, kind: Kind, name: &str) -> WriteFn
where
M: Clone + Send + Sync + 'static,
{
let scope = scope.clone();
let metric = scope.define_metric(kind, name, 1.0);
let metric = scope.define_metric(&ROOT_NS, kind, name, 1.0);
Arc::new(move |value| scope.write(&metric, value))
}
@ -219,30 +220,32 @@ pub trait MetricInput<M>: Clone + Flush + Send + Sync + 'static
M: Clone + Send + Sync + 'static,
{
/// Define an event counter of the provided name.
fn marker(&self, name: &str) -> Marker {
Marker { write: write_fn(self, Marker, name) }
}
fn marker(&self, name: &str) -> Marker;
/// Define a counter of the provided name.
fn counter(&self, name: &str) -> Counter {
Counter { write: write_fn(self, Counter, name) }
}
fn counter(&self, name: &str) -> Counter;
/// Define a timer of the provided name.
fn timer(&self, name: &str) -> Timer {
Timer { write: write_fn(self, Timer, name) }
}
fn timer(&self, name: &str) -> Timer;
/// Define a gauge of the provided name.
fn gauge(&self, name: &str) -> Gauge {
Gauge { write: write_fn(self, Gauge, name) }
}
fn gauge(&self, name: &str) -> Gauge;
/// Define a metric of the specified type.
fn define_metric(&self, kind: Kind, name: &str, rate: Sampling) -> M;
fn define_metric(&self, source_ns: &Namespace, kind: Kind, name: &str, rate: Sampling) -> M;
/// Record or send a value for a previously defined metric.
fn write(&self, metric: &M, value: Value);
/// Join namespace and prepend in newly defined metrics.
#[deprecated(since = "0.7.0", note = "Misleading terminology, use with_suffix() instead.")]
fn with_prefix(&self, name: &str) -> Self {
self.with_suffix(name)
}
/// Join namespace and prepend in newly defined metrics.
fn with_suffix(&self, name: &str) -> Self;
}
/// Scopes can implement buffering, requiring flush operations to commit metric values.
@ -256,13 +259,44 @@ impl<M> MetricInput<M> for MetricScope<M>
where
M: Clone + Send + Sync + 'static,
{
fn define_metric(&self, kind: Kind, name: &str, rate: Sampling) -> M {
(self.define_fn)(kind, name, rate)
/// Define an event counter of the provided name.
fn marker(&self, name: &str) -> Marker {
Marker { write: scope_write_fn(self, Marker, name) }
}
/// Define a counter of the provided name.
fn counter(&self, name: &str) -> Counter {
Counter { write: scope_write_fn(self, Counter, name) }
}
/// Define a timer of the provided name.
fn timer(&self, name: &str) -> Timer {
Timer { write: scope_write_fn(self, Timer, name) }
}
/// Define a gauge of the provided name.
fn gauge(&self, name: &str) -> Gauge {
Gauge { write: scope_write_fn(self, Gauge, name) }
}
fn define_metric(&self, source_ns: &Namespace, kind: Kind, name: &str, rate: Sampling) -> M {
(self.define_fn)(source_ns, kind, name, rate)
}
fn write(&self, metric: &M, value: Value) {
self.command_fn.write(metric, value);
}
fn with_suffix(&self, name: &str) -> Self {
MetricScope {
namespace: self.namespace.with_suffix(name),
flush_on_drop: self.flush_on_drop,
define_fn: self.define_fn.clone(),
command_fn: self.command_fn.clone(),
}
}
}
/// Scopes can implement buffering, in which case they can be flushed.
@ -304,11 +338,11 @@ struct MetricWriter<M> {
}
impl<M: Send + Sync + Clone + 'static> DefineMetric for MetricScope<M> {
fn define_metric_object(&self, kind: Kind, name: &str, rate: Sampling)
fn define_metric_object(&self, namespace: &Namespace, kind: Kind, name: &str, rate: Sampling)
-> Box<WriteMetric + Send + Sync>
{
Box::new(MetricWriter {
define_fn: self.define_metric(kind, name, rate),
define_fn: self.define_metric(namespace, kind, name, rate),
command_fn: self.command_fn.clone(),
})
}
@ -322,20 +356,10 @@ impl<M> WriteMetric for MetricWriter<M> {
//// Mutators impl
impl<M: Send + Sync + Clone + 'static> WithNamespace for MetricScope<M> {
fn with_name<IN: Into<Namespace>>(&self, names: IN) -> Self {
let ns = &names.into();
MetricScope {
flush_on_drop: self.flush_on_drop,
define_fn: add_namespace(ns, self.define_fn.clone()),
command_fn: self.command_fn.clone(),
}
}
}
impl<M: Send + Sync + Clone + 'static> WithCache for MetricScope<M> {
fn with_cache(&self, cache_size: usize) -> Self {
MetricScope {
namespace: self.namespace.clone(),
flush_on_drop: self.flush_on_drop,
define_fn: add_cache(cache_size, self.define_fn.clone()),
command_fn: self.command_fn.clone(),
@ -351,7 +375,7 @@ mod bench {
#[bench]
fn time_bench_direct_dispatch_event(b: &mut test::Bencher) {
let metrics = new_aggregate();
let metrics = MetricAggregator::new();
let marker = metrics.marker("aaa");
b.iter(|| test::black_box(marker.mark()));
}

View File

@ -33,6 +33,8 @@ pub type ScoreSnapshot = (Kind, String, Vec<ScoreType>);
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
pub struct Scoreboard {
namespace: Namespace,
/// The kind of metric.
kind: Kind,
@ -44,11 +46,12 @@ pub struct Scoreboard {
impl Scoreboard {
/// Create a new Scoreboard to track summary values of a metric
pub fn new(kind: Kind, name: String) -> Self {
pub fn new(namespace: Namespace, kind: Kind, name: String) -> Self {
Scoreboard {
namespace,
kind,
name,
scores: unsafe { mem::transmute(Scoreboard::blank(TimeHandle::now().into())) },
scores: unsafe { mem::transmute(Scoreboard::blank(accurate_clock_micros() as usize)) },
}
}
@ -92,7 +95,7 @@ impl Scoreboard {
/// Map raw scores (if any) to applicable statistics
pub fn reset(&self) -> Option<ScoreSnapshot> {
let now: usize = TimeHandle::now().into();
let now: usize = accurate_clock_micros() as usize;
let mut scores = Scoreboard::blank(now);
if self.snapshot(now, &mut scores) {
let duration_seconds = (now - scores[0]) as f64 / 1_000.0;
@ -108,13 +111,24 @@ impl Scoreboard {
snapshot.push(Min(scores[4] as u64));
snapshot.push(Mean(scores[2] as f64 / scores[1] as f64));
}
Timer | Counter => {
Timer => {
snapshot.push(Count(scores[1] as u64));
snapshot.push(Sum(scores[2] as u64));
snapshot.push(Max(scores[3] as u64));
snapshot.push(Min(scores[4] as u64));
snapshot.push(Mean(scores[2] as f64 / scores[1] as f64));
// timer rate uses the COUNT of timer calls per second (not SUM)
snapshot.push(Rate(scores[1] as f64 / duration_seconds))
}
Counter => {
snapshot.push(Count(scores[1] as u64));
snapshot.push(Sum(scores[2] as u64));
snapshot.push(Max(scores[3] as u64));
snapshot.push(Min(scores[4] as u64));
snapshot.push(Mean(scores[2] as f64 / scores[1] as f64));
// counter rate uses the SUM of values per second (e.g. to get bytes/s)
snapshot.push(Rate(scores[2] as f64 / duration_seconds))
}
}
@ -147,19 +161,19 @@ mod bench {
#[bench]
fn bench_score_update_marker(b: &mut test::Bencher) {
let metric = Scoreboard::new(Marker, "event_a".to_string());
let metric = Scoreboard::new(ROOT_NS.clone(), Marker, "event_a".to_string());
b.iter(|| test::black_box(metric.update(1)));
}
#[bench]
fn bench_score_update_count(b: &mut test::Bencher) {
let metric = Scoreboard::new(Counter, "event_a".to_string());
let metric = Scoreboard::new(ROOT_NS.clone(), Counter, "event_a".to_string());
b.iter(|| test::black_box(metric.update(4)));
}
#[bench]
fn bench_score_empty_snapshot(b: &mut test::Bencher) {
let metric = Scoreboard::new(Counter, "event_a".to_string());
let metric = Scoreboard::new(ROOT_NS.clone(), Counter, "event_a".to_string());
let mut scores = Scoreboard::blank(0);
b.iter(|| test::black_box(metric.snapshot(0, &mut scores)));
}

View File

@ -6,6 +6,5 @@ pub use core::*;
pub use scope::*;
pub use aggregate::*;
pub use namespace::*;
metrics!(<Aggregate> pub DIPSTICK_METRICS = "dipstick");

View File

@ -11,7 +11,7 @@ use std::sync::{Arc, RwLock};
pub use std::net::ToSocketAddrs;
metrics! {
<Aggregate> DIPSTICK_METRICS.with_prefix("statsd") => {
<Aggregate> DIPSTICK_METRICS.with_suffix("statsd") => {
Marker SEND_ERR: "send_failed";
Counter SENT_BYTES: "sent_bytes";
}
@ -30,9 +30,8 @@ where
let buffered = false;
Ok(metric_output(
move |kind, name, rate| {
let mut prefix = String::with_capacity(32);
prefix.push_str(name);
move |namespace, kind, name, rate| {
let mut prefix = namespace.join(name, ".");
prefix.push(':');
let mut suffix = String::with_capacity(16);
@ -160,7 +159,7 @@ mod bench {
#[bench]
pub fn timer_statsd(b: &mut test::Bencher) {
let sd = to_statsd("localhost:8125").unwrap().open_scope();
let timer = sd.define_metric(Kind::Timer, "timer", 1000000.0);
let timer = sd.define_metric(&ROOT_NS, Kind::Timer, "timer", 1000000.0);
b.iter(|| test::black_box(sd.write(&timer, 2000)));
}