Fix marker rate, add Namespace

This commit is contained in:
Francis Lalonde 2018-01-16 15:00:47 -05:00
parent a3cd89cca9
commit fce1c72965
20 changed files with 249 additions and 49 deletions

View File

@ -4,6 +4,7 @@ members = [
"examples/counter_timer_gauge/",
"examples/multi_outs/",
"examples/aggregate_print/",
"examples/summary_print/",
"examples/async_print/",
"examples/custom_publish/",
"examples/raw_log/",

View File

@ -92,29 +92,31 @@ let _app_metrics = app_metrics(to_stdout()).with_async_queue(64);
The async queue uses a Rust channel and a standalone thread.
The current behavior is to block when full.
Metric definitions can be cached to make using _ad-hoc metrics_ faster:
```rust,skt-run
let app_metrics = app_metrics(to_log().with_cache(512));
app_metrics.gauge(format!("my_gauge_{}", 34)).value(44);
```
The preferred way is to _predefine metrics_,
possibly in a [lazy_static!](https://crates.io/crates/lazy_static) block:
For better performance and easy maintenance, metrics should usually be predefined:
```rust,skt-plain
#[macro_use] extern crate dipstick;
#[macro_use] extern crate lazy_static;
extern crate dipstick;
use dipstick::*;
lazy_static! {
pub static ref METRICS: AppMetrics<String> = app_metrics(to_stdout());
pub static ref COUNTER_A: AppCounter<String> = METRICS.counter("counter_a");
}
app_metric!(String, APP_METRICS, app_metrics(to_stdout()));
app_counter!(String, APP_METRICS, {
COUNTER_A: "counter_a",
});
fn main() {
COUNTER_A.count(11);
}
```
Metric definition macros are just `lazy_static!` wrappers.
Where necessary, metrics can be defined _ad-hoc_:
```rust,skt-run
let user_name = "john_day";
let app_metrics = app_metrics(to_log().with_cache(512));
app_metrics.gauge(format!("gauge_for_user_{}", user_name)).value(44);
```
Defining a cache is optional but will speed up re-definition of common ad-hoc metrics.
Timers can be used multiple ways:
```rust,skt-run

View File

@ -7,7 +7,7 @@ use std::time::Duration;
use dipstick::*;
fn main() {
let to_aggregate = aggregate(summary, to_stdout());
let to_aggregate = aggregate(all_stats, to_stdout());
let app_metrics = app_metrics(to_aggregate);

View File

@ -13,7 +13,7 @@ fn main() {
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_b");
let subsystem_metrics = metrics.with_prefix("subsystem.");
let subsystem_metrics = metrics.with_name("subsystem");
let event = subsystem_metrics.marker("event_c");
let gauge = subsystem_metrics.gauge("gauge_d");

View File

@ -19,7 +19,7 @@ fn main() {
app_metrics.counter("just_once").count(4);
// metric names can be prepended with a common prefix
let prefixed_metrics = app_metrics.with_prefix("subsystem.");
let prefixed_metrics = app_metrics.with_name("subsystem".to_string());
let event = prefixed_metrics.marker("event_c");
let gauge = prefixed_metrics.gauge("gauge_d");

View File

@ -11,7 +11,7 @@ fn main() {
let metrics = app_metrics(
to_graphite("localhost:2003").expect("Connecting")
.with_namespace(&["my", "app"]));
.with_namespace(&["my", "app"][..]));
loop {
metrics.counter("counter_a").count(123);

View File

@ -15,8 +15,8 @@ fn main() {
let same_type_metrics = app_metrics(
&[
// use slices to combine multiple metrics of the same type
to_stdout().with_prefix("yeah"),
to_stdout().with_prefix("ouch"),
to_stdout().with_name("yeah"),
to_stdout().with_name("ouch"),
to_stdout().with_sampling_rate(0.5),
][..],
);

View File

@ -0,0 +1,7 @@
[package]
name = "summary_print"
version = "0.0.0"
workspace = "../../"
[dependencies]
dipstick = { path = '../../' }

View File

@ -0,0 +1,37 @@
//! A sample application continuously aggregating metrics,
//! printing the summary stats every three seconds
extern crate dipstick;
use std::time::Duration;
use dipstick::*;
fn main() {
let to_aggregate = aggregate(summary, to_stdout());
let app_metrics = app_metrics(to_aggregate);
app_metrics.flush_every(Duration::from_secs(3));
let counter = app_metrics.counter("counter_a");
let timer = app_metrics.timer("timer_a");
let gauge = app_metrics.gauge("gauge_a");
let marker = app_metrics.marker("marker_a");
loop {
// add counts forever, non-stop
counter.count(11);
counter.count(12);
counter.count(13);
timer.interval_us(11_000_000);
timer.interval_us(12_000_000);
timer.interval_us(13_000_000);
gauge.value(11);
gauge.value(12);
gauge.value(13);
marker.mark();
}
}

View File

@ -220,9 +220,9 @@ where
}
impl<M: Send + Sync + Clone + 'static> WithNamespace for AppMetrics<M> {
fn with_namespace(&self, nspace: &[&str]) -> Self {
fn with_name<IN: Into<Namespace>>(&self, names: IN) -> Self {
AppMetrics {
chain: Arc::new(self.chain.with_namespace(nspace)),
chain: Arc::new(self.chain.with_name(names)),
scope: self.scope.clone(),
}
}

View File

@ -9,7 +9,7 @@ use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
mod_metric!(Aggregate, QUEUE_METRICS = DIPSTICK_METRICS.with_prefix("async_queue"));
mod_metrics!(Aggregate, QUEUE_METRICS = DIPSTICK_METRICS.with_prefix("async_queue"));
mod_marker!(Aggregate, QUEUE_METRICS, { SEND_FAILED: "send_failed" });
/// Enqueue collected metrics for dispatch on background thread.

View File

@ -209,9 +209,9 @@ impl<M: Send + Sync + Clone + 'static> Chain<M> {
/// Create a new metric chain with the provided metric definition and scope creation functions.
pub fn new<MF, WF>(make_metric: MF, make_scope: WF) -> Self
where
MF: Fn(Kind, &str, Rate) -> M + Send + Sync + 'static,
WF: Fn(bool) -> ControlScopeFn<M> + Send + Sync + 'static,
where
MF: Fn(Kind, &str, Rate) -> M + Send + Sync + 'static,
WF: Fn(bool) -> ControlScopeFn<M> + Send + Sync + 'static,
{
Chain {
// capture the provided closures in Arc to provide cheap clones
@ -246,8 +246,8 @@ impl<M: Send + Sync + Clone + 'static> Chain<M> {
/// Intercept metric definition without changing the metric type.
pub fn mod_metric<MF>(&self, mod_fn: MF) -> Chain<M>
where
MF: Fn(DefineMetricFn<M>) -> DefineMetricFn<M>,
where
MF: Fn(DefineMetricFn<M>) -> DefineMetricFn<M>,
{
Chain {
define_metric_fn: mod_fn(self.define_metric_fn.clone()),
@ -257,9 +257,9 @@ impl<M: Send + Sync + Clone + 'static> Chain<M> {
/// Intercept both metric definition and scope creation, possibly changing the metric type.
pub fn mod_both<MF, N>(&self, mod_fn: MF) -> Chain<N>
where
MF: Fn(DefineMetricFn<M>, OpenScopeFn<M>) -> (DefineMetricFn<N>, OpenScopeFn<N>),
N: Clone + Send + Sync,
where
MF: Fn(DefineMetricFn<M>, OpenScopeFn<M>) -> (DefineMetricFn<N>, OpenScopeFn<N>),
N: Clone + Send + Sync,
{
let (metric_fn, scope_fn) =
mod_fn(self.define_metric_fn.clone(), self.scope_metric_fn.clone());
@ -271,8 +271,8 @@ impl<M: Send + Sync + Clone + 'static> Chain<M> {
/// Intercept scope creation.
pub fn mod_scope<MF>(&self, mod_fn: MF) -> Self
where
MF: Fn(OpenScopeFn<M>) -> OpenScopeFn<M>,
where
MF: Fn(OpenScopeFn<M>) -> OpenScopeFn<M>,
{
Chain {
define_metric_fn: self.define_metric_fn.clone(),
@ -391,4 +391,3 @@ impl<M: Clone> ScopeTimer<M> {
value
}
}

102
src/dispatch.rs Normal file
View File

@ -0,0 +1,102 @@
use core::*;
use chain::*;
use std::collections::{HashMap, LinkedList};
pub struct MetricHandle (usize);
pub struct ScopeHandle (usize);
pub trait Observer {
/// Register a new metric.
/// Only one metric of a certain name will be defined.
/// Observer must return a MetricHandle that uniquely identifies the metric.
fn metric_create(&self, kind: Kind, name: &str, rate: Rate) -> MetricHandle;
/// Drop a previously registered metric.
/// Drop is called once per handle.
/// Dropped handle will never be used again.
/// Drop is only called with previously registered handles.
fn metric_drop(&self, metric: MetricHandle);
/// Open a new scope.
/// Observer must return a new ScopeHandle that uniquely identifies the scope.
fn scope_open(&self, buffered: bool) -> ScopeHandle;
/// Write metric value to a scope.
/// Observers only receive previously registered handles.
fn scope_write(&self, scope: ScopeHandle, metric: MetricHandle, value:Value);
/// Flush a scope.
/// Observers only receive previously registered handles.
fn scope_flush(&self, scope: ScopeHandle);
/// Drop a previously registered scope.
/// Drop is called once per handle.
/// Dropped handle will never be used again.
/// Drop is only called with previously registered handles.
fn scope_close(&self, scope: ScopeHandle);
}
pub struct ChainObserver<T> {
chain: Chain<T>
}
impl<T> Observer for ChainObserver<T> {
fn metric_create(&self, kind: Kind, name: &str, rate: Rate) -> MetricHandle {
self.chain.define_metric(kind, name, rate)
}
fn metric_drop(&self, metric: MetricHandle) {}
fn scope_open(&self, buffered: bool) -> ScopeHandle {}
fn scope_write(&self, scope: ScopeHandle, metric: MetricHandle, value:Value) {}
fn scope_flush(&self, scope: ScopeHandle) {}
fn scope_close(&self, scope: ScopeHandle) {}
}
pub struct Dispatcher {
active_observers: usize,
metrics: HashMap<String, Dispatch>,
observers: RwLock<Vec<Observer>>
}
/// Aggregate metrics in memory.
/// Depending on the type of metric, count, sum, minimum and maximum of values will be tracked.
/// Needs to be connected to a publish to be useful.
/// ```
/// use dipstick::*;
/// let sink = aggregate(4, summary, to_stdout());
/// let metrics = global_metrics(sink);
/// metrics.marker("my_event").mark();
/// metrics.marker("my_event").mark();
/// ```
pub fn dispatch<E, M>(stat_fn: E, to_chain: Chain<M>) -> Chain<Dispatch>
where
E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static,
M: Clone + Send + Sync + Debug + 'static,
{
let metrics = Arc::new(RwLock::new(HashMap::new()));
let metrics0 = metrics.clone();
let publish = Arc::new(Publisher::new(stat_fn, to_chain));
Chain::new(
move |kind, name, _rate| {
// add metric
},
move |_buffered| {
// open scope
ControlScopeFn::new(move |cmd| match cmd {
ScopeCmd::Write(metric, value) => {
let metric: &Aggregate = metric;
metric.update(value)
},
ScopeCmd::Flush => {
let metrics = metrics.read().expect("Locking metrics scoreboards");
let snapshot = metrics.values().flat_map(|score| score.reset()).collect();
publish.publish(snapshot);
}
})
},
)
}

View File

@ -13,7 +13,7 @@ use std::fmt::Debug;
use socket::RetrySocket;
mod_metric!(Aggregate, GRAPHITE_METRICS = DIPSTICK_METRICS.with_prefix("graphite"));
mod_metrics!(Aggregate, GRAPHITE_METRICS = DIPSTICK_METRICS.with_prefix("graphite"));
mod_marker!(Aggregate, GRAPHITE_METRICS, {
SEND_ERR: "send_failed",
TRESHOLD_EXCEEDED: "bufsize_exceeded",

View File

@ -33,11 +33,14 @@ mod lru_cache;
pub mod error;
#[macro_use]
pub mod macros;
pub mod core;
pub use core::*;
#[macro_use]
pub mod macros;
//pub mod dispatch;
//pub use dispatch::*;
mod output;
pub use output::*;

View File

@ -22,7 +22,7 @@ macro_rules! time {
/// Define application-scoped metrics.
#[macro_export]
macro_rules! app_metric {
macro_rules! app_metrics {
($type_param: ty, $metric_id: ident = $app_metrics: expr) => {
lazy_static! { pub static ref $metric_id: AppMetrics<$type_param> = $app_metrics; }
};
@ -66,7 +66,7 @@ macro_rules! app_timer {
/// Define module-scoped metrics.
#[macro_export]
macro_rules! mod_metric {
macro_rules! mod_metrics {
($type_param: ty, $metric_id: ident = $mod_metrics: expr) => {
lazy_static! { static ref $metric_id: AppMetrics<$type_param> = $mod_metrics; }
};
@ -111,7 +111,7 @@ macro_rules! mod_timer {
mod test_app {
use self_metrics::*;
app_metric!(Aggregate, TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix"));
app_metrics!(Aggregate, TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix"));
app_marker!(Aggregate, TEST_METRICS, {
M1: "failed",
@ -154,7 +154,7 @@ mod test_app {
mod test_mod {
use self_metrics::*;
mod_metric!(Aggregate, TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix"));
mod_metrics!(Aggregate, TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix"));
mod_marker!(Aggregate, TEST_METRICS, {
M1: "failed",

View File

@ -3,26 +3,72 @@ use core::*;
use std::sync::Arc;
const DEFAULT_SEPARATOR: &'static str = ".";
/// A list of parts of a metric's name.
#[derive(Debug, Clone)]
pub struct Namespace (Vec<String>);
impl Namespace {
/// Make this namespace a subspace of the parent.
pub fn subspace_of(self, parent: &Namespace) -> Self {
Namespace([parent.0.clone(), self.0].concat())
}
/// Combine name parts into a string.
pub fn join(&self, separator: &str) -> String {
self.0.join(separator)
}
}
impl<'a> From<&'a str> for Namespace {
fn from(name: &'a str) -> Namespace {
Namespace(vec![name.to_string()])
}
}
impl From<String> for Namespace {
fn from(name: String) -> Namespace {
Namespace(vec![name])
}
}
impl<'a, 'b: 'a> From<&'b [&'a str]> for Namespace {
fn from(names: &'a [&'a str]) -> Namespace {
Namespace(names.iter().map(|n| n.to_string()).collect())
}
}
/// Prepend metric names with custom prefix.
pub trait WithNamespace
where
Self: Sized,
{
/// Insert prefix in newly defined metrics.
// #[deprecated(since = "0.6.3", note = "Use `with_name` instead.")]
fn with_prefix<AS: AsRef<str>>(&self, prefix: AS) -> Self {
self.with_namespace(&[prefix.as_ref()])
}
/// Join namespace and prepend in newly defined metrics.
fn with_namespace(&self, names: &[&str]) -> Self;
// #[deprecated(since = "0.6.3", note = "Use `with_name` instead.")]
fn with_namespace(&self, names: &[&str]) -> Self {
self.with_name(names)
}
/// Join namespace and prepend in newly defined metrics.
fn with_name<IN: Into<Namespace>>(&self, names: IN) -> Self;
}
impl<M: Send + Sync + Clone + 'static> WithNamespace for Chain<M> {
fn with_namespace(&self, names: &[&str]) -> Self {
fn with_name<IN: Into<Namespace>>(&self, names: IN) -> Self {
let ninto = names.into();
self.mod_metric(|next| {
let nspace = names.join(".");
let nspace = ninto.join(DEFAULT_SEPARATOR);
Arc::new(move |kind, name, rate| {
let name = [nspace.as_ref(), name].join(".");
let name = [nspace.as_ref(), name].join(DEFAULT_SEPARATOR);
(next)(kind, name.as_ref(), rate)
})
})
@ -31,7 +77,7 @@ impl<M: Send + Sync + Clone + 'static> WithNamespace for Chain<M> {
/// deprecated, use with_prefix() omitting any previously supplied separator
#[deprecated(since = "0.5.0",
note = "Use `with_prefix` instead, omitting any previously supplied separator.")]
note = "Use `with_name` instead, omitting any previously supplied separator.")]
pub fn prefix<M, IC>(prefix: &str, chain: IC) -> Chain<M>
where
M: Clone + Send + Sync + 'static,

View File

@ -99,11 +99,14 @@ impl Scoreboard {
if self.snapshot(now, &mut scores) {
let duration_seconds = (now - scores[0]) as f64 / 1_000_000_000.0;
println!("duration {}s", duration_seconds);
println!("rate {}/s", scores[2] as f64 / duration_seconds);
let mut snapshot = Vec::new();
match self.kind {
Marker => {
snapshot.push(Count(scores[1] as u64));
snapshot.push(Rate(scores[2] as f64 / duration_seconds))
snapshot.push(Rate(scores[1] as f64 / duration_seconds))
}
Gauge => {
snapshot.push(Max(scores[3] as u64));

View File

@ -28,4 +28,4 @@ fn build_self_metrics() -> AppMetrics<Aggregate> {
lazy_static! { static ref AGGREGATOR: Chain<Aggregate> = build_aggregator(); }
/// Application metrics are collected to the aggregator
app_metric!(Aggregate, DIPSTICK_METRICS = build_self_metrics());
app_metrics!(Aggregate, DIPSTICK_METRICS = build_self_metrics());

View File

@ -9,7 +9,7 @@ use std::sync::{Arc, RwLock};
pub use std::net::ToSocketAddrs;
mod_metric!(Aggregate, STATSD_METRICS = DIPSTICK_METRICS.with_prefix("statsd"));
mod_metrics!(Aggregate, STATSD_METRICS = DIPSTICK_METRICS.with_prefix("statsd"));
mod_marker!(Aggregate, STATSD_METRICS, { SEND_ERR: "send_failed" });
mod_counter!(Aggregate, STATSD_METRICS, { SENT_BYTES: "sent_bytes" });