Renaming to Bucket, Proxy, Name, Input, Output...

This commit is contained in:
Francis Lalonde 2018-06-15 17:33:28 -04:00
parent c72c66edc9
commit b35c014e60
32 changed files with 333 additions and 326 deletions

View File

@ -135,7 +135,7 @@ timer.interval_us(123_456);
Related metrics can share a namespace:
```rust,skt-run
let app_metrics = metric_scope(to_stdout());
let db_metrics = app_metrics.with_prefix("database");
let db_metrics = app_metrics.add_name("database");
let _db_timer = db_metrics.timer("db_timer");
let _db_counter = db_metrics.counter("db_counter");
```

View File

@ -7,11 +7,11 @@ use std::time::Duration;
use dipstick::*;
fn main() {
let metrics = MetricAggregator::new().with_prefix("test");
let metrics = Bucket::new().add_name("test");
// MetricAggregator::set_default_output(to_stdout());
// Bucket::set_default_output(to_stdout());
metrics.set_output(to_graphite("localhost:2003").expect("Graphite host name and port")
.with_prefix("machine1").with_prefix("application"));
.add_name("machine1").add_name("application"));
metrics.flush_every(Duration::from_secs(3));

View File

@ -7,9 +7,9 @@ use std::time::Duration;
use dipstick::*;
fn main() {
let metrics = MetricAggregator::new().with_prefix("test");
let metrics = Bucket::new().add_name("test");
// MetricAggregator::set_default_output(to_stdout());
// Bucket::set_default_output(to_stdout());
metrics.set_output(to_stdout());
metrics.flush_every(Duration::from_secs(3));

View File

@ -13,7 +13,7 @@ fn main() {
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_b");
let subsystem_metrics = metrics.with_prefix("subsystem");
let subsystem_metrics = metrics.add_name("subsystem");
let event = subsystem_metrics.marker("event_c");
let gauge = subsystem_metrics.gauge("gauge_d");

View File

@ -19,7 +19,7 @@ fn main() {
app_metrics.counter("just_once").count(4);
// metric names can be prepended with a common prefix
let prefixed_metrics = app_metrics.with_prefix("subsystem");
let prefixed_metrics = app_metrics.add_name("subsystem");
let event = prefixed_metrics.marker("event_c");
let gauge = prefixed_metrics.gauge("gauge_d");

View File

@ -15,7 +15,7 @@ fn main() {
// add counts forever, non-stop
println!("\n------- open scope");
let metrics = output.open_scope();
let metrics = output.new_input_dyn();
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_a");

View File

@ -9,9 +9,9 @@ use dipstick::*;
fn main() {
fn custom_statistics(
kind: Kind,
mut name: Namespace,
mut name: Name,
score: ScoreType,
) -> Option<(Kind, Namespace, Value)> {
) -> Option<(Kind, Name, Value)> {
match (kind, score) {
// do not export gauge scores
(Kind::Gauge, _) => None,
@ -19,7 +19,7 @@ fn main() {
// prepend and append to metric name
(_, ScoreType::Count(count)) => {
if let Some(last) = name.pop() {
name.push("customized_with_prefix");
name.push("customized_add_name".into());
name.push(format!("{}_and_a_suffix", last));
Some((
Kind::Counter,
@ -32,7 +32,7 @@ fn main() {
},
// scaling the score value and appending unit to name
(kind, ScoreType::Sum(sum)) => Some((kind, name.with_prefix("per_thousand"), sum / 1000)),
(kind, ScoreType::Sum(sum)) => Some((kind, name.add_name("per_thousand"), sum / 1000)),
// using the unmodified metric name
(kind, ScoreType::Mean(avg)) => Some((kind, name, avg.round() as u64)),
@ -43,10 +43,10 @@ fn main() {
}
// send application metrics to aggregator
MetricAggregator::set_default_output(to_stdout());
MetricAggregator::set_default_stats(custom_statistics);
Bucket::set_default_output(to_stdout());
Bucket::set_default_stats(custom_statistics);
let app_metrics = MetricAggregator::new();
let app_metrics = Bucket::new();
// schedule aggregated metrics to be printed every 3 seconds
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -12,8 +12,8 @@ fn main() {
let metrics =
to_graphite("localhost:2003")
.expect("Connecting")
.with_prefix("my_app")
.open_scope();
.add_name("my_app")
.new_input_dyn();
loop {
metrics.counter("counter_a").count(123);

View File

@ -9,7 +9,7 @@ use dipstick::*;
use std::time::Duration;
// undeclared root (un-prefixed) metrics
metrics!(<MetricAggregator> pub AGGREGATE = to_aggregate() => {
metrics!(<Bucket> pub AGGREGATE = to_aggregate() => {
// create counter "some_counter"
pub Counter ROOT_COUNTER: "root_counter";
// create gauge "root_gauge"
@ -19,14 +19,14 @@ metrics!(<MetricAggregator> pub AGGREGATE = to_aggregate() => {
});
metrics!( <MetricAggregator> AGGREGATE.with_prefix("module_prefix") => {
metrics!( <Bucket> AGGREGATE.add_name("module_prefix") => {
// create counter "module_prefix.module_counter"
Counter MOD_COUNTER: "module_counter";
});
fn main() {
// print aggregated metrics to the console
MetricAggregator::set_default_output(to_stdout());
Bucket::set_default_output(to_stdout());
// enable autoflush...
AGGREGATE.flush_every(Duration::from_millis(4000));

View File

@ -37,7 +37,7 @@ metrics!(LIB_METRICS => {
});
fn main() {
ROOT_DISPATCH.set_target(to_stdout().open());
ROOT_PROXY.set_target(to_stdout().new_input());
loop {
ROOT_COUNTER.count(123);

View File

@ -18,17 +18,17 @@
//#[ignore(deprecated)]
//app_metrics!(
// MultiOutput, SAME_TYPE = to_multi()
// .with_output(to_stdout().with_prefix("yeah"))
// .with_output(to_stdout().with_prefix("ouch"))
// .with_output(to_stdout().add_name("yeah"))
// .with_output(to_stdout().add_name("ouch"))
//);
//
//#[ignore(deprecated)]
//app_metrics!(
// MultiOutput, MUTANT_CHILD = SAME_TYPE.with_prefix("super").with_prefix("duper")
// MultiOutput, MUTANT_CHILD = SAME_TYPE.add_name("super").add_name("duper")
//);
fn main() {
// let mmm: &OpenScope = &to_stdout();
// let mmm: &OutputDyn = &to_stdout();
//
// loop {
// DIFFERENT_TYPES.counter("counter_a").count(123);

View File

@ -9,13 +9,13 @@ fn main() {
// will output metrics to graphite and to stdout
let different_type_metrics = MultiOutput::new()
.with_output(to_graphite("localhost:2003").expect("Connecting"))
.with_output(to_stdout()).open();
.with_output(to_stdout()).new_input();
// will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix.
let same_type_metrics = MultiOutput::new()
.with_output(to_stdout().with_prefix("yeah"))
.with_output(to_stdout().with_prefix("ouch"))
.with_prefix("cool").open();
.with_output(to_stdout().add_name("yeah"))
.with_output(to_stdout().add_name("ouch"))
.add_name("cool").new_input();
loop {
different_type_metrics.counter("counter_a").count(123);

View File

@ -3,7 +3,7 @@
extern crate dipstick;
use dipstick::{MetricOutput, MetricInput};
use dipstick::{Output, Input};
fn main() {
raw_write()
@ -11,11 +11,11 @@ fn main() {
pub fn raw_write() {
// setup dual metric channels
let metrics_log = dipstick::to_log().open();
let metrics_log = dipstick::to_log().new_input();
// define and send metrics using raw channel API
let counter = metrics_log.define_metric(
&"count_a".into(),
let counter = metrics_log.new_metric(
"count_a".into(),
dipstick::Kind::Counter,
);
counter.write(1);

View File

@ -8,7 +8,7 @@ use dipstick::*;
fn main() {
// print only 1 out of every 10000 metrics recorded
let app_metrics = to_statsd("statsd:8125").expect("Statsd")
.with_sampling_rate(Sampling::SampleRate(0.0001)).open_scope();
.with_sampling_rate(Sampling::SampleRate(0.0001)).new_input_dyn();
let marker = app_metrics.marker("marker_a");

View File

@ -8,7 +8,7 @@ use dipstick::*;
fn main() {
let app_metrics = MetricAggregator::new();
let app_metrics = Bucket::new();
app_metrics.set_output(to_stdout());
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -2,9 +2,9 @@
//! Metrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
//!
use core::{MetricInput, Value, WriteFn, Namespace, Kind, Flush, Marker, WithPrefix,
use core::{Input, Value, WriteFn, Name, Kind, Flush, Marker, WithName,
Attributes, WithAttributes};
use aggregate::MetricAggregator;
use bucket::Bucket;
use error;
use self_metrics::DIPSTICK_METRICS;
@ -13,14 +13,14 @@ use std::sync::mpsc;
use std::thread;
metrics!{
<MetricAggregator> DIPSTICK_METRICS.with_prefix("async_queue") => {
<Bucket> DIPSTICK_METRICS.add_name("async_queue") => {
/// Maybe queue was full?
Marker SEND_FAILED: "send_failed";
}
}
/// Wrap the input with an async dispatch queue for lower app latency.
pub fn to_async<IN: MetricInput + Send + Sync + 'static + Clone>(input: IN, queue_length: usize) -> AsyncInput {
pub fn to_async<IN: Input + Send + Sync + 'static + Clone>(input: IN, queue_length: usize) -> AsyncInput {
AsyncInput::wrap(input, queue_length)
}
@ -46,7 +46,7 @@ pub enum AsyncCmd {
pub struct AsyncInput {
attributes: Attributes,
sender: Arc<mpsc::SyncSender<AsyncCmd>>,
input: Arc<MetricInput + Send + Sync + 'static>
input: Arc<Input + Send + Sync + 'static>
}
impl WithAttributes for AsyncInput {
@ -56,7 +56,7 @@ impl WithAttributes for AsyncInput {
impl AsyncInput {
/// Wrap the input with an async dispatch queue for lower app latency.
pub fn wrap(input: impl MetricInput + Send + Sync + 'static + Clone, queue_length: usize) -> Self {
pub fn wrap(input: impl Input + Send + Sync + 'static + Clone, queue_length: usize) -> Self {
let flusher = input.clone();
let (sender, receiver) = mpsc::sync_channel::<AsyncCmd>(queue_length);
thread::spawn(move || {
@ -83,9 +83,9 @@ impl AsyncInput {
}
}
impl MetricInput for AsyncInput {
fn define_metric(&self, name: &Namespace, kind:Kind) -> WriteFn {
let target_metric = self.input.define_metric(&self.qualified_name(name), kind);
impl Input for AsyncInput {
fn new_metric(&self, name: Name, kind:Kind) -> WriteFn {
let target_metric = self.input.new_metric(self.qualified_name(name), kind);
let sender = self.sender.clone();
WriteFn::new(move |value| {
if let Err(e) = sender.send(AsyncCmd::Write(target_metric.clone(), value)) {

View File

@ -1,6 +1,6 @@
//! Maintain aggregated metrics for deferred reporting,
//!
use core::{Kind, Value, Namespace, WithPrefix, NO_METRIC_OUTPUT, MetricInput, Flush, OpenScope, WriteFn, WithAttributes, Attributes};
use core::{Kind, Value, Name, WithName, NO_METRIC_OUTPUT, Input, Flush, OutputDyn, WriteFn, WithAttributes, Attributes};
use clock::TimeHandle;
use core::Kind::*;
use error;
@ -12,63 +12,63 @@ use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
/// A function type to transform aggregated scores into publishable statistics.
pub type StatsFn = Fn(Kind, Namespace, ScoreType) -> Option<(Kind, Namespace, Value)> + Send + Sync + 'static;
pub type StatsFn = Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static;
fn initial_stats() -> &'static StatsFn {
&summary
}
fn initial_output() -> Arc<OpenScope + Send + Sync> {
fn initial_output() -> Arc<OutputDyn + Send + Sync> {
NO_METRIC_OUTPUT.clone()
}
lazy_static! {
static ref DEFAULT_AGGREGATE_STATS: RwLock<Arc<StatsFn>> = RwLock::new(Arc::new(initial_stats()));
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OpenScope + Send + Sync>> = RwLock::new(initial_output());
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OutputDyn + Send + Sync>> = RwLock::new(initial_output());
}
/// Create a new metric aggregator
pub fn to_aggregate() -> MetricAggregator {
MetricAggregator::new()
pub fn to_aggregate() -> Bucket {
Bucket::new()
}
/// Central aggregation structure.
/// Maintains a list of metrics for enumeration when used as source.
#[derive(Debug, Clone)]
pub struct MetricAggregator {
pub struct Bucket {
attributes: Attributes,
inner: Arc<RwLock<InnerAggregator>>,
inner: Arc<RwLock<InnerBucket>>,
}
#[derive(Derivative)]
#[derivative(Debug)]
struct InnerAggregator {
metrics: BTreeMap<Namespace, Arc<Scoreboard>>,
struct InnerBucket {
metrics: BTreeMap<Name, Arc<Scoreboard>>,
period_start: TimeHandle,
#[derivative(Debug = "ignore")]
stats: Option<Arc<Fn(Kind, Namespace, ScoreType)
-> Option<(Kind, Namespace, Value)> + Send + Sync + 'static>>,
stats: Option<Arc<Fn(Kind, Name, ScoreType)
-> Option<(Kind, Name, Value)> + Send + Sync + 'static>>,
#[derivative(Debug = "ignore")]
output: Option<Arc<OpenScope + Send + Sync + 'static>>,
output: Option<Arc<OutputDyn + Send + Sync + 'static>>,
publish_metadata: bool,
}
lazy_static! {
static ref PERIOD_LENGTH: Namespace = "_period_length".into();
static ref PERIOD_LENGTH: Name = "_period_length".into();
}
impl InnerAggregator {
impl InnerBucket {
/// Take a snapshot of aggregated values and reset them.
/// Compute stats on captured values using assigned or default stats function.
/// Write stats to assigned or default output.
pub fn flush_to(&mut self, publish_scope: &MetricInput, stats_fn: &StatsFn) {
pub fn flush_to(&mut self, publish_scope: &Input, stats_fn: &StatsFn) {
let now = TimeHandle::now();
let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0;
self.period_start = now;
let mut snapshot: Vec<(&Namespace, Kind, Vec<ScoreType>)> = self.metrics.iter()
let mut snapshot: Vec<(&Name, Kind, Vec<ScoreType>)> = self.metrics.iter()
.flat_map(|(name, scores)| if let Some(values) = scores.reset(duration_seconds) {
Some((name, scores.metric_kind(), values))
} else {
@ -89,7 +89,7 @@ impl InnerAggregator {
for score in metric.2 {
let filtered = (stats_fn)(metric.1, metric.0.clone(), score);
if let Some((kind, name, value)) = filtered {
let metric: WriteFn = publish_scope.define_metric(&name, kind);
let metric: WriteFn = publish_scope.new_metric(name, kind);
(metric)(value)
}
}
@ -99,18 +99,18 @@ impl InnerAggregator {
}
impl<S: AsRef<str>> From<S> for MetricAggregator {
fn from(name: S) -> MetricAggregator {
MetricAggregator::new().with_prefix(name.as_ref())
impl<S: AsRef<str>> From<S> for Bucket {
fn from(name: S) -> Bucket {
Bucket::new().add_name(name.as_ref())
}
}
impl MetricAggregator {
impl Bucket {
/// Build a new metric aggregation
pub fn new() -> MetricAggregator {
MetricAggregator {
pub fn new() -> Bucket {
Bucket {
attributes: Attributes::default(),
inner: Arc::new(RwLock::new(InnerAggregator {
inner: Arc::new(RwLock::new(InnerBucket {
metrics: BTreeMap::new(),
period_start: TimeHandle::now(),
stats: None,
@ -123,7 +123,7 @@ impl MetricAggregator {
/// Set the default aggregated metrics statistics generator.
pub fn set_default_stats<F>(func: F)
where
F: Fn(Kind, Namespace, ScoreType) -> Option<(Kind, Namespace, Value)> + Send + Sync + 'static
F: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static
{
*DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(func)
}
@ -134,7 +134,7 @@ impl MetricAggregator {
}
/// Install a new receiver for all aggregateed metrics, replacing any previous receiver.
pub fn set_default_output(default_config: impl OpenScope + Send + Sync + 'static) {
pub fn set_default_output(default_config: impl OutputDyn + Send + Sync + 'static) {
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = Arc::new(default_config);
}
@ -146,7 +146,7 @@ impl MetricAggregator {
/// Set the default aggregated metrics statistics generator.
pub fn set_stats<F>(&self, func: F)
where
F: Fn(Kind, Namespace, ScoreType) -> Option<(Kind, Namespace, Value)> + Send + Sync + 'static
F: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static
{
self.inner.write().expect("Aggregator").stats = Some(Arc::new(func))
}
@ -157,7 +157,7 @@ impl MetricAggregator {
}
/// Install a new receiver for all aggregated metrics, replacing any previous receiver.
pub fn set_output(&self, new_config: impl OpenScope + Send + Sync + 'static) {
pub fn set_output(&self, new_config: impl OutputDyn + Send + Sync + 'static) {
self.inner.write().expect("Aggregator").output = Some(Arc::new(new_config))
}
@ -167,14 +167,14 @@ impl MetricAggregator {
}
/// Flush the aggregator scores using the specified scope and stats.
pub fn flush_to(&self, publish_scope: &MetricInput, stats_fn: &StatsFn) {
pub fn flush_to(&self, publish_scope: &Input, stats_fn: &StatsFn) {
let mut inner = self.inner.write().expect("Aggregator");
inner.flush_to(publish_scope, stats_fn);
}
// /// Discard scores for ad-hoc metrics.
// pub fn cleanup(&self) {
// let orphans: Vec<Namespace> = self.inner.read().expect("Aggregator").metrics.iter()
// let orphans: Vec<Name> = self.inner.read().expect("Aggregator").metrics.iter()
// // is aggregator now the sole owner?
// // TODO use weak ref + impl Drop to mark abandoned metrics (see dispatch)
// .filter(|&(_k, v)| Arc::strong_count(v) == 1)
@ -190,9 +190,9 @@ impl MetricAggregator {
}
impl MetricInput for MetricAggregator {
impl Input for Bucket {
/// Lookup or create a scoreboard for the requested metric.
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let scoreb = self.inner
.write()
.expect("Aggregator")
@ -204,12 +204,12 @@ impl MetricInput for MetricAggregator {
}
}
impl WithAttributes for MetricAggregator {
impl WithAttributes for Bucket {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl Flush for MetricAggregator {
impl Flush for Bucket {
/// Collect and reset aggregated data.
/// Publish statistics
fn flush(&self) -> error::Result<()> {
@ -221,8 +221,8 @@ impl Flush for MetricAggregator {
};
let pub_scope = match &inner.output {
&Some(ref out) => out.open_scope(),
&None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().open_scope(),
&Some(ref out) => out.new_input_dyn(),
&None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().new_input_dyn(),
};
inner.flush_to(pub_scope.as_ref(), stats_fn.as_ref());
@ -236,14 +236,14 @@ impl Flush for MetricAggregator {
/// A predefined export strategy reporting all aggregated stats for all metric types.
/// Resulting stats are named by appending a short suffix to each metric's name.
#[allow(dead_code)]
pub fn all_stats(kind: Kind, name: Namespace, score: ScoreType) -> Option<(Kind, Namespace, Value)> {
pub fn all_stats(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match score {
Count(hit) => Some((Counter, name.with_prefix("count"), hit)),
Sum(sum) => Some((kind, name.with_prefix("sum"), sum)),
Mean(mean) => Some((kind, name.with_prefix("mean"), mean.round() as Value)),
Max(max) => Some((Gauge, name.with_prefix("max"), max)),
Min(min) => Some((Gauge, name.with_prefix("min"), min)),
Rate(rate) => Some((Gauge, name.with_prefix("rate"), rate.round() as Value)),
Count(hit) => Some((Counter, name.add_name("count"), hit)),
Sum(sum) => Some((kind, name.add_name("sum"), sum)),
Mean(mean) => Some((kind, name.add_name("mean"), mean.round() as Value)),
Max(max) => Some((Gauge, name.add_name("max"), max)),
Min(min) => Some((Gauge, name.add_name("min"), min)),
Rate(rate) => Some((Gauge, name.add_name("rate"), rate.round() as Value)),
}
}
@ -252,7 +252,7 @@ pub fn all_stats(kind: Kind, name: Namespace, score: ScoreType) -> Option<(Kind,
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn average(kind: Kind, name: Namespace, score: ScoreType) -> Option<(Kind, Namespace, Value)> {
pub fn average(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match kind {
Marker => match score {
Count(count) => Some((Counter, name, count)),
@ -272,7 +272,7 @@ pub fn average(kind: Kind, name: Namespace, score: ScoreType) -> Option<(Kind, N
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn summary(kind: Kind, name: Namespace, score: ScoreType) -> Option<(Kind, Namespace, Value)> {
pub fn summary(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match kind {
Marker => match score {
Count(count) => Some((Counter, name, count)),
@ -294,19 +294,19 @@ mod bench {
use test;
use core::*;
use aggregate::MetricAggregator;
use bucket::Bucket;
#[bench]
fn aggregate_marker(b: &mut test::Bencher) {
let sink = MetricAggregator::new();
let metric = sink.define_metric(&"event_a".into(), Kind::Marker);
let sink = Bucket::new();
let metric = sink.new_metric("event_a".into(), Kind::Marker);
b.iter(|| test::black_box(metric.write(1)));
}
#[bench]
fn aggregate_counter(b: &mut test::Bencher) {
let sink = MetricAggregator::new();
let metric = sink.define_metric(&"count_a".into(), Kind::Counter);
let sink = Bucket::new();
let metric = sink.new_metric("count_a".into(), Kind::Counter);
b.iter(|| test::black_box(metric.write(1)));
}
@ -315,7 +315,7 @@ mod bench {
#[cfg(test)]
mod test {
use core::*;
use aggregate::{MetricAggregator, all_stats, summary, average, StatsFn};
use bucket::{Bucket, all_stats, summary, average, StatsFn};
use clock::{mock_clock_advance, mock_clock_reset};
use map::StatsMap;
@ -325,7 +325,7 @@ mod test {
fn make_stats(stats_fn: &StatsFn) -> BTreeMap<String, Value> {
mock_clock_reset();
let metrics = MetricAggregator::new().with_prefix("test");
let metrics = Bucket::new().add_name("test");
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_a");

View File

@ -21,7 +21,7 @@ pub fn add_cache<M>(cache_size: usize, next: DefineMetricFn<M>) -> DefineMetricF
where
M: Clone + Send + Sync + 'static,
{
let cache: RwLock<lru::LRUCache<Namespace, M>> =
let cache: RwLock<lru::LRUCache<Name, M>> =
RwLock::new(lru::LRUCache::with_capacity(cache_size));
Arc::new(move |name, kind, rate| {
let mut cache = cache.write().expect("Metric Cache");

View File

@ -6,7 +6,7 @@ use clock::TimeHandle;
use scheduler::{set_schedule, CancelHandle};
use std::time::Duration;
use std::sync::Arc;
use std::ops::Deref;
use std::ops;
use text;
use error;
@ -73,7 +73,7 @@ impl Default for Buffering {
/// Field access must go through `is_` and `get_` methods declared in sub-traits.
#[derive(Debug, Clone, Default)]
pub struct Attributes {
namespace: Namespace,
namespace: Name,
sampling_rate: Sampling,
buffering: Buffering,
}
@ -100,33 +100,31 @@ pub trait WithAttributes: Clone {
}
}
/// Namespace operations support.
pub trait WithPrefix {
/// Name operations support.
pub trait WithName {
/// Return the namespace of the component.
fn get_namespace(&self) -> &Namespace;
fn get_namespace(&self) -> &Name;
/// Join namespace and prepend in newly defined metrics.
fn with_prefix(&self, name: &str) -> Self;
fn add_name(&self, name: &str) -> Self;
/// Append the specified name to the local namespace and return the concatenated result.
fn qualified_name(&self, metric_name: &Namespace) -> Namespace;
fn qualified_name(&self, metric_name: Name) -> Name;
}
impl<T: WithAttributes> WithPrefix for T {
fn get_namespace(&self) -> &Namespace {
impl<T: WithAttributes> WithName for T {
fn get_namespace(&self) -> &Name {
&self.get_attributes().namespace
}
/// Join namespace and prepend in newly defined metrics.
fn with_prefix(&self, name: &str) -> Self {
self.with_attributes(|new_attr| new_attr.namespace = new_attr.namespace.with_prefix(name))
fn add_name(&self, name: &str) -> Self {
self.with_attributes(|new_attr| new_attr.namespace = new_attr.namespace.add_name(name))
}
/// Append the specified name to the local namespace and return the concatenated result.
fn qualified_name(&self, metric_name: &Namespace) -> Namespace {
let mut full_name = self.get_attributes().namespace.clone();
full_name.extend(metric_name);
full_name
fn qualified_name(&self, name: Name) -> Name {
self.get_attributes().namespace.add_name(name)
}
}
@ -170,49 +168,49 @@ pub trait WithBuffering: WithAttributes {
/// Does _not_ include the metric's "short" name itself.
/// Can be empty.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Default)]
pub struct Namespace {
pub struct Name {
inner: Vec<String>,
}
impl Namespace {
impl Name {
/// Returns true if this namespace contains no elements.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
// /// Returns true if this namespace contains no elements.
// pub fn is_empty(&self) -> bool {
// self.inner.is_empty()
// }
/// Append a component to the name.
pub fn push(&mut self, name: impl Into<String>) {
self.inner.push(name.into())
}
// /// Append a component to the name.
// pub fn push(&mut self, name: impl Into<String>) {
// self.inner.push(name.into())
// }
/// Concatenate with another namespace into a new one.
pub fn with_prefix(&self, prefix: &str) -> Self {
pub fn add_name(&self, name: impl Into<Name>) -> Self {
let mut cloned = self.clone();
cloned.push(prefix);
cloned.inner.extend_from_slice(&name.into().inner);
cloned
}
/// Returns a copy of this namespace with the second namespace appended.
/// Both original namespaces stay untouched.
pub fn extend(&mut self, name: &Namespace) {
self.inner.extend_from_slice(&name.inner);
}
// pub fn extend(&mut self, name: &Name) {
// self.inner.extend_from_slice(&name.inner);
// }
/// Returns true if the specified namespace is a subset or is equal to this namespace.
pub fn starts_with(&self, name: &Namespace) -> bool {
pub fn starts_with(&self, name: &Name) -> bool {
(self.inner.len() >= name.inner.len()) && (name.inner[..] == self.inner[..name.inner.len()])
}
/// Remove the last part of the namespace, returning it or None if namespace was empty.
pub fn pop(&mut self) -> Option<String> {
self.inner.pop()
}
// /// Remove the last part of the namespace, returning it or None if namespace was empty.
// pub fn pop(&mut self) -> Option<String> {
// self.inner.pop()
// }
/// Returns the number of substrings constituting this namespace.
pub fn len(&self) -> usize {
self.inner.len()
}
// /// Returns the number of substrings constituting this namespace.
// pub fn len(&self) -> usize {
// self.inner.len()
// }
/// Combine name parts into a string.
pub fn join(&self, separator: &str) -> String {
@ -232,70 +230,85 @@ impl Namespace {
}
}
impl<S: Into<String>> From<S> for Namespace {
fn from(name: S) -> Namespace {
impl ops::Deref for Name {
type Target = Vec<String>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl ops::DerefMut for Name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<S: Into<String>> From<S> for Name {
fn from(name: S) -> Name {
let name: String = name.into();
if name.is_empty() {
Namespace::default()
Name::default()
} else {
Namespace { inner: vec![name] }
Name { inner: vec![name] }
}
}
}
/// A function trait that opens a new metric capture scope.
pub trait MetricOutput: OpenScope {
pub trait Output: OutputDyn {
/// Type of input scope provided by this output.
type Input: MetricInput + 'static;
type Input: Input + 'static;
/// Get an input scope for this metric output.
fn open(&self) -> Self::Input;
/// Open a new input scope from this output.
fn new_input(&self) -> Self::Input;
}
/// Wrap a MetricConfig in a non-generic trait.
pub trait OpenScope {
/// Open a new metrics scope
fn open_scope(&self) -> Arc<MetricInput + Send + Sync + 'static>;
/// Dynamic variant of the Output trait
pub trait OutputDyn {
/// Open a new metric input with dynamic typing.
fn new_input_dyn(&self) -> Arc<Input + Send + Sync + 'static>;
}
/// Blanket impl that provides all MetricOuputs their "trait object flavor"
impl<T: MetricOutput + Send + Sync + 'static> OpenScope for T {
fn open_scope(&self) -> Arc<MetricInput + Send + Sync + 'static> {
Arc::new(self.open())
/// Blanket impl that provides Outputs their dynamic flavor.
impl<T: Output + Send + Sync + 'static> OutputDyn for T {
fn new_input_dyn(&self) -> Arc<Input + Send + Sync + 'static> {
Arc::new(self.new_input())
}
}
lazy_static! {
/// The reference instance identifying an uninitialized metric config.
pub static ref NO_METRIC_OUTPUT: Arc<OpenScope + Send + Sync> = Arc::new(text::to_void());
pub static ref NO_METRIC_OUTPUT: Arc<OutputDyn + Send + Sync> = Arc::new(text::to_void());
/// The reference instance identifying an uninitialized metric scope.
pub static ref NO_METRIC_SCOPE: Arc<MetricInput + Send + Sync> = NO_METRIC_OUTPUT.open_scope();
pub static ref NO_METRIC_SCOPE: Arc<Input + Send + Sync> = NO_METRIC_OUTPUT.new_input_dyn();
}
/// Define metrics, write values and flush them.
pub trait MetricInput: Send + Sync + Flush {
pub trait Input: Send + Sync + Flush {
/// Define a metric of the specified type.
fn define_metric(&self, namespace: &Namespace, kind: Kind) -> WriteFn;
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn;
/// Define a counter.
fn counter(&self, name: &str) -> Counter {
self.define_metric(&name.into(), Kind::Counter).into()
self.new_metric(name.into(), Kind::Counter).into()
}
/// Define a marker.
fn marker(&self, name: &str) -> Marker {
self.define_metric(&name.into(), Kind::Marker).into()
self.new_metric(name.into(), Kind::Marker).into()
}
/// Define a timer.
fn timer(&self, name: &str) -> Timer {
self.define_metric(&name.into(), Kind::Timer).into()
self.new_metric(name.into(), Kind::Timer).into()
}
/// Define a gauge.
fn gauge(&self, name: &str) -> Gauge {
self.define_metric(&name.into(), Kind::Gauge).into()
self.new_metric(name.into(), Kind::Gauge).into()
}
}
@ -345,7 +358,7 @@ impl WriteFn {
}
}
impl Deref for WriteFn {
impl ops::Deref for WriteFn {
type Target = (Fn(Value) + Send + Sync);
fn deref(&self) -> &Self::Target {
@ -533,7 +546,7 @@ mod bench {
use core::*;
use clock::TimeHandle;
use test;
use aggregate::MetricAggregator;
use bucket::Bucket;
#[bench]
fn get_instant(b: &mut test::Bencher) {
@ -542,7 +555,7 @@ mod bench {
#[bench]
fn time_bench_direct_dispatch_event(b: &mut test::Bencher) {
let metrics = MetricAggregator::new();
let metrics = Bucket::new();
let marker = metrics.marker("aaa");
b.iter(|| test::black_box(marker.mark()));
}

View File

@ -2,22 +2,22 @@ use core::*;
//use async_queue::WithAsyncQueue;
//use sample::WithSamplingRate;
use aggregate::MetricAggregator;
use bucket::Bucket;
/// Backward compatibility alias.
pub type Aggregate = MetricAggregator;
pub type Aggregate = Bucket;
///// Aggregate metrics in memory.
///// Depending on the type of metric, count, sum, minimum and maximum of values will be tracked.
///// Needs to be connected to a publish to be useful.
//#[deprecated(since = "0.7.0", note = "Use `MetricAggregator::new()` instead.")]
//pub fn aggregate<M, E, P>(stats_fn: E, pub_scope: P) -> MetricAggregator
//#[deprecated(since = "0.7.0", note = "Use `Bucket::new()` instead.")]
//pub fn aggregate<M, E, P>(stats_fn: E, pub_scope: P) -> Bucket
// where
// E: Fn(Kind, Namespace, ScoreType) -> Option<(Kind, Namespace, Value)> + Send + Sync + 'static,
// E: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static,
// P: Into<MetricOutput<M>>,
// M: Send + Sync + 'static + Clone,
//{
// let agg = MetricAggregator::new();
// let agg = Bucket::new();
// agg.set_stats(stats_fn);
// agg.set_output(pub_scope);
// agg
@ -58,7 +58,7 @@ pub type Aggregate = MetricAggregator;
/// Help transition to new syntax
#[deprecated(since = "0.7.0", note = "Use Metrics instead")]
pub type AppMetrics = MetricInput;
pub type AppMetrics = Input;
/// Help transition to new syntax
#[deprecated(since = "0.7.0", note = "Use Marker instead")]
@ -203,7 +203,7 @@ mod legacy_test {
use self_metrics::*;
use deprecated::*;
metrics!(<Aggregate> TEST_METRICS = DIPSTICK_METRICS.with_prefix("test_prefix"));
metrics!(<Aggregate> TEST_METRICS = DIPSTICK_METRICS.add_name("test_prefix"));
app_marker!(<Aggregate> TEST_METRICS => {
M1: "failed",

View File

@ -5,7 +5,7 @@ use std::error;
use std::fmt::{self, Display, Formatter};
use std::result;
use std::sync::mpsc;
use async_queue;
use async;
use self::Error::*;
/// Any error that may result from dipstick usage.
@ -14,7 +14,7 @@ pub enum Error {
/// A generic I/O error.
IO(io::Error),
/// An error from the async metric queue.
Async(mpsc::SendError<async_queue::AsyncCmd>)
Async(mpsc::SendError<async::AsyncCmd>)
}
impl Display for Error {
@ -51,8 +51,8 @@ impl From<io::Error> for Error {
}
}
impl From<mpsc::SendError<async_queue::AsyncCmd>> for Error {
fn from(err: mpsc::SendError<async_queue::AsyncCmd>) -> Self {
impl From<mpsc::SendError<async::AsyncCmd>> for Error {
fn from(err: mpsc::SendError<async::AsyncCmd>) -> Self {
Async(err)
}
}

View File

@ -1,7 +1,7 @@
//! Send metrics to a graphite server.
use core::*;
use aggregate::*;
use bucket::*;
use error;
use self_metrics::DIPSTICK_METRICS;
@ -15,7 +15,7 @@ use std::fmt::Debug;
use socket::RetrySocket;
metrics!{
<MetricAggregator> DIPSTICK_METRICS.with_prefix("graphite") => {
<Bucket> DIPSTICK_METRICS.add_name("graphite") => {
Marker SEND_ERR: "send_failed";
Marker TRESHOLD_EXCEEDED: "bufsize_exceeded";
Counter SENT_BYTES: "sent_bytes";
@ -29,11 +29,11 @@ pub struct GraphiteOutput {
buffered: bool,
}
impl MetricOutput for GraphiteOutput {
impl Output for GraphiteOutput {
type Input = GraphiteInput;
fn open(&self) -> GraphiteInput {
fn new_input(&self) -> GraphiteInput {
GraphiteInput {
attributes: self.attributes.clone(),
buffer: ScopeBuffer {
@ -50,16 +50,16 @@ impl WithAttributes for GraphiteOutput {
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
/// Graphite MetricInput
/// Graphite Input
#[derive(Debug, Clone)]
pub struct GraphiteInput {
attributes: Attributes,
buffer: ScopeBuffer,
}
impl MetricInput for GraphiteInput {
impl Input for GraphiteInput {
/// Define a metric of the specified type.
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let mut prefix = self.qualified_name(name).join(".");
prefix.push(' ');
@ -212,16 +212,16 @@ mod bench {
#[bench]
pub fn unbuffered_graphite(b: &mut test::Bencher) {
let sd = to_graphite("localhost:8125").unwrap().open_scope();
let timer = sd.define_metric(&"timer".into(), Kind::Timer);
let sd = to_graphite("localhost:8125").unwrap().new_input_dyn();
let timer = sd.new_metric("timer".into(), Kind::Timer);
b.iter(|| test::black_box(timer.write(2000)));
}
#[bench]
pub fn buffered_graphite(b: &mut test::Bencher) {
let sd = to_buffered_graphite("localhost:8125").unwrap().open_scope();
let timer = sd.define_metric(&"timer".into(), Kind::Timer);
let sd = to_buffered_graphite("localhost:8125").unwrap().new_input_dyn();
let timer = sd.new_metric("timer".into(), Kind::Timer);
b.iter(|| test::black_box(timer.write(2000)));
}

View File

@ -24,17 +24,17 @@ pub mod error;
pub use error::{Error, Result};
pub mod core;
pub use core::{Value, Kind, Namespace, WithPrefix, Marker, Timer, Counter, Gauge, MetricInput, Flush,
MetricOutput, NO_METRIC_OUTPUT, OpenScope, ScheduleFlush, WithSamplingRate, Sampling, WithBuffering, Buffering};
pub use core::{Value, Kind, Name, WithName, Marker, Timer, Counter, Gauge, Input, Flush,
Output, NO_METRIC_OUTPUT, OutputDyn, ScheduleFlush, WithSamplingRate, Sampling, WithBuffering, Buffering};
#[macro_use]
pub mod macros;
pub mod dispatch;
pub use dispatch::{MetricDispatch, to_dispatch, ROOT_DISPATCH};
pub mod proxy;
pub use proxy::{InputProxy, ROOT_PROXY};
mod aggregate;
pub use aggregate::{MetricAggregator, to_aggregate, summary, all_stats, average};
mod bucket;
pub use bucket::{Bucket, to_aggregate, summary, all_stats, average};
mod text;
pub use text::{to_buffered_stdout, to_stdout, TextOutput, BufferedTextOutput, BufferedTextInput};
@ -69,8 +69,8 @@ pub use socket::RetrySocket;
mod multi;
pub use multi::{MultiOutput, MultiInput, to_multi};
mod async_queue;
pub use async_queue::{AsyncInput, to_async};
mod async;
pub use async::{AsyncInput, to_async};
mod scheduler;
pub use scheduler::{set_schedule, CancelHandle};

View File

@ -1,4 +1,4 @@
use core::{Namespace, WithPrefix, Value, WriteFn, Kind, MetricOutput, MetricInput, Flush, WithAttributes, Attributes};
use core::{Name, WithName, Value, WriteFn, Kind, Output, Input, Flush, WithAttributes, Attributes};
use error;
use std::sync::{RwLock, Arc};
use text;
@ -10,14 +10,14 @@ use log;
#[derive(Clone)]
pub struct LogOutput {
attributes: Attributes,
format_fn: Arc<Fn(&Namespace, Kind) -> Vec<String> + Send + Sync>,
format_fn: Arc<Fn(&Name, Kind) -> Vec<String> + Send + Sync>,
print_fn: Arc<Fn(&mut Vec<u8>, &[String], Value) -> error::Result<()> + Send + Sync>,
}
impl MetricOutput for LogOutput {
impl Output for LogOutput {
type Input = LogOutput;
fn open(&self) -> Self::Input {
fn new_input(&self) -> Self::Input {
self.clone()
}
}
@ -27,8 +27,8 @@ impl WithAttributes for LogOutput {
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl MetricInput for LogOutput {
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
impl Input for LogOutput {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let name = self.qualified_name(name);
let template = (self.format_fn)(&name, kind);
@ -61,14 +61,14 @@ pub fn to_log() -> LogOutput {
#[derive(Clone)]
pub struct BufferedLogOutput {
attributes: Attributes,
format_fn: Arc<Fn(&Namespace, Kind) -> Vec<String> + Send + Sync>,
format_fn: Arc<Fn(&Name, Kind) -> Vec<String> + Send + Sync>,
buffer_print_fn: Arc<Fn(&mut Vec<u8>, &[String], Value) -> error::Result<()> + Send + Sync>,
}
impl MetricOutput for BufferedLogOutput {
impl Output for BufferedLogOutput {
type Input = BufferedLogInput;
fn open(&self) -> Self::Input {
fn new_input(&self) -> Self::Input {
BufferedLogInput {
attributes: self.attributes.clone(),
entries: Arc::new(RwLock::new(Vec::new())),
@ -95,8 +95,8 @@ impl WithAttributes for BufferedLogInput {
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl MetricInput for BufferedLogInput {
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
impl Input for BufferedLogInput {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let name = self.qualified_name(name);
let template = (self.output.format_fn)(&name, kind);
@ -148,8 +148,8 @@ mod test {
#[test]
fn test_to_log() {
let c = super::to_log().open_scope();
let m = c.define_metric(&"test".into(), Kind::Marker);
let c = super::to_log().new_input_dyn();
let m = c.new_metric("test".into(), Kind::Marker);
(m)(33);
}

View File

@ -53,22 +53,22 @@ macro_rules! metrics {
};
($(#[$attr:meta])* pub $METRIC_ID:ident = $e:expr $(;)*) => {
metrics! {$(#[$attr])* <MetricDispatch> pub $METRIC_ID = $e; }
metrics! {$(#[$attr])* <InputProxy> pub $METRIC_ID = $e; }
};
($(#[$attr:meta])* pub $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
metrics! {$(#[$attr])* <MetricDispatch> pub $METRIC_ID = $e => { $($REMAINING)* } }
metrics! {$(#[$attr])* <InputProxy> pub $METRIC_ID = $e => { $($REMAINING)* } }
};
($(#[$attr:meta])* $METRIC_ID:ident = $e:expr $(;)*) => {
metrics! {$(#[$attr])* <MetricDispatch> $METRIC_ID = $e; }
metrics! {$(#[$attr])* <InputProxy> $METRIC_ID = $e; }
};
($(#[$attr:meta])* $METRIC_ID:ident = $e:expr => { $($REMAINING:tt)+ }) => {
metrics! {$(#[$attr])* <MetricDispatch> $METRIC_ID = $e => { $($REMAINING)* } }
metrics! {$(#[$attr])* <InputProxy> $METRIC_ID = $e => { $($REMAINING)* } }
};
($(#[$attr:meta])* $METRIC_ID:ident => { $($REMAINING:tt)+ }) => {
metrics! {<MetricDispatch> $METRIC_ID => { $($REMAINING)* } }
metrics! {<InputProxy> $METRIC_ID => { $($REMAINING)* } }
};
($e:expr => { $($REMAINING:tt)+ }) => {
metrics! {<MetricDispatch> $e => { $($REMAINING)* } }
metrics! {<InputProxy> $e => { $($REMAINING)* } }
};
}
@ -132,10 +132,10 @@ macro_rules! __metrics_block {
#[cfg(test)]
mod test {
use core::*;
use aggregate::MetricAggregator;
use bucket::Bucket;
use self_metrics::*;
metrics!(<MetricAggregator> DIPSTICK_METRICS.with_prefix("test_prefix") => {
metrics!(<Bucket> DIPSTICK_METRICS.add_name("test_prefix") => {
Marker M1: "failed";
Marker M2: "success";
Counter C1: "failed";

View File

@ -1,4 +1,4 @@
use core::{Value, WriteFn, Kind, Namespace, MetricInput, Flush};
use core::{Value, WriteFn, Kind, Name, Input, Flush};
use std::sync::{Arc, RwLock};
use std::collections::BTreeMap;
@ -16,8 +16,8 @@ impl StatsMap {
}
}
impl MetricInput for StatsMap {
fn define_metric(&self, name: &Namespace, _kind: Kind) -> WriteFn {
impl Input for StatsMap {
fn new_metric(&self, name: Name, _kind: Kind) -> WriteFn {
let write_to = self.inner.clone();
let name: String = name.join(".");
WriteFn::new(move |value| {

View File

@ -1,6 +1,6 @@
//! Dispatch metrics to multiple sinks.
use core::{MetricOutput, MetricInput, Namespace, WithPrefix, OpenScope, Kind, WriteFn, Flush, WithAttributes, Attributes};
use core::{Output, Input, Name, WithName, OutputDyn, Kind, WriteFn, Flush, WithAttributes, Attributes};
use error;
use std::sync::Arc;
@ -8,7 +8,7 @@ use std::sync::Arc;
#[derive(Clone)]
pub struct MultiOutput {
attributes: Attributes,
outputs: Vec<Arc<OpenScope + Send + Sync>>,
outputs: Vec<Arc<OutputDyn + Send + Sync>>,
}
/// Create a new multi-output.
@ -16,11 +16,11 @@ pub fn to_multi() -> MultiOutput {
MultiOutput::new()
}
impl MetricOutput for MultiOutput {
impl Output for MultiOutput {
type Input = MultiInput;
fn open(&self) -> Self::Input {
let inputs = self.outputs.iter().map(|out| out.open_scope()).collect();
fn new_input(&self) -> Self::Input {
let inputs = self.outputs.iter().map(|out| out.new_input_dyn()).collect();
MultiInput {
attributes: self.attributes.clone(),
inputs,
@ -38,7 +38,7 @@ impl MultiOutput {
}
/// Returns a clone of the dispatch with the new output added to the list.
pub fn with_output<O: OpenScope + Send + Sync + 'static>(&self, out: O) -> Self {
pub fn with_output<O: OutputDyn + Send + Sync + 'static>(&self, out: O) -> Self {
let mut cloned = self.clone();
cloned.outputs.push(Arc::new(out));
cloned
@ -54,13 +54,13 @@ impl WithAttributes for MultiOutput {
#[derive(Clone)]
pub struct MultiInput {
attributes: Attributes,
inputs: Vec<Arc<MetricInput + Send + Sync>>,
inputs: Vec<Arc<Input + Send + Sync>>,
}
impl MetricInput for MultiInput {
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
let name = self.qualified_name(name);
let write_fns: Vec<WriteFn> = self.inputs.iter().map(|input| input.define_metric(&name, kind)).collect();
impl Input for MultiInput {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let ref name = self.qualified_name(name);
let write_fns: Vec<WriteFn> = self.inputs.iter().map(move |input| input.new_metric(name.clone(), kind)).collect();
WriteFn::new(move |value| for w in &write_fns {
(w)(value)
})

View File

@ -16,7 +16,7 @@ use std::fmt::Debug;
use socket::RetrySocket;
metrics!{
<Aggregate> DIPSTICK_METRICS.with_prefix("prometheus") => {
<Aggregate> DIPSTICK_METRICS.add_name("prometheus") => {
Marker SEND_ERR: "send_failed";
Marker TRESHOLD_EXCEEDED: "bufsize_exceeded";
Counter SENT_BYTES: "sent_bytes";
@ -51,7 +51,7 @@ pub fn to_buffered_prometheus<ADDR>(address: ADDR) -> error::Result<MetricOutput
))
}
fn prometheus_metric(namespace: &Namespace, kind: Kind, rate: Sampling) -> Prometheus {
fn prometheus_metric(namespace: &Name, kind: Kind, rate: Sampling) -> Prometheus {
let mut prefix = namespace.join(".");
prefix.push(' ');
@ -180,16 +180,16 @@ mod bench {
#[bench]
pub fn unbufferd_prometheus(b: &mut test::Bencher) {
let sd = to_prometheus("localhost:8125").unwrap().open_scope();
let timer = sd.define_metric(&"timer".into(), Kind::Timer, 1000000.0);
let sd = to_prometheus("localhost:8125").unwrap().new_input_dyn();
let timer = sd.new_metric(&"timer".into(), Kind::Timer, 1000000.0);
b.iter(|| test::black_box(sd.write(&timer, 2000)));
}
#[bench]
pub fn buffered_prometheus(b: &mut test::Bencher) {
let sd = to_buffered_prometheus("localhost:8125").unwrap().open_scope();
let timer = sd.define_metric(&"timer".into(), Kind::Timer, 1000000.0);
let sd = to_buffered_prometheus("localhost:8125").unwrap().new_input_dyn();
let timer = sd.new_metric(&"timer".into(), Kind::Timer, 1000000.0);
b.iter(|| test::black_box(sd.write(&timer, 2000)));
}

View File

@ -1,6 +1,6 @@
//! Decouple metric definition from configuration with trait objects.
use core::{Namespace, WithPrefix, Kind, MetricInput, WriteFn, NO_METRIC_OUTPUT, Flush, WithAttributes, Attributes};
use core::{Name, WithName, Kind, Input, WriteFn, NO_METRIC_OUTPUT, Flush, WithAttributes, Attributes};
use error;
use std::collections::{HashMap, BTreeMap};
@ -12,36 +12,31 @@ lazy_static! {
/// Root of the default metrics dispatch, usable by all libraries and apps.
/// Libraries should create their metrics into sub subspaces of this.
/// Applications should configure on startup where the dispatched metrics should go.
/// Exceptionally, one can create its own MetricDispatch root, separate from this one.
pub static ref ROOT_DISPATCH: MetricDispatch = MetricDispatch::new();
}
/// Return the default dispatch's root.
pub fn to_dispatch() -> MetricDispatch {
ROOT_DISPATCH.clone()
/// Exceptionally, one can create its own InputProxy root, separate from this one.
pub static ref ROOT_PROXY: InputProxy = InputProxy::new();
}
/// A dynamically dispatched metric.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct MetricProxy {
struct ProxiedMetric {
// basic info for this metric, needed to recreate new corresponding trait object if target changes
name: Namespace,
name: Name,
kind: Kind,
// the metric trait object to dispatch metric values to
// the second part can be up to namespace.len() + 1 if this metric was individually targeted
// 0 if no target assigned
#[derivative(Debug = "ignore")]
write_metric: (AtomicRefCell<(WriteFn, usize)>),
target: (AtomicRefCell<(WriteFn, usize)>),
// a reference to the the parent dispatcher to remove the metric from when it is dropped
#[derivative(Debug = "ignore")]
dispatch: Arc<RwLock<InnerDispatch>>,
dispatch: Arc<RwLock<InnerProxy>>,
}
/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out.
impl Drop for MetricProxy {
impl Drop for ProxiedMetric {
fn drop(&mut self) {
self.dispatch.write().expect("Dispatch Lock").drop_metric(&self.name)
}
@ -52,18 +47,18 @@ impl Drop for MetricProxy {
/// Allows defining metrics before a concrete type has been selected.
/// Allows replacing metrics backend on the fly at runtime.
#[derive(Clone)]
pub struct MetricDispatch {
pub struct InputProxy {
attributes: Attributes,
inner: Arc<RwLock<InnerDispatch>>,
inner: Arc<RwLock<InnerProxy>>,
}
struct InnerDispatch {
struct InnerProxy {
// namespaces can target one, many or no metrics
targets: HashMap<Namespace, Arc<MetricInput + Send + Sync>>,
targets: HashMap<Name, Arc<Input + Send + Sync>>,
// last part of the namespace is the metric's name
metrics: BTreeMap<Namespace, Weak<MetricProxy>>,
metrics: BTreeMap<Name, Weak<ProxiedMetric>>,
}
impl InnerDispatch {
impl InnerProxy {
fn new() -> Self {
Self {
@ -72,7 +67,7 @@ impl InnerDispatch {
}
}
fn set_target(&mut self, target_name: Namespace, target_scope: Arc<MetricInput + Send + Sync>) {
fn set_target(&mut self, target_name: Name, target_scope: Arc<Input + Send + Sync>) {
self.targets.insert(target_name.clone(), target_scope.clone());
for (metric_name, metric) in self.metrics.range_mut(target_name.clone()..) {
if let Some(metric) = metric.upgrade() {
@ -80,22 +75,21 @@ impl InnerDispatch {
if !metric_name.starts_with(&target_name) { break }
// check if metric targeted by _lower_ namespace
if metric.write_metric.borrow().1 > target_name.len() { continue }
if metric.target.borrow().1 > target_name.len() { continue }
let target_metric = target_scope.define_metric(&metric.name, metric.kind);
*metric.write_metric.borrow_mut() = (target_metric, target_name.len());
let target_metric = target_scope.new_metric(metric.name.clone(), metric.kind);
*metric.target.borrow_mut() = (target_metric, target_name.len());
}
}
}
fn get_effective_target(&self, namespace: &Namespace)
-> Option<(Arc<MetricInput + Send + Sync>, usize)> {
if let Some(target) = self.targets.get(namespace) {
return Some((target.clone(), namespace.len()));
fn get_effective_target(&self, name: &Name) -> Option<(Arc<Input + Send + Sync>, usize)> {
if let Some(target) = self.targets.get(name) {
return Some((target.clone(), name.len()));
}
// no 1:1 match, scan upper namespaces
let mut name = namespace.clone();
let mut name = name.clone();
while let Some(_popped) = name.pop() {
if let Some(target) = self.targets.get(&name) {
return Some((target.clone(), name.len()))
@ -104,14 +98,14 @@ impl InnerDispatch {
None
}
fn unset_target(&mut self, namespace: &Namespace) {
fn unset_target(&mut self, namespace: &Name) {
if self.targets.remove(namespace).is_none() {
// nothing to do
return
}
let (up_target, up_nslen) = self.get_effective_target(namespace)
.unwrap_or_else(|| (NO_METRIC_OUTPUT.open_scope(), 0));
.unwrap_or_else(|| (NO_METRIC_OUTPUT.new_input_dyn(), 0));
// update all affected metrics to next upper targeted namespace
for (name, metric) in self.metrics.range_mut(namespace..) {
@ -120,21 +114,21 @@ impl InnerDispatch {
if let Some(mut metric) = metric.upgrade() {
// check if metric targeted by _lower_ namespace
if metric.write_metric.borrow().1 > namespace.len() { continue }
if metric.target.borrow().1 > namespace.len() { continue }
let new_metric = up_target.define_metric(name, metric.kind);
*metric.write_metric.borrow_mut() = (new_metric, up_nslen);
let new_metric = up_target.new_metric(name.clone(), metric.kind);
*metric.target.borrow_mut() = (new_metric, up_nslen);
}
}
}
fn drop_metric(&mut self, name: &Namespace) {
fn drop_metric(&mut self, name: &Name) {
if self.metrics.remove(name).is_none() {
panic!("Could not remove DelegatingMetric weak ref from delegation point")
}
}
fn flush(&self, namespace: &Namespace) -> error::Result<()> {
fn flush(&self, namespace: &Name) -> error::Result<()> {
if let Some((target, _nslen)) = self.get_effective_target(namespace) {
target.flush()
} else {
@ -144,7 +138,7 @@ impl InnerDispatch {
}
impl MetricDispatch {
impl InputProxy {
/// Create a new "private" metric dispatch root. This is usually not what you want.
/// Since this dispatch will not be part of the standard dispatch tree,
@ -152,14 +146,14 @@ impl MetricDispatch {
/// its existence this may never happen and metrics will not be dispatched anywhere.
/// If you want to use the standard dispatch tree, use #metric_dispatch() instead.
pub fn new() -> Self {
MetricDispatch {
InputProxy {
attributes: Attributes::default(),
inner: Arc::new(RwLock::new(InnerDispatch::new())),
inner: Arc::new(RwLock::new(InnerProxy::new())),
}
}
/// Replace target for this dispatch and it's children.
pub fn set_target<IS: MetricInput + Send + Sync + 'static>(&self, target: IS) {
pub fn set_target<IS: Input + Send + Sync + 'static>(&self, target: IS) {
let mut inner = self.inner.write().expect("Dispatch Lock");
inner.set_target(self.get_namespace().clone(), Arc::new(target));
}
@ -172,15 +166,15 @@ impl MetricDispatch {
}
impl<S: AsRef<str>> From<S> for MetricDispatch {
fn from(name: S) -> MetricDispatch {
MetricDispatch::new().with_prefix(name.as_ref())
impl<S: AsRef<str>> From<S> for InputProxy {
fn from(name: S) -> InputProxy {
InputProxy::new().add_name(name.as_ref())
}
}
impl MetricInput for MetricDispatch {
impl Input for InputProxy {
/// Lookup or create a dispatch stub for the requested metric.
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let name = self.qualified_name(name);
let mut inner = self.inner.write().expect("Dispatch Lock");
let proxy = inner
@ -192,28 +186,28 @@ impl MetricInput for MetricDispatch {
let name2 = name.clone();
// not found, define new
let (target, target_namespace_length) = inner.get_effective_target(&name)
.unwrap_or_else(|| (NO_METRIC_OUTPUT.open_scope(), 0));
let metric_object = target.define_metric(&name, kind);
let proxy = Arc::new(MetricProxy {
.unwrap_or_else(|| (NO_METRIC_OUTPUT.new_input_dyn(), 0));
let metric_object = target.new_metric(name.clone(), kind);
let proxy = Arc::new(ProxiedMetric {
name,
kind,
write_metric: AtomicRefCell::new((metric_object, target_namespace_length)),
target: AtomicRefCell::new((metric_object, target_namespace_length)),
dispatch: self.inner.clone(),
});
inner.metrics.insert(name2, Arc::downgrade(&proxy));
proxy
});
WriteFn::new(move |value| (proxy.write_metric.borrow().0)(value))
WriteFn::new(move |value| (proxy.target.borrow().0)(value))
}
}
impl Flush for MetricDispatch {
impl Flush for InputProxy {
fn flush(&self) -> error::Result<()> {
self.inner.write().expect("Dispatch Lock").flush(self.get_namespace())
}
}
impl WithAttributes for MetricDispatch {
impl WithAttributes for InputProxy {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
@ -222,20 +216,20 @@ impl WithAttributes for MetricDispatch {
mod bench {
use core::*;
use dispatch::*;
use proxy::*;
use test;
use aggregate::MetricAggregator;
use bucket::Bucket;
#[bench]
fn dispatch_marker_to_aggregate(b: &mut test::Bencher) {
ROOT_DISPATCH.set_target(MetricAggregator::new());
let metric = ROOT_DISPATCH.marker("event_a");
ROOT_PROXY.set_target(Bucket::new());
let metric = ROOT_PROXY.marker("event_a");
b.iter(|| test::black_box(metric.mark()));
}
#[bench]
fn dispatch_marker_to_void(b: &mut test::Bencher) {
let metric = ROOT_DISPATCH.marker("event_a");
let metric = ROOT_PROXY.marker("event_a");
b.iter(|| test::black_box(metric.mark()));
}

View File

@ -2,13 +2,13 @@
//! Because the possibly high volume of data, this is pre-set to use aggregation.
//! This is also kept in a separate module because it is not to be exposed outside of the crate.
pub use aggregate::MetricAggregator;
pub use bucket::Bucket;
//lazy_static! {
// pub static ref DIPSTICK_METRICS: MetricAggregator = "dipstick".into();
// pub static ref DIPSTICK_METRICS: Bucket = "dipstick".into();
//}
metrics!{
/// Aggregator of dipstick's own internal metrics.
<MetricAggregator> pub DIPSTICK_METRICS = "dipstick";
<Bucket> pub DIPSTICK_METRICS = "dipstick";
}

View File

@ -1,11 +1,11 @@
//! Send metrics to a statsd server.
use core::{MetricInput, MetricOutput, Value, WriteFn, Attributes, WithAttributes, Kind,
Flush, Counter, Marker, Namespace, WithSamplingRate, WithPrefix, WithBuffering, Sampling};
use core::{Input, Output, Value, WriteFn, Attributes, WithAttributes, Kind,
Flush, Counter, Marker, Name, WithSamplingRate, WithName, WithBuffering, Sampling};
use pcg32;
use error;
use self_metrics::DIPSTICK_METRICS;
use aggregate::MetricAggregator;
use bucket::Bucket;
use std::net::UdpSocket;
use std::sync::{Arc, RwLock};
@ -13,7 +13,7 @@ use std::sync::{Arc, RwLock};
pub use std::net::ToSocketAddrs;
metrics! {
<MetricAggregator> DIPSTICK_METRICS.with_prefix("statsd") => {
<Bucket> DIPSTICK_METRICS.add_name("statsd") => {
Marker SEND_ERR: "send_failed";
Counter SENT_BYTES: "sent_bytes";
}
@ -39,9 +39,9 @@ pub fn to_statsd<ADDR: ToSocketAddrs>(address: ADDR) -> error::Result<StatsdOutp
})
}
impl MetricOutput for StatsdOutput {
impl Output for StatsdOutput {
type Input = StatsdInput;
fn open(&self) -> Self::Input {
fn new_input(&self) -> Self::Input {
StatsdInput {
attributes: self.attributes.clone(),
buffer: Arc::new(RwLock::new(InputBuffer {
@ -74,8 +74,8 @@ pub struct StatsdInput {
buffer: Arc<RwLock<InputBuffer>>,
}
impl MetricInput for StatsdInput {
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
impl Input for StatsdInput {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let mut prefix = self.qualified_name(name).join(".");
prefix.push(':');
@ -204,8 +204,8 @@ mod bench {
#[bench]
pub fn timer_statsd(b: &mut test::Bencher) {
let sd = to_statsd("localhost:8125").unwrap().open_scope();
let timer = sd.define_metric(&"timer".into(), Kind::Timer);
let sd = to_statsd("localhost:8125").unwrap().new_input_dyn();
let timer = sd.new_metric("timer".into(), Kind::Timer);
b.iter(|| test::black_box(timer.write(2000)));
}

View File

@ -2,7 +2,7 @@
// TODO parameterize templates
// TODO define backing structs that can flush() on Drop
use core::{Namespace, WithPrefix, Value, WriteFn, Kind, MetricOutput, MetricInput, Flush, WithAttributes, Attributes};
use core::{Name, WithName, Value, WriteFn, Kind, Output, Input, Flush, WithAttributes, Attributes};
use error;
use std::sync::{RwLock, Arc};
use std::io::{Write, BufWriter, self};
@ -12,7 +12,7 @@ use std::io::{Write, BufWriter, self};
pub struct TextOutput<W: Write + Send + Sync + 'static> {
attributes: Attributes,
inner: Arc<RwLock<W>>,
format_fn: Arc<Fn(&Namespace, Kind) -> Vec<String> + Send + Sync>,
format_fn: Arc<Fn(&Name, Kind) -> Vec<String> + Send + Sync>,
print_fn: Arc<Fn(&mut W, &[String], Value) -> error::Result<()> + Send + Sync>,
}
@ -33,15 +33,15 @@ impl<W: Write + Send + Sync + 'static> WithAttributes for TextOutput<W> {
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl<W: Write + Send + Sync + 'static> MetricOutput for TextOutput<W> {
impl<W: Write + Send + Sync + 'static> Output for TextOutput<W> {
type Input = TextOutput<W>;
fn open(&self) -> Self::Input {
fn new_input(&self) -> Self::Input {
self.clone()
}
}
impl<W: Write + Send + Sync + 'static> MetricInput for TextOutput<W> {
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
impl<W: Write + Send + Sync + 'static> Input for TextOutput<W> {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let name = self.qualified_name(name);
let template = (self.format_fn)(&name, kind);
let print_fn = self.print_fn.clone();
@ -66,7 +66,7 @@ impl<W: Write + Send + Sync + 'static> Flush for TextOutput<W> {
pub struct BufferedTextOutput<W: Write + Send + Sync + 'static> {
attributes: Attributes,
inner: Arc<RwLock<W>>,
format_fn: Arc<Fn(&Namespace, Kind) -> Vec<String> + Send + Sync>,
format_fn: Arc<Fn(&Name, Kind) -> Vec<String> + Send + Sync>,
buffer_print_fn: Arc<Fn(&mut Vec<u8>, &[String], Value) -> error::Result<()> + Send + Sync>,
// flush_print_fn: Arc<Fn(&mut W, &mut [String]) -> error::Result<()> + Send + Sync>,
}
@ -88,11 +88,11 @@ impl<W: Write + Send + Sync + 'static> WithAttributes for BufferedTextOutput<W>
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl<W: Write + Send + Sync + 'static> MetricOutput for BufferedTextOutput<W> {
impl<W: Write + Send + Sync + 'static> Output for BufferedTextOutput<W> {
type Input = BufferedTextInput<W>;
fn open(&self) -> Self::Input {
fn new_input(&self) -> Self::Input {
BufferedTextInput {
attributes: self.attributes.clone(),
entries: Arc::new(RwLock::new(Vec::new())),
@ -123,8 +123,8 @@ impl<W: Write + Send + Sync + 'static> WithAttributes for BufferedTextInput<W> {
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl<W: Write + Send + Sync + 'static> MetricInput for BufferedTextInput<W> {
fn define_metric(&self, name: &Namespace, kind: Kind) -> WriteFn {
impl<W: Write + Send + Sync + 'static> Input for BufferedTextInput<W> {
fn new_metric(&self, name: Name, kind: Kind) -> WriteFn {
let name = self.qualified_name(name);
let template = (self.output.format_fn)(&name, kind);
@ -166,7 +166,7 @@ pub fn to_stdout() -> TextOutput<io::Stdout> {
}
}
pub fn format_name(name: &Namespace, _kind: Kind) -> Vec<String> {
pub fn format_name(name: &Name, _kind: Kind) -> Vec<String> {
let mut z = name.join(".");
z.push_str(" ");
vec![z]
@ -200,16 +200,16 @@ pub fn to_buffered_stdout() -> BufferedTextOutput<BufWriter<io::Stdout>> {
#[derive(Clone)]
pub struct Void {}
impl MetricOutput for Void {
impl Output for Void {
type Input = Void;
fn open(&self) -> Void {
fn new_input(&self) -> Void {
self.clone()
}
}
impl MetricInput for Void {
fn define_metric(&self, _name: &Namespace, _kind: Kind) -> WriteFn {
impl Input for Void {
fn new_metric(&self, _name: Name, _kind: Kind) -> WriteFn {
WriteFn::new(|_value| {})
}
}
@ -227,15 +227,15 @@ mod test {
#[test]
fn sink_print() {
let c = super::to_stdout().open_scope();
let m = c.define_metric(&"test".into(), Kind::Marker);
let c = super::to_stdout().new_input_dyn();
let m = c.new_metric("test".into(), Kind::Marker);
(m)(33);
}
#[test]
fn test_to_void() {
let c = super::to_void().open_scope();
let m = c.define_metric(&"test".into(), Kind::Marker);
let c = super::to_void().new_input_dyn();
let m = c.new_metric("test".into(), Kind::Marker);
(m)(33);
}