This commit is contained in:
Francis Lalonde 2018-10-26 01:20:47 +00:00
parent 3360ba7b9e
commit ea812eb9a0
52 changed files with 564 additions and 535 deletions

View File

@ -11,7 +11,7 @@ minimal impact on applications and a choice of output to downstream systems.
Dipstick is a toolkit to help all sorts of application collect and send out metrics.
As such, it needs a bit of set up to suit one's needs.
Skimming through the handbook [handbook](https://github.com/fralalonde/dipstick/tree/master/handbook)
Skimming through the [handbook](https://github.com/fralalonde/dipstick/tree/master/handbook)
should help you get an idea of the possible configurations.
In short, dipstick-enabled apps _can_:

View File

@ -20,7 +20,7 @@ fn main() {
app_metrics.counter("just_once").count(4);
// metric names can be prepended with a common prefix
let prefixed_metrics = app_metrics.add_naming("subsystem");
let prefixed_metrics = app_metrics.add_prefix("subsystem");
let event = prefixed_metrics.marker("event_c");
let gauge = prefixed_metrics.gauge("gauge_d");

View File

@ -11,7 +11,7 @@ use std::env::args;
use std::str::FromStr;
fn main() {
let bucket = Bucket::new();
let bucket = AtomicBucket::new();
let event = bucket.marker("a");
let args = &mut args();
args.next();

View File

@ -13,7 +13,7 @@ use std::str::FromStr;
fn main() {
let event = Proxy::default().marker("a");
let bucket = Bucket::new();
let bucket = AtomicBucket::new();
Proxy::default().set_target(bucket.clone());

View File

@ -11,7 +11,7 @@ use std::env::args;
use std::str::FromStr;
fn main() {
let bucket = Bucket::new();
let bucket = AtomicBucket::new();
let queue = InputQueueScope::wrap(bucket.clone(), 10000);
let event = queue.marker("a");
let args = &mut args();

View File

@ -7,11 +7,11 @@ use std::time::Duration;
use dipstick::*;
fn main() {
let bucket = Bucket::new().add_naming("test");
let bucket = AtomicBucket::new().add_prefix("test");
// Bucket::set_default_output(to_stdout());
bucket.set_target(Graphite::send_to("localhost:2003").expect("Socket")
.add_naming("machine1").add_naming("application"));
bucket.set_flush_target(Graphite::send_to("localhost:2003").expect("Socket")
.add_prefix("machine1").add_prefix("application"));
bucket.flush_every(Duration::from_secs(3));

View File

@ -8,10 +8,10 @@ use std::io;
use dipstick::*;
fn main() {
let metrics = Bucket::new().add_naming("test");
let metrics = AtomicBucket::new().add_prefix("test");
// Bucket::set_default_output(to_stdout());
metrics.set_target(Stream::write_to(io::stdout()));
metrics.set_flush_target(Stream::write_to(io::stdout()));
metrics.flush_every(Duration::from_secs(3));

View File

@ -10,8 +10,8 @@ use std::thread::sleep;
fn main() {
let bucket = Bucket::new();
Bucket::set_default_target(Stream::write_to(io::stdout()));
let bucket = AtomicBucket::new();
AtomicBucket::set_default_target(Stream::write_to(io::stdout()));
let persistent_marker = bucket.marker("persistent");

View File

@ -9,8 +9,8 @@ use std::io;
fn main() {
let app_metrics = Bucket::new();
app_metrics.set_target(Stream::write_to(io::stdout()));
let app_metrics = AtomicBucket::new();
app_metrics.set_flush_target(Stream::write_to(io::stdout()));
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -8,7 +8,7 @@ use std::io;
use dipstick::*;
fn main() {
let metrics = Stream::write_to(io::stdout()).cached(5).input().add_naming("cache");
let metrics = Stream::write_to(io::stdout()).cached(5).input().add_prefix("cache");
loop {
// report some ad-hoc metric values from our "application" loop

View File

@ -17,25 +17,25 @@ metrics!{
fn main() {
let one_minute = Bucket::new();
let one_minute = AtomicBucket::new();
one_minute.flush_every(Duration::from_secs(60));
let five_minutes = Bucket::new();
let five_minutes = AtomicBucket::new();
five_minutes.flush_every(Duration::from_secs(300));
let fifteen_minutes = Bucket::new();
let fifteen_minutes = AtomicBucket::new();
fifteen_minutes.flush_every(Duration::from_secs(900));
let all_buckets = MultiInputScope::new()
.add_target(one_minute)
.add_target(five_minutes)
.add_target(fifteen_minutes)
.add_naming("machine_name");
.add_prefix("machine_name");
// send application metrics to aggregator
Proxy::default().set_target(all_buckets);
Bucket::set_default_target(Stream::write_to(io::stdout()));
Bucket::set_default_stats(stats_all);
AtomicBucket::set_default_target(Stream::write_to(io::stdout()));
AtomicBucket::set_default_stats(stats_all);
loop {
COUNTER.count(17);

View File

@ -8,19 +8,19 @@ use dipstick::*;
fn main() {
fn custom_statistics(
kind: Kind,
mut name: Name,
kind: InputKind,
mut name: MetricName,
score: ScoreType,
) -> Option<(Kind, Name, Value)> {
) -> Option<(InputKind, MetricName, MetricValue)> {
match (kind, score) {
// do not export gauge scores
(Kind::Gauge, _) => None,
(InputKind::Gauge, _) => None,
// prepend and append to metric name
(_, ScoreType::Count(count)) => {
if let Some(last) = name.pop_back() {
Some((
Kind::Counter,
InputKind::Counter,
name.append("customized_add_prefix")
.append(format!("{}_and_a_suffix", last)),
count,
@ -42,10 +42,10 @@ fn main() {
}
// send application metrics to aggregator
Bucket::set_default_target(Stream::stderr());
Bucket::set_default_stats(custom_statistics);
AtomicBucket::set_default_target(Stream::stderr());
AtomicBucket::set_default_stats(custom_statistics);
let app_metrics = Bucket::new();
let app_metrics = AtomicBucket::new();
// schedule aggregated metrics to be printed every 3 seconds
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -10,7 +10,7 @@ fn main() {
let metrics =
Graphite::send_to("localhost:2003")
.expect("Connected")
.add_naming("my_app")
.add_prefix("my_app")
.input();
loop {

View File

@ -2,7 +2,7 @@
extern crate dipstick;
use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Naming};
use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Prefixed};
use std::time::Duration;
use std::io;
@ -15,9 +15,9 @@ fn main() {
// will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix.
let same_type_metrics = MultiInput::input()
.add_target(Stream::write_to(io::stdout()).add_naming("yeah"))
.add_target(Stream::write_to(io::stdout()).add_naming("ouch"))
.add_naming("cool")
.add_target(Stream::write_to(io::stdout()).add_prefix("yeah"))
.add_target(Stream::write_to(io::stdout()).add_prefix("ouch"))
.add_prefix("cool")
.input();
loop {

View File

@ -15,13 +15,13 @@ fn main() {
// will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix.
let same_type_metrics = MultiOutput::output()
.add_target(Stream::write_to(io::stderr()).add_naming("out_1"))
.add_target(Stream::write_to(io::stderr()).add_naming("out_2"))
.add_naming("out_both").input();
.add_target(Stream::write_to(io::stderr()).add_prefix("out_1"))
.add_target(Stream::write_to(io::stderr()).add_prefix("out_2"))
.add_prefix("out_both").input();
loop {
different_type_metrics.new_metric("counter_a".into(), Kind::Counter).write(123, labels![]);
same_type_metrics.new_metric("timer_a".into(), Kind::Timer).write(6677, labels![]);
different_type_metrics.new_metric("counter_a".into(), InputKind::Counter).write(123, labels![]);
same_type_metrics.new_metric("timer_a".into(), InputKind::Timer).write(6677, labels![]);
std::thread::sleep(Duration::from_millis(400));
}
}

View File

@ -4,12 +4,12 @@ extern crate dipstick;
use std::thread::sleep;
use std::time::Duration;
use dipstick::{Proxy, Stream, InputScope, Input, Naming};
use dipstick::{Proxy, Stream, InputScope, Input, Prefixed};
fn main() {
let root_proxy = Proxy::default();
let sub = root_proxy.add_naming("sub");
let sub = root_proxy.add_prefix("sub");
let count1 = root_proxy.counter("counter_a");
@ -22,12 +22,12 @@ fn main() {
count2.count(2);
// route every metric from the root to stdout with prefix "root"
root_proxy.set_target(stdout.add_naming("root"));
root_proxy.set_target(stdout.add_prefix("root"));
count1.count(3);
count2.count(4);
// route metrics from "sub" to stdout with prefix "mutant"
sub.set_target(stdout.add_naming("mutant"));
sub.set_target(stdout.add_prefix("mutant"));
count1.count(5);
count2.count(6);

View File

@ -17,7 +17,7 @@ pub fn raw_write() {
// define and send metrics using raw channel API
let counter = metrics_log.new_metric(
"count_a".into(),
dipstick::Kind::Counter,
dipstick::InputKind::Counter,
);
counter.write(1, labels![]);
}

View File

@ -1,6 +1,5 @@
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
//extern crate badlog;
extern crate dipstick;
use dipstick::*;
@ -11,7 +10,7 @@ fn main() {
Statsd::send_to("localhost:8125")
.expect("Connected")
// .with_sampling(Sampling::Random(0.2))
.add_naming("my_app")
.add_prefix("my_app")
.input();
let counter = metrics.counter("counter_a");

View File

@ -1,6 +1,5 @@
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
//extern crate badlog;
extern crate dipstick;
use dipstick::*;
@ -11,7 +10,7 @@ fn main() {
Statsd::send_to("localhost:8125")
.expect("Connected")
.sampled(Sampling::Random(0.2))
.add_naming("my_app")
.add_prefix("my_app")
.input();
let counter = metrics.counter("counter_a");

View File

@ -1,18 +1,19 @@
//! A sample application asynchronously printing metrics to stdout.
//! Print metrics to stderr with custom formatter including a label.
extern crate dipstick;
use std::thread::sleep;
use std::time::Duration;
use dipstick::{Stream, InputScope, Input, Formatting, AppLabel,
Name, Kind, LineTemplate, LineFormat, LineOp, LabelOp};
MetricName, InputKind, LineTemplate, LineFormat, LineOp, LabelOp};
/// Generates template like "$METRIC $value $label_value["abc"]\n"
struct MyFormat;
impl LineFormat for MyFormat {
fn template(&self, name: &Name, _kind: Kind) -> LineTemplate {
fn template(&self, name: &MetricName, _kind: InputKind) -> LineTemplate {
vec![
LineOp::Literal(format!("{} ", name.join(".")).into()),
LineOp::Literal(format!("{} ", name.join(".")).to_uppercase().into()),
LineOp::ValueAsText,
LineOp::Literal(" ".into()),
LineOp::LabelExists("abc".into(),

View File

@ -1,11 +1,19 @@
# the dipstick handbook
**IN PROGRESS**
# The dipstick handbook
## table of contents
This handbook's purpose is to get you started instrumenting your apps with dipstick
and give an idea of what's possible.
## introduction
For more details, consult the [docs](https://docs.rs/dipstick/).
## static metrics macro
## Overview
To achieve it's flexibility, Dipstick decouples the metrics _inputs_ from the metric _outputs_.
For example, incrementing a counter in the application may not result in immediate output to a file or to the network.
Conversely, it is also possible that an app will output metrics data even though no values were recorded.
While this makes things generally simpler, it requires the programmer to decide beforehand how metrics will be handled.
## Static metrics
For speed and easier maintenance, metrics are usually defined statically:
@ -25,4 +33,4 @@ fn main() {
}
```
Metric definition macros are just `lazy_static!` wrappers.
(Metric definition macros are just `lazy_static!` wrappers.)

View File

@ -1,4 +1,28 @@
# input
# Input
Metrics input are the measurement instruments that are called from application code.
The inputs are high-level components that are assumed to be callable
from all contexts, regardless of threading, security, etc.
Each metric input has a name and a kind.
A metric's name is a short alphanumeric identifier.
A metric's kind can be one of four kinds:
- Counter
- Marker
- Timer
- Gauge
The actual flow of measured values varies depending on how the metrics backend has been configured.
Skip to the output section for more details on backend configuration.
## Counters and Markers
## Timers
## Gauges
## namespace

View File

@ -1,2 +0,0 @@
pub mod bucket;
pub mod scores;

View File

@ -1,174 +0,0 @@
use core::input::Kind;
use core::Value;
use std::mem;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::usize;
use self::ScoreType::*;
#[derive(Debug, Clone, Copy)]
/// Possibly aggregated scores.
pub enum ScoreType {
/// Number of times the metric was used.
Count(u64),
/// Sum of metric values reported.
Sum(u64),
/// Biggest value reported.
Max(u64),
/// Smallest value reported.
Min(u64),
/// Average value (hit count / sum, non-atomic)
Mean(f64),
/// Mean rate (hit count / period length in seconds, non-atomic)
Rate(f64),
}
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
pub struct Scoreboard {
/// The kind of metric
kind: Kind,
/// The actual recorded metric scores
scores: [AtomicUsize; 4],
}
impl Scoreboard {
/// Create a new Scoreboard to track summary values of a metric
pub fn new(kind: Kind) -> Self {
Scoreboard {
kind,
scores: unsafe { mem::transmute(Scoreboard::blank()) },
}
}
/// Returns the metric's kind.
pub fn metric_kind(&self) -> Kind {
self.kind
}
#[inline]
fn blank() -> [usize; 4] {
[0, 0, usize::MIN, usize::MAX]
}
/// Update scores with new value
pub fn update(&self, value: Value) -> () {
// TODO report any concurrent updates / resets for measurement of contention
let value = value as usize;
self.scores[0].fetch_add(1, AcqRel);
match self.kind {
Kind::Marker => {}
_ => {
// optimization - these fields are unused for Marker stats
self.scores[1].fetch_add(value, AcqRel);
swap_if(&self.scores[2], value, |new, current| new > current);
swap_if(&self.scores[3], value, |new, current| new < current);
}
}
}
/// Reset scores to zero, return previous values
fn snapshot(&self, scores: &mut [usize; 4]) -> bool {
// NOTE copy timestamp, count AND sum _before_ testing for data to reduce concurrent discrepancies
scores[0] = self.scores[0].swap(0, AcqRel);
scores[1] = self.scores[1].swap(0, AcqRel);
// if hit count is zero, then no values were recorded.
if scores[0] == 0 {
return false;
}
scores[2] = self.scores[2].swap(usize::MIN, AcqRel);
scores[3] = self.scores[3].swap(usize::MAX, AcqRel);
true
}
/// Map raw scores (if any) to applicable statistics
pub fn reset(&self, duration_seconds: f64) -> Option<Vec<ScoreType>> {
let mut scores = Scoreboard::blank();
if self.snapshot(&mut scores) {
let mut snapshot = Vec::new();
match self.kind {
Kind::Marker => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
Kind::Gauge => {
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
}
Kind::Timer => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
// timer rate uses the COUNT of timer calls per second (not SUM)
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
Kind::Counter => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
// counter rate uses the SUM of values per second (e.g. to get bytes/s)
snapshot.push(Rate(scores[1] as f64 / duration_seconds))
}
}
Some(snapshot)
} else {
None
}
}
}
/// Spinlock until success or clear loss to concurrent update.
#[inline]
fn swap_if(counter: &AtomicUsize, new_value: usize, compare: fn(usize, usize) -> bool) {
let mut current = counter.load(Acquire);
while compare(new_value, current) {
if counter.compare_and_swap(current, new_value, Release) == new_value {
// update successful
break;
}
// race detected, retry
current = counter.load(Acquire);
}
}
#[cfg(feature = "bench")]
mod bench {
use core::input::Kind;
use super::*;
use test;
#[bench]
fn update_marker(b: &mut test::Bencher) {
let metric = Scoreboard::new(Kind::Marker);
b.iter(|| test::black_box(metric.update(1)));
}
#[bench]
fn update_count(b: &mut test::Bencher) {
let metric = Scoreboard::new(Kind::Counter);
b.iter(|| test::black_box(metric.update(4)));
}
#[bench]
fn empty_snapshot(b: &mut test::Bencher) {
let metric = Scoreboard::new(Kind::Counter);
let scores = &mut Scoreboard::blank();
b.iter(|| test::black_box(metric.snapshot(scores)));
}
}

View File

@ -1,21 +1,26 @@
//! Maintain aggregated metrics for deferred reporting,
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::{Name};
use core::input::{Kind, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::{MetricName};
use core::input::{InputKind, InputScope, InputMetric};
use core::output::{OutputDyn, OutputScope, OutputMetric, Output, output_none};
use core::clock::TimeHandle;
use core::{Value, Flush};
use aggregate::scores::{Scoreboard, ScoreType};
use core::{MetricValue, Flush};
use bucket::{ScoreType, stats_summary};
use bucket::ScoreType::*;
use core::error;
use std::mem;
use std::usize;
use std::collections::BTreeMap;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::sync::{Arc, RwLock};
use std::fmt;
use std::borrow::Borrow;
/// A function type to transform aggregated scores into publishable statistics.
pub type StatsFn = Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static;
pub type StatsFn = Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static;
fn initial_stats() -> &'static StatsFn {
&stats_summary
@ -34,21 +39,21 @@ lazy_static! {
/// Central aggregation structure.
/// Maintains a list of metrics for enumeration when used as source.
#[derive(Debug, Clone)]
pub struct Bucket {
pub struct AtomicBucket {
attributes: Attributes,
inner: Arc<RwLock<InnerBucket>>,
inner: Arc<RwLock<InnerAtomicBucket>>,
}
struct InnerBucket {
metrics: BTreeMap<Name, Arc<Scoreboard>>,
struct InnerAtomicBucket {
metrics: BTreeMap<MetricName, Arc<AtomicScores>>,
period_start: TimeHandle,
stats: Option<Arc<Fn(Kind, Name, ScoreType)
-> Option<(Kind, Name, Value)> + Send + Sync + 'static>>,
stats: Option<Arc<Fn(InputKind, MetricName, ScoreType)
-> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static>>,
output: Option<Arc<OutputDyn + Send + Sync + 'static>>,
publish_metadata: bool,
}
impl fmt::Debug for InnerBucket {
impl fmt::Debug for InnerAtomicBucket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "metrics: {:?}", self.metrics)?;
write!(f, "period_start: {:?}", self.period_start)
@ -56,10 +61,10 @@ impl fmt::Debug for InnerBucket {
}
lazy_static! {
static ref PERIOD_LENGTH: Name = "_period_length".into();
static ref PERIOD_LENGTH: MetricName = "_period_length".into();
}
impl InnerBucket {
impl InnerAtomicBucket {
pub fn flush(&mut self) -> error::Result<()> {
let stats_fn = match self.stats {
@ -90,13 +95,13 @@ impl InnerBucket {
/// Take a snapshot of aggregated values and reset them.
/// Compute stats on captured values using assigned or default stats function.
/// Write stats to assigned or default output.
pub fn flush_to(&mut self, publish_scope: &OutputScope, stats_fn: &StatsFn) -> error::Result<()> {
pub fn flush_to(&mut self, target: &OutputScope, stats: &StatsFn) -> error::Result<()> {
let now = TimeHandle::now();
let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0;
self.period_start = now;
let mut snapshot: Vec<(&Name, Kind, Vec<ScoreType>)> = self.metrics.iter()
let mut snapshot: Vec<(&MetricName, InputKind, Vec<ScoreType>)> = self.metrics.iter()
.flat_map(|(name, scores)| if let Some(values) = scores.reset(duration_seconds) {
Some((name, scores.metric_kind(), values))
} else {
@ -112,39 +117,41 @@ impl InnerBucket {
} else {
// TODO add switch for metadata such as PERIOD_LENGTH
if self.publish_metadata {
snapshot.push((&PERIOD_LENGTH, Kind::Timer, vec![ScoreType::Sum((duration_seconds * 1000.0) as u64)]));
snapshot.push((&PERIOD_LENGTH, InputKind::Timer, vec![Sum((duration_seconds * 1000.0) as u64)]));
}
for metric in snapshot {
for score in metric.2 {
let filtered = (stats_fn)(metric.1, metric.0.clone(), score);
let filtered = stats(metric.1, metric.0.clone(), score);
if let Some((kind, name, value)) = filtered {
let metric: OutputMetric = publish_scope.new_metric(name, kind);
let metric: OutputMetric = target.new_metric(name, kind);
// TODO provide some bucket context through labels?
metric.write(value, labels![])
}
}
}
publish_scope.flush()
target.flush()
}
}
}
impl<S: AsRef<str>> From<S> for Bucket {
fn from(name: S) -> Bucket {
Bucket::new().add_naming(name.as_ref())
impl<S: AsRef<str>> From<S> for AtomicBucket {
fn from(name: S) -> AtomicBucket {
AtomicBucket::new().add_prefix(name.as_ref())
}
}
impl Bucket {
/// Build a new metric aggregation
pub fn new() -> Bucket {
Bucket {
impl AtomicBucket {
/// Build a new atomic bucket.
pub fn new() -> AtomicBucket {
AtomicBucket {
attributes: Attributes::default(),
inner: Arc::new(RwLock::new(InnerBucket {
inner: Arc::new(RwLock::new(InnerAtomicBucket {
metrics: BTreeMap::new(),
period_start: TimeHandle::now(),
stats: None,
output: None,
// TODO add API toggle for metadata publish
publish_metadata: false,
}))
}
@ -153,7 +160,7 @@ impl Bucket {
/// Set the default aggregated metrics statistics generator.
pub fn set_default_stats<F>(func: F)
where
F: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static
{
*DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(func)
}
@ -176,7 +183,7 @@ impl Bucket {
/// Set the default aggregated metrics statistics generator.
pub fn set_stats<F>(&self, func: F)
where
F: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static
{
self.inner.write().expect("Aggregator").stats = Some(Arc::new(func))
}
@ -187,7 +194,7 @@ impl Bucket {
}
/// Install a new receiver for all aggregated metrics, replacing any previous receiver.
pub fn set_target(&self, new_config: impl Output + Send + Sync + 'static) {
pub fn set_flush_target(&self, new_config: impl Output + Send + Sync + 'static) {
self.inner.write().expect("Aggregator").output = Some(Arc::new(new_config))
}
@ -204,21 +211,21 @@ impl Bucket {
}
impl InputScope for Bucket {
/// Lookup or create a scoreboard for the requested metric.
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let scoreb = self.inner
impl InputScope for AtomicBucket {
/// Lookup or create scores for the requested metric.
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let scores = self.inner
.write()
.expect("Aggregator")
.metrics
.entry(self.naming_append(name))
.or_insert_with(|| Arc::new(Scoreboard::new(kind)))
.entry(self.prefix_append(name))
.or_insert_with(|| Arc::new(AtomicScores::new(kind)))
.clone();
InputMetric::new(move |value, _labels| scoreb.update(value))
InputMetric::new(move |value, _labels| scores.update(value))
}
}
impl Flush for Bucket {
impl Flush for AtomicBucket {
/// Collect and reset aggregated data.
/// Publish statistics
fn flush(&self) -> error::Result<()> {
@ -227,64 +234,127 @@ impl Flush for Bucket {
}
}
impl WithAttributes for Bucket {
impl WithAttributes for AtomicBucket {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
/// A predefined export strategy reporting all aggregated stats for all metric types.
/// Resulting stats are named by appending a short suffix to each metric's name.
#[allow(dead_code)]
pub fn stats_all(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match score {
ScoreType::Count(hit) => Some((Kind::Counter, name.make_name("count"), hit)),
ScoreType::Sum(sum) => Some((kind, name.make_name("sum"), sum)),
ScoreType::Mean(mean) => Some((kind, name.make_name("mean"), mean.round() as Value)),
ScoreType::Max(max) => Some((Kind::Gauge, name.make_name("max"), max)),
ScoreType::Min(min) => Some((Kind::Gauge, name.make_name("min"), min)),
ScoreType::Rate(rate) => Some((Kind::Gauge, name.make_name("rate"), rate.round() as Value)),
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
struct AtomicScores {
/// The kind of metric
kind: InputKind,
/// The actual recorded metric scores
scores: [AtomicUsize; 4],
}
impl AtomicScores {
/// Create new scores to track summary values of a metric
pub fn new(kind: InputKind) -> Self {
AtomicScores {
kind,
scores: unsafe { mem::transmute(AtomicScores::blank()) },
}
}
/// Returns the metric's kind.
pub fn metric_kind(&self) -> InputKind {
self.kind
}
#[inline]
fn blank() -> [usize; 4] {
[0, 0, usize::MIN, usize::MAX]
}
/// Update scores with new value
pub fn update(&self, value: MetricValue) -> () {
// TODO report any concurrent updates / resets for measurement of contention
let value = value as usize;
self.scores[0].fetch_add(1, AcqRel);
match self.kind {
InputKind::Marker => {}
_ => {
// optimization - these fields are unused for Marker stats
self.scores[1].fetch_add(value, AcqRel);
swap_if(&self.scores[2], value, |new, current| new > current);
swap_if(&self.scores[3], value, |new, current| new < current);
}
}
}
/// Reset scores to zero, return previous values
fn snapshot(&self, scores: &mut [usize; 4]) -> bool {
// NOTE copy timestamp, count AND sum _before_ testing for data to reduce concurrent discrepancies
scores[0] = self.scores[0].swap(0, AcqRel);
scores[1] = self.scores[1].swap(0, AcqRel);
// if hit count is zero, then no values were recorded.
if scores[0] == 0 {
return false;
}
scores[2] = self.scores[2].swap(usize::MIN, AcqRel);
scores[3] = self.scores[3].swap(usize::MAX, AcqRel);
true
}
/// Map raw scores (if any) to applicable statistics
pub fn reset(&self, duration_seconds: f64) -> Option<Vec<ScoreType>> {
let mut scores = AtomicScores::blank();
if self.snapshot(&mut scores) {
let mut snapshot = Vec::new();
match self.kind {
InputKind::Marker => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
InputKind::Gauge => {
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
}
InputKind::Timer => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
// timer rate uses the COUNT of timer calls per second (not SUM)
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
InputKind::Counter => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
// counter rate uses the SUM of values per second (e.g. to get bytes/s)
snapshot.push(Rate(scores[1] as f64 / duration_seconds))
}
}
Some(snapshot)
} else {
None
}
}
}
/// A predefined export strategy reporting the average value for every non-marker metric.
/// Marker metrics export their hit count instead.
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn stats_average(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match kind {
Kind::Marker => match score {
ScoreType::Count(count) => Some((Kind::Counter, name, count)),
_ => None,
},
_ => match score {
ScoreType::Mean(avg) => Some((Kind::Gauge, name, avg.round() as Value)),
_ => None,
},
}
}
/// A predefined single-stat-per-metric export strategy:
/// - Timers and Counters each export their sums
/// - Markers each export their hit count
/// - Gauges each export their average
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn stats_summary(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match kind {
Kind::Marker => match score {
ScoreType::Count(count) => Some((Kind::Counter, name, count)),
_ => None,
},
Kind::Counter | Kind::Timer => match score {
ScoreType::Sum(sum) => Some((kind, name, sum)),
_ => None,
},
Kind::Gauge => match score {
ScoreType::Mean(mean) => Some((Kind::Gauge, name, mean.round() as Value)),
_ => None,
},
/// Spinlock until success or clear loss to concurrent update.
#[inline]
fn swap_if(counter: &AtomicUsize, new_value: usize, compare: fn(usize, usize) -> bool) {
let mut current = counter.load(Acquire);
while compare(new_value, current) {
if counter.compare_and_swap(current, new_value, Release) == new_value {
// update successful
break;
}
// race detected, retry
current = counter.load(Acquire);
}
}
@ -294,17 +364,36 @@ mod bench {
use test;
use super::*;
#[bench]
fn update_marker(b: &mut test::Bencher) {
let metric = AtomicScores::new(InputKind::Marker);
b.iter(|| test::black_box(metric.update(1)));
}
#[bench]
fn update_count(b: &mut test::Bencher) {
let metric = AtomicScores::new(InputKind::Counter);
b.iter(|| test::black_box(metric.update(4)));
}
#[bench]
fn empty_snapshot(b: &mut test::Bencher) {
let metric = AtomicScores::new(InputKind::Counter);
let scores = &mut AtomicScores::blank();
b.iter(|| test::black_box(metric.snapshot(scores)));
}
#[bench]
fn aggregate_marker(b: &mut test::Bencher) {
let sink = Bucket::new();
let metric = sink.new_metric("event_a".into(), Kind::Marker);
let sink = AtomicBucket::new();
let metric = sink.new_metric("event_a".into(), InputKind::Marker);
b.iter(|| test::black_box(metric.write(1, labels![])));
}
#[bench]
fn aggregate_counter(b: &mut test::Bencher) {
let sink = Bucket::new();
let metric = sink.new_metric("count_a".into(), Kind::Counter);
let sink = AtomicBucket::new();
let metric = sink.new_metric("count_a".into(), InputKind::Counter);
b.iter(|| test::black_box(metric.write(1, labels![])));
}
@ -313,16 +402,18 @@ mod bench {
#[cfg(test)]
mod test {
use super::*;
use bucket::{stats_all, stats_average, stats_summary};
use core::clock::{mock_clock_advance, mock_clock_reset};
use output::map::StatsMap;
use std::time::Duration;
use std::collections::BTreeMap;
fn make_stats(stats_fn: &StatsFn) -> BTreeMap<String, Value> {
fn make_stats(stats_fn: &StatsFn) -> BTreeMap<String, MetricValue> {
mock_clock_reset();
let metrics = Bucket::new().add_naming("test");
let metrics = AtomicBucket::new().add_prefix("test");
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_a");

84
src/bucket/mod.rs Executable file
View File

@ -0,0 +1,84 @@
pub mod atomic;
use core::input::InputKind;
use core::MetricValue;
use core::name::{MetricName};
/// Possibly aggregated scores.
#[derive(Debug, Clone, Copy)]
pub enum ScoreType {
/// Number of times the metric was used.
Count(u64),
/// Sum of metric values reported.
Sum(u64),
/// Biggest value reported.
Max(u64),
/// Smallest value reported.
Min(u64),
/// Average value (hit count / sum, non-atomic)
Mean(f64),
/// Mean rate (hit count / period length in seconds, non-atomic)
Rate(f64),
}
/// A predefined export strategy reporting all aggregated stats for all metric types.
/// Resulting stats are named by appending a short suffix to each metric's name.
#[allow(dead_code)]
pub fn stats_all(kind: InputKind, name: MetricName, score: ScoreType)
-> Option<(InputKind, MetricName, MetricValue)>
{
match score {
ScoreType::Count(hit) => Some((InputKind::Counter, name.make_name("count"), hit)),
ScoreType::Sum(sum) => Some((kind, name.make_name("sum"), sum)),
ScoreType::Mean(mean) => Some((kind, name.make_name("mean"), mean.round() as MetricValue)),
ScoreType::Max(max) => Some((InputKind::Gauge, name.make_name("max"), max)),
ScoreType::Min(min) => Some((InputKind::Gauge, name.make_name("min"), min)),
ScoreType::Rate(rate) => Some((InputKind::Gauge, name.make_name("rate"), rate.round() as MetricValue)),
}
}
/// A predefined export strategy reporting the average value for every non-marker metric.
/// Marker metrics export their hit count instead.
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn stats_average(kind: InputKind, name: MetricName, score: ScoreType)
-> Option<(InputKind, MetricName, MetricValue)>
{
match kind {
InputKind::Marker => match score {
ScoreType::Count(count) => Some((InputKind::Counter, name, count)),
_ => None,
},
_ => match score {
ScoreType::Mean(avg) => Some((InputKind::Gauge, name, avg.round() as MetricValue)),
_ => None,
},
}
}
/// A predefined single-stat-per-metric export strategy:
/// - Timers and Counters each export their sums
/// - Markers each export their hit count
/// - Gauges each export their average
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn stats_summary(kind: InputKind, name: MetricName, score: ScoreType)
-> Option<(InputKind, MetricName, MetricValue)>
{
match kind {
InputKind::Marker => match score {
ScoreType::Count(count) => Some((InputKind::Counter, name, count)),
_ => None,
},
InputKind::Counter | InputKind::Timer => match score {
ScoreType::Sum(sum) => Some((kind, name, sum)),
_ => None,
},
InputKind::Gauge => match score {
ScoreType::Mean(mean) => Some((InputKind::Gauge, name, mean.round() as MetricValue)),
_ => None,
},
}
}

14
src/cache/cache_in.rs vendored
View File

@ -1,9 +1,9 @@
//! Cache metric definitions.
use core::Flush;
use core::input::{Kind, Input, InputScope, InputMetric, InputDyn};
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::{InputKind, Input, InputScope, InputMetric, InputDyn};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use cache::lru_cache as lru;
use core::error;
@ -24,7 +24,7 @@ pub trait CachedInput: Input + Send + Sync + 'static + Sized {
pub struct InputCache {
attributes: Attributes,
target: Arc<InputDyn + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, InputMetric>>>,
cache: Arc<RwLock<lru::LRUCache<MetricName, InputMetric>>>,
}
impl InputCache {
@ -61,7 +61,7 @@ impl Input for InputCache {
pub struct InputScopeCache {
attributes: Attributes,
target: Arc<InputScope + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, InputMetric>>>,
cache: Arc<RwLock<lru::LRUCache<MetricName, InputMetric>>>,
}
impl WithAttributes for InputScopeCache {
@ -70,8 +70,8 @@ impl WithAttributes for InputScopeCache {
}
impl InputScope for InputScopeCache {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let lookup = {
self.cache.write().expect("Cache Lock").get(&name).cloned()
};

View File

@ -1,10 +1,10 @@
//! Cache metric definitions.
use core::Flush;
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope, OutputDyn};
use core::input::Kind;
use core::input::InputKind;
use cache::lru_cache as lru;
use core::error;
@ -26,7 +26,7 @@ pub trait CachedOutput: Output + Send + Sync + 'static + Sized {
pub struct OutputCache {
attributes: Attributes,
target: Arc<OutputDyn + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, OutputMetric>>>,
cache: Arc<RwLock<lru::LRUCache<MetricName, OutputMetric>>>,
}
impl OutputCache {
@ -63,7 +63,7 @@ impl Output for OutputCache {
pub struct OutputScopeCache {
attributes: Attributes,
target: Rc<OutputScope + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, OutputMetric>>>,
cache: Arc<RwLock<lru::LRUCache<MetricName, OutputMetric>>>,
}
impl WithAttributes for OutputScopeCache {
@ -72,8 +72,8 @@ impl WithAttributes for OutputScopeCache {
}
impl OutputScope for OutputScopeCache {
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = self.prefix_append(name);
let lookup = {
self.cache.write().expect("Cache Lock").get(&name).cloned()
};

View File

@ -1,6 +1,6 @@
//! A fixed-size cache with LRU expiration criteria.
//! Stored values will be held onto as long as there is space.
//! When space runs out, the oldest unused value will get evicted to make room for a new value.
//! A fixed-size cache with LRU expiration criteria.
//! Stored values will be held onto as long as there is space.
//! When space runs out, the oldest unused value will get evicted to make room for a new value.
use std::hash::Hash;
use std::collections::HashMap;

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::collections::{HashMap};
use core::name::{NameParts, Name};
use core::name::{NameParts, MetricName};
/// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method.
#[derive(Debug, Clone, Copy)]
@ -43,7 +43,7 @@ pub trait WithAttributes: Clone {
// TODO replace with fields-in-traits if ever stabilized (https://github.com/nikomatsakis/fields-in-traits-rfc)
fn mut_attributes(&mut self) -> &mut Attributes;
/// Utility method. Clone a component and mutate its attributes at once.
/// Clone the component and mutate its attributes at once.
fn with_attributes<F: Fn(&mut Attributes)>(&self, edit: F) -> Self {
let mut cloned = self.clone();
(edit)(cloned.mut_attributes());
@ -52,21 +52,21 @@ pub trait WithAttributes: Clone {
}
/// Name operations support.
pub trait Naming {
pub trait Prefixed {
/// Returns namespace of component.
fn get_naming(&self) -> &NameParts;
fn get_prefixes(&self) -> &NameParts;
/// Join namespace and prepend in newly defined metrics.
fn add_naming<S: Into<String>>(&self, name: S) -> Self;
/// Extend the namespace metrics will be defined in.
fn add_prefix<S: Into<String>>(&self, name: S) -> Self;
/// Append any name parts to the name's namespace.
fn naming_append<S: Into<Name>>(&self, name: S) -> Name {
name.into().append(self.get_naming().clone())
fn prefix_append<S: Into<MetricName>>(&self, name: S) -> MetricName {
name.into().append(self.get_prefixes().clone())
}
/// Prepend any name parts to the name's namespace.
fn naming_prepend<S: Into<Name>>(&self, name: S) -> Name {
name.into().prepend(self.get_naming().clone())
fn prefix_prepend<S: Into<MetricName>>(&self, name: S) -> MetricName {
name.into().prepend(self.get_prefixes().clone())
}
}
@ -80,16 +80,16 @@ pub trait Label {
}
impl<T: WithAttributes> Naming for T {
impl<T: WithAttributes> Prefixed for T {
/// Returns namespace of component.
fn get_naming(&self) -> &NameParts {
fn get_prefixes(&self) -> &NameParts {
&self.get_attributes().naming
}
/// Adds a name part to any existing naming.
/// Return a clone of the component with the updated naming.
fn add_naming<S: Into<String>>(&self, name: S) -> Self {
fn add_prefix<S: Into<String>>(&self, name: S) -> Self {
let name = name.into();
self.with_attributes(|new_attr| new_attr.naming.push_back(name.clone()))
}

View File

@ -6,7 +6,7 @@ use std::time::Duration;
use std::time::Instant;
use core::Value;
use core::MetricValue;
#[derive(Debug, Copy, Clone)]
/// A handle to the start time of a counter.
@ -21,13 +21,13 @@ impl TimeHandle {
}
/// Get the elapsed time in microseconds since TimeHandle was obtained.
pub fn elapsed_us(self) -> Value {
pub fn elapsed_us(self) -> MetricValue {
let duration = now() - self.0;
duration.as_secs() * 1_000_000 + duration.subsec_micros() as Value
duration.as_secs() * 1_000_000 + duration.subsec_micros() as MetricValue
}
/// Get the elapsed time in microseconds since TimeHandle was obtained.
pub fn elapsed_ms(self) -> Value {
pub fn elapsed_ms(self) -> MetricValue {
self.elapsed_us() / 1000
}
}

View File

@ -1,7 +1,7 @@
use core::clock::TimeHandle;
use core::{Value, Flush};
use core::name::Name;
use ::{Labels};
use core::{MetricValue, Flush};
use core::name::MetricName;
use core::label::Labels;
use std::sync::Arc;
use std::fmt;
@ -36,26 +36,26 @@ impl<T: Input + Send + Sync + 'static> InputDyn for T {
pub trait InputScope: Flush {
/// Define a generic metric of the specified type.
/// It is preferable to use counter() / marker() / timer() / gauge() methods.
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric;
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric;
/// Define a counter.
fn counter(&self, name: &str) -> Counter {
self.new_metric(name.into(), Kind::Counter).into()
self.new_metric(name.into(), InputKind::Counter).into()
}
/// Define a marker.
fn marker(&self, name: &str) -> Marker {
self.new_metric(name.into(), Kind::Marker).into()
self.new_metric(name.into(), InputKind::Marker).into()
}
/// Define a timer.
fn timer(&self, name: &str) -> Timer {
self.new_metric(name.into(), Kind::Timer).into()
self.new_metric(name.into(), InputKind::Timer).into()
}
/// Define a gauge.
fn gauge(&self, name: &str) -> Gauge {
self.new_metric(name.into(), Kind::Gauge).into()
self.new_metric(name.into(), InputKind::Gauge).into()
}
}
@ -63,7 +63,7 @@ pub trait InputScope: Flush {
/// A metric is actually a function that knows to write a metric value to a metric output.
#[derive(Clone)]
pub struct InputMetric {
inner: Arc<Fn(Value, Labels) + Send + Sync>
inner: Arc<Fn(MetricValue, Labels) + Send + Sync>
}
impl fmt::Debug for InputMetric {
@ -74,20 +74,20 @@ impl fmt::Debug for InputMetric {
impl InputMetric {
/// Utility constructor
pub fn new<F: Fn(Value, Labels) + Send + Sync + 'static>(metric: F) -> InputMetric {
pub fn new<F: Fn(MetricValue, Labels) + Send + Sync + 'static>(metric: F) -> InputMetric {
InputMetric { inner: Arc::new(metric) }
}
/// Collect a new value for this metric.
#[inline]
pub fn write(&self, value: Value, labels: Labels) {
pub fn write(&self, value: MetricValue, labels: Labels) {
(self.inner)(value, labels)
}
}
/// Used to differentiate between metric kinds in the backend.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Kind {
pub enum InputKind {
/// Handling one item at a time.
Marker,
/// Handling quantities or multiples.
@ -98,15 +98,15 @@ pub enum Kind {
Timer,
}
/// Used by the metrics! macro to obtain the Kind from the stringified type.
impl<'a> From<&'a str> for Kind {
fn from(s: &str) -> Kind {
/// Used by the metrics! macro to obtain the InputKind from the stringified type.
impl<'a> From<&'a str> for InputKind {
fn from(s: &str) -> InputKind {
match s {
"Marker" => Kind::Marker,
"Counter" => Kind::Counter,
"Gauge" => Kind::Gauge,
"Timer" => Kind::Timer,
_ => panic!("No Kind '{}' defined", s)
"Marker" => InputKind::Marker,
"Counter" => InputKind::Counter,
"Gauge" => InputKind::Gauge,
"Timer" => InputKind::Timer,
_ => panic!("No InputKind '{}' defined", s)
}
}
}

View File

@ -3,7 +3,7 @@
//! This is also kept in a separate module because it is not to be exposed outside of the crate.
use core::input::{Marker, InputScope, Counter};
use core::attributes::Naming;
use core::attributes::Prefixed;
use core::proxy::Proxy;
metrics!{

View File

@ -13,7 +13,7 @@ pub mod scheduler;
pub mod metrics;
/// Base type for recorded metric values.
pub type Value = u64;
pub type MetricValue = u64;
/// Both InputScope and OutputScope share the ability to flush the recorded data.
pub trait Flush {
@ -31,10 +31,9 @@ pub mod test {
#[test]
fn test_to_void() {
let c = void::Void::metrics().input();
let m = c.new_metric("test".into(), input::Kind::Marker);
let m = c.new_metric("test".into(), input::InputKind::Marker);
m.write(33, labels![]);
}
}
#[cfg(feature = "bench")]
@ -42,7 +41,7 @@ pub mod bench {
use super::input::*;
use super::clock::*;
use super::super::aggregate::bucket::*;
use super::super::bucket::atomic::*;
use test;
#[bench]
@ -52,7 +51,7 @@ pub mod bench {
#[bench]
fn time_bench_direct_dispatch_event(b: &mut test::Bencher) {
let metrics = Bucket::new();
let metrics = AtomicBucket::new();
let marker = metrics.marker("aaa");
b.iter(|| test::black_box(marker.mark()));
}

View File

@ -28,15 +28,15 @@ impl NameParts {
}
/// Make a name in this namespace
pub fn make_name<S: Into<String>>(&self, leaf: S) -> Name {
pub fn make_name<S: Into<String>>(&self, leaf: S) -> MetricName {
let mut nodes = self.clone();
nodes.push_back(leaf.into());
Name { nodes }
MetricName { nodes }
}
/// Extract a copy of the last name part
/// Panics if empty
pub fn short(&self) -> Name {
pub fn short(&self) -> MetricName {
self.back().expect("Short metric name").clone().into()
}
}
@ -70,11 +70,11 @@ impl DerefMut for NameParts {
/// The name of a metric, including the concatenated possible namespaces in which it was defined.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct Name {
pub struct MetricName {
nodes: NameParts,
}
impl Name {
impl MetricName {
/// Prepend to the existing namespace.
pub fn prepend<S: Into<NameParts>>(mut self, namespace: S) -> Self {
@ -101,20 +101,20 @@ impl Name {
}
}
impl<S: Into<String>> From<S> for Name {
impl<S: Into<String>> From<S> for MetricName {
fn from(name: S) -> Self {
Name { nodes: NameParts::from(name) }
MetricName { nodes: NameParts::from(name) }
}
}
impl Deref for Name {
impl Deref for MetricName {
type Target = NameParts;
fn deref(&self) -> &Self::Target {
&self.nodes
}
}
impl DerefMut for Name {
impl DerefMut for MetricName {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.nodes
}

View File

@ -1,7 +1,7 @@
use core::input::{InputScope, InputMetric, Input, Kind};
use core::input::{InputScope, InputMetric, Input, InputKind};
use core::output::{Output, OutputScope};
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::Flush;
use core::error;
use std::rc::Rc;
@ -9,7 +9,7 @@ use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::ops;
/// Provide thread-safe locking to RawScope implementers.
/// Synchronous thread-safety for metric output using basic locking.
#[derive(Clone)]
pub struct LockingScopeBox {
attributes: Attributes,
@ -23,8 +23,8 @@ impl WithAttributes for LockingScopeBox {
impl InputScope for LockingScopeBox {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let raw_metric = self.inner.lock().expect("RawScope Lock").new_metric(name, kind);
let mutex = self.inner.clone();
InputMetric::new(move |value, labels| {

View File

@ -1,8 +1,8 @@
use core::{Flush, Value};
use core::input::Kind;
use core::name::Name;
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::name::MetricName;
use core::void::Void;
use ::{Labels};
use core::label::Labels;
use std::rc::Rc;
@ -10,26 +10,26 @@ use std::rc::Rc;
pub trait OutputScope: Flush {
/// Define a raw metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric;
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric;
}
/// Output metrics are not thread safe.
#[derive(Clone)]
pub struct OutputMetric {
inner: Rc<Fn(Value, Labels)>
inner: Rc<Fn(MetricValue, Labels)>
}
impl OutputMetric {
/// Utility constructor
pub fn new<F: Fn(Value, Labels) + 'static>(metric: F) -> OutputMetric {
pub fn new<F: Fn(MetricValue, Labels) + 'static>(metric: F) -> OutputMetric {
OutputMetric { inner: Rc::new(metric) }
}
/// Some may prefer the `metric.write(value)` form to the `(metric)(value)` form.
/// This shouldn't matter as metrics should be of type Counter, Marker, etc.
#[inline]
pub fn write(&self, value: Value, labels: Labels) {
pub fn write(&self, value: MetricValue, labels: Labels) {
(self.inner)(value, labels)
}
}

View File

@ -1,9 +1,9 @@
//! Decouple metric definition from configuration with trait objects.
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::{Name, NameParts};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::{MetricName, NameParts};
use core::Flush;
use core::input::{Kind, InputMetric, InputScope};
use core::input::{InputKind, InputMetric, InputScope};
use core::void::VOID_INPUT;
use core::error;
@ -26,7 +26,7 @@ lazy_static! {
struct ProxyMetric {
// basic info for this metric, needed to recreate new corresponding trait object if target changes
name: NameParts,
kind: Kind,
kind: InputKind,
// the metric trait object to proxy metric values to
// the second part can be up to namespace.len() + 1 if this metric was individually targeted
@ -174,13 +174,13 @@ impl Proxy {
/// Replace target for this proxy and it's children.
pub fn set_target<T: InputScope + Send + Sync + 'static>(&self, target: T) {
let mut inner = self.inner.write().expect("Dispatch Lock");
inner.set_target(self.get_naming(), Arc::new(target));
inner.set_target(self.get_prefixes(), Arc::new(target));
}
/// Replace target for this proxy and it's children.
pub fn unset_target(&self) {
let mut inner = self.inner.write().expect("Dispatch Lock");
inner.unset_target(self.get_naming());
inner.unset_target(self.get_prefixes());
}
/// Replace target for this proxy and it's children.
@ -197,19 +197,19 @@ impl Proxy {
impl<S: AsRef<str>> From<S> for Proxy {
fn from(name: S) -> Proxy {
Proxy::new().add_naming(name.as_ref())
Proxy::new().add_prefix(name.as_ref())
}
}
impl InputScope for Proxy {
/// Lookup or create a proxy stub for the requested metric.
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name: Name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name: MetricName = self.prefix_append(name);
let mut inner = self.inner.write().expect("Dispatch Lock");
let proxy = inner
.metrics
.get(&name)
// TODO validate that Kind matches existing
// TODO validate that InputKind matches existing
.and_then(|proxy_ref| Weak::upgrade(proxy_ref))
.unwrap_or_else(|| {
let namespace = &*name;
@ -235,7 +235,7 @@ impl InputScope for Proxy {
impl Flush for Proxy {
fn flush(&self) -> error::Result<()> {
self.inner.write().expect("Dispatch Lock").flush(self.get_naming())
self.inner.write().expect("Dispatch Lock").flush(self.get_prefixes())
}
}
@ -249,11 +249,11 @@ mod bench {
use super::*;
use test;
use aggregate::bucket::Bucket;
use bucket::atomic::AtomicBucket;
#[bench]
fn proxy_marker_to_aggregate(b: &mut test::Bencher) {
ROOT_PROXY.set_target(Bucket::new());
ROOT_PROXY.set_target(AtomicBucket::new());
let metric = ROOT_PROXY.marker("event_a");
b.iter(|| test::black_box(metric.mark()));
}

View File

@ -1,6 +1,6 @@
use core::output::{Output, OutputScope, OutputMetric};
use core::name::Name;
use core::input::{Kind, InputDyn, InputScope};
use core::name::MetricName;
use core::input::{InputKind, InputDyn, InputScope};
use core::Flush;
use std::sync::Arc;
@ -40,7 +40,7 @@ impl Output for Void {
}
impl OutputScope for VoidOutput {
fn new_metric(&self, _name: Name, _kind: Kind) -> OutputMetric {
fn new_metric(&self, _name: MetricName, _kind: InputKind) -> OutputMetric {
OutputMetric::new(|_value, _labels| {})
}
}

View File

@ -27,10 +27,10 @@ mod macros;
pub use macros::*;
mod core;
pub use core::{Flush, Value};
pub use core::attributes::{Naming, Sampling, Sampled, Buffered, Buffering};
pub use core::name::{Name, NameParts};
pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, Kind};
pub use core::{Flush, MetricValue};
pub use core::attributes::{Prefixed, Sampling, Sampled, Buffered, Buffering};
pub use core::name::{MetricName, NameParts};
pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, InputKind};
pub use core::output::{Output, OutputDyn, OutputScope, OutputMetric};
pub use core::scheduler::{ScheduleFlush, CancelHandle};
pub use core::out_lock::{LockingScopeBox};
@ -51,9 +51,9 @@ pub use output::statsd::{Statsd, StatsdScope, StatsdMetric};
pub use output::map::{StatsMap};
pub use output::log::{Log, LogScope};
mod aggregate;
pub use aggregate::bucket::{Bucket, stats_all, stats_average, stats_summary};
pub use aggregate::scores::{ScoreType, Scoreboard};
mod bucket;
pub use bucket::{ScoreType, stats_all, stats_average, stats_summary};
pub use bucket::atomic::{AtomicBucket};
mod cache;
pub use cache::cache_in::CachedInput;

View File

@ -130,27 +130,27 @@ macro_rules! metrics {
// SUB BRANCH NODE - public identifier
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
lazy_static! { $(#[$attr])* pub static ref $IDENT = $WITH.add_naming($e); }
lazy_static! { $(#[$attr])* pub static ref $IDENT = $WITH.add_prefix($e); }
metrics!( @internal $IDENT; $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE - private identifier
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
lazy_static! { $(#[$attr])* static ref $IDENT = $WITH.add_naming($e); }
lazy_static! { $(#[$attr])* static ref $IDENT = $WITH.add_prefix($e); }
metrics!( @internal $IDENT; $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE (not yet)
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
metrics!( @internal $WITH.add_naming($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH.add_prefix($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE (not yet)
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
metrics!( @internal $WITH.add_naming($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH.add_prefix($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};

View File

@ -1,9 +1,9 @@
//! Dispatch metrics to multiple sinks.
use core::Flush;
use core::input::{Kind, Input, InputScope, InputMetric, InputDyn};
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::{InputKind, Input, InputScope, InputMetric, InputDyn};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::error;
use std::sync::Arc;
@ -75,8 +75,8 @@ impl MultiInputScope {
}
impl InputScope for MultiInputScope {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = &self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = &self.prefix_append(name);
let metrics: Vec<InputMetric> = self.scopes.iter()
.map(move |scope| scope.new_metric(name.clone(), kind))
.collect();

View File

@ -1,9 +1,9 @@
//! Dispatch metrics to multiple sinks.
use core::Flush;
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::Kind;
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::input::InputKind;
use core::output::{Output, OutputMetric, OutputScope, OutputDyn};
use core::error;
@ -76,8 +76,8 @@ impl MultiOutputScope {
}
impl OutputScope for MultiOutputScope {
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let name = &self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = &self.prefix_append(name);
let metrics: Vec<OutputMetric> = self.scopes.iter()
.map(move |scope| scope.new_metric(name.clone(), kind))
.collect();

View File

@ -1,6 +1,6 @@
use core::name::Name;
use core::input::Kind;
use core::Value;
use core::name::MetricName;
use core::input::InputKind;
use core::MetricValue;
use self::LineOp::*;
use std::io;
@ -15,7 +15,7 @@ pub enum LineOp {
/// Print metric value as text.
ValueAsText,
/// Print metric value, divided by the given scale, as text.
ScaledValueAsText(Value),
ScaledValueAsText(MetricValue),
/// Print the newline character.labels.lookup(key)
NewLine,
}
@ -43,7 +43,7 @@ impl From<Vec<LineOp>> for LineTemplate {
impl LineTemplate {
/// Template execution applies commands in turn, writing to the output.
pub fn print<L>(&self, output: &mut io::Write, value: Value, lookup: L) -> Result<(), io::Error>
pub fn print<L>(&self, output: &mut io::Write, value: MetricValue, lookup: L) -> Result<(), io::Error>
where L: Fn(&str) -> Option<Arc<String>>
{
for cmd in &self.ops {
@ -85,7 +85,7 @@ pub trait Formatting {
pub trait LineFormat: Send + Sync {
/// Prepare a template for output of metric values.
fn template(&self, name: &Name, kind: Kind) -> LineTemplate;
fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate;
}
/// A simple metric output format of "MetricName {Value}"
@ -96,7 +96,7 @@ pub struct SimpleFormat {
}
impl LineFormat for SimpleFormat {
fn template(&self, name: &Name, _kind: Kind) -> LineTemplate {
fn template(&self, name: &MetricName, _kind: InputKind) -> LineTemplate {
let mut header = name.join(".");
header.push(' ');
LineTemplate {
@ -112,12 +112,12 @@ impl LineFormat for SimpleFormat {
#[cfg(test)]
pub mod test {
use super::*;
use ::Labels;
use core::label::Labels;
pub struct TestFormat;
impl LineFormat for TestFormat {
fn template(&self, name: &Name, kind: Kind) -> LineTemplate {
fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate {
let mut header: String = format!("{:?}", kind);
header.push('/');
header.push_str(&name.join("."));
@ -143,9 +143,9 @@ pub mod test {
fn print_label_exists() {
let labels: Labels = labels!("test_key" => "456");
let format = TestFormat {};
let mut name = Name::from("abc");
let mut name = MetricName::from("abc");
name = name.prepend("xyz");
let template = format.template(&name, Kind::Counter);
let template = format.template(&name, InputKind::Counter);
let mut out = vec![];
template.print(&mut out, 123000, |key| labels.lookup(key)).unwrap();
assert_eq!("Counter/xyz.abc 123000 123 test_key=456\n", String::from_utf8(out).unwrap());
@ -154,9 +154,9 @@ pub mod test {
#[test]
fn print_label_not_exists() {
let format = TestFormat {};
let mut name = Name::from("abc");
let mut name = MetricName::from("abc");
name = name.prepend("xyz");
let template = format.template(&name, Kind::Counter);
let template = format.template(&name, InputKind::Counter);
let mut out = vec![];
template.print(&mut out, 123000, |_key| None).unwrap();
assert_eq!("Counter/xyz.abc 123000 123 \n", String::from_utf8(out).unwrap());

View File

@ -1,9 +1,9 @@
//! Send metrics to a graphite server.
use core::attributes::{Buffered, Attributes, WithAttributes, Naming};
use core::name::Name;
use core::{Flush, Value};
use core::input::Kind;
use core::attributes::{Buffered, Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
@ -71,13 +71,13 @@ pub struct GraphiteScope {
impl OutputScope for GraphiteScope {
/// Define a metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let mut prefix = self.naming_prepend(name).join(".");
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let mut prefix = self.prefix_prepend(name).join(".");
prefix.push(' ');
let scale = match kind {
// timers are in µs, but we give graphite milliseconds
Kind::Timer => 1000,
InputKind::Timer => 1000,
_ => 1,
};
@ -99,7 +99,7 @@ impl Flush for GraphiteScope {
}
impl GraphiteScope {
fn print(&self, metric: &GraphiteMetric, value: Value) {
fn print(&self, metric: &GraphiteMetric, value: MetricValue) {
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
@ -195,7 +195,7 @@ mod bench {
#[bench]
pub fn immediate_graphite(b: &mut test::Bencher) {
let sd = Graphite::send_to("localhost:2003").unwrap().input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
@ -204,7 +204,7 @@ mod bench {
pub fn buffering_graphite(b: &mut test::Bencher) {
let sd = Graphite::send_to("localhost:2003").unwrap()
.buffered(Buffering::BufferSize(65465)).input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}

View File

@ -1,7 +1,7 @@
use core::{Flush};
use core::input::{Kind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Buffered, Naming};
use core::name::Name;
use core::input::{InputKind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Buffered, Prefixed};
use core::name::MetricName;
use core::error;
use cache::cache_in;
use queue::queue_in;
@ -75,8 +75,8 @@ impl queue_in::QueuedInput for Log {}
impl cache_in::CachedInput for Log {}
impl InputScope for LogScope {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let template = self.output.format.template(&name, kind);
@ -136,7 +136,7 @@ mod test {
#[test]
fn test_to_log() {
let c = super::Log::log_to().input();
let m = c.new_metric("test".into(), Kind::Marker);
let m = c.new_metric("test".into(), InputKind::Marker);
m.write(33, labels![]);
}

View File

@ -1,6 +1,6 @@
use core::{Flush, Value};
use core::input::Kind;
use core::name::Name;
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::name::MetricName;
use core::output::{OutputMetric, OutputScope};
use std::rc::Rc;
@ -11,11 +11,11 @@ use std::collections::BTreeMap;
/// Every received value for a metric replaces the previous one (if any).
#[derive(Clone, Default)]
pub struct StatsMap {
inner: Rc<RefCell<BTreeMap<String, Value>>>,
inner: Rc<RefCell<BTreeMap<String, MetricValue>>>,
}
impl OutputScope for StatsMap {
fn new_metric(&self, name: Name, _kind: Kind) -> OutputMetric {
fn new_metric(&self, name: MetricName, _kind: InputKind) -> OutputMetric {
let write_to = self.inner.clone();
let name: String = name.join(".");
OutputMetric::new(move |value, _labels| {
@ -26,7 +26,7 @@ impl OutputScope for StatsMap {
impl Flush for StatsMap {}
impl From<StatsMap> for BTreeMap<String, Value> {
impl From<StatsMap> for BTreeMap<String, MetricValue> {
fn from(map: StatsMap) -> Self {
// FIXME this is is possibly a full map copy, for nothing.
// into_inner() is what we'd really want here but would require some `unsafe`? don't know how to do this yet.

View File

@ -5,10 +5,10 @@
//! - Serve metrics with basic HTTP server
//! - Print metrics to a buffer provided by an HTTP framework.
use core::{Flush, Value};
use core::input::{Kind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Naming};
use core::name::Name;
use core::{Flush, MetricValue};
use core::input::{InputKind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope};
use core::error;
@ -47,8 +47,8 @@ impl PrometheusScope {
impl OutputScope for PrometheusScope {
/// Define a metric of the specified type.
fn new_metric(&self, name: Name, _kind: Kind) -> OutputMetric {
let mut _prefix = self.naming_prepend(name).join(".");
fn new_metric(&self, name: MetricName, _kind: InputKind) -> OutputMetric {
let mut _prefix = self.prefix_prepend(name).join(".");
OutputMetric::new(|_value, _labels| {})
}
}

View File

@ -1,10 +1,10 @@
//! Send metrics to a statsd server.
use core::attributes::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Naming};
use core::name::Name;
use core::attributes::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Prefixed};
use core::name::MetricName;
use core::pcg32;
use core::{Flush, Value};
use core::input::Kind;
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
@ -78,21 +78,21 @@ impl Sampled for StatsdScope {}
impl OutputScope for StatsdScope {
/// Define a metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let mut prefix = self.naming_prepend(name).join(".");
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let mut prefix = self.prefix_prepend(name).join(".");
prefix.push(':');
let mut suffix = String::with_capacity(16);
suffix.push('|');
suffix.push_str(match kind {
Kind::Marker | Kind::Counter => "c",
Kind::Gauge => "g",
Kind::Timer => "ms",
InputKind::Marker | InputKind::Counter => "c",
InputKind::Gauge => "g",
InputKind::Timer => "ms",
});
let scale = match kind {
// timers are in µs, statsd wants ms
Kind::Timer => 1000,
InputKind::Timer => 1000,
_ => 1,
};
@ -127,7 +127,7 @@ impl Flush for StatsdScope {
}
impl StatsdScope {
fn print(&self, metric: &StatsdMetric, value: Value) {
fn print(&self, metric: &StatsdMetric, value: MetricValue) {
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len();
@ -212,16 +212,16 @@ impl Drop for StatsdScope {
//pub struct StatsdFormat;
//
//impl Format for StatsdFormat {
// fn template(&self, name: &Name, kind: Kind) -> Template {
// fn template(&self, name: &Name, kind: InputKind) -> Template {
// let mut before_value = name.join(".");
// before_value.push(':');
//
// let mut after_value = String::with_capacity(16);
// after_value.push('|');
// after_value.push_str(match kind {
// Kind::Marker | Kind::Counter => "c",
// Kind::Gauge => "g",
// Kind::Timer => "ms",
// InputKind::Marker | InputKind::Counter => "c",
// InputKind::Gauge => "g",
// InputKind::Timer => "ms",
// });
//
// // specify sampling rate if any
@ -232,7 +232,7 @@ impl Drop for StatsdScope {
// // scale timer values
// let value_text = match kind {
// // timers are in µs, statsd wants ms
// Kind::Timer => ScaledValueAsText(1000),
// InputKind::Timer => ScaledValueAsText(1000),
// _ => ValueAsText,
// };
//
@ -258,7 +258,7 @@ mod bench {
#[bench]
pub fn immediate_statsd(b: &mut test::Bencher) {
let sd = Statsd::send_to("localhost:2003").unwrap().input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
@ -267,7 +267,7 @@ mod bench {
pub fn buffering_statsd(b: &mut test::Bencher) {
let sd = Statsd::send_to("localhost:2003").unwrap()
.buffered(Buffering::BufferSize(65465)).input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}

View File

@ -3,9 +3,9 @@
// TODO parameterize templates
use core::{Flush};
use core::input::Kind;
use core::attributes::{Attributes, WithAttributes, Buffered, Naming};
use core::name::Name;
use core::input::InputKind;
use core::attributes::{Attributes, WithAttributes, Buffered, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope};
use core::error;
@ -118,8 +118,8 @@ impl<W: Write + Send + Sync + 'static> WithAttributes for TextScope<W> {
impl<W: Write + Send + Sync + 'static> Buffered for TextScope<W> {}
impl<W: Write + Send + Sync + 'static> OutputScope for TextScope<W> {
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = self.prefix_append(name);
let template = self.output.format.template(&name, kind);
let entries = self.entries.clone();
@ -180,13 +180,13 @@ impl<W: Write + Send + Sync + 'static> Drop for TextScope<W> {
#[cfg(test)]
mod test {
use super::*;
use core::input::Kind;
use core::input::InputKind;
use std::io;
#[test]
fn sink_print() {
let c = super::Stream::write_to(io::stdout()).output();
let m = c.new_metric("test".into(), Kind::Marker);
let m = c.new_metric("test".into(), InputKind::Marker);
m.write(33, labels![]);
}
}

View File

@ -2,14 +2,14 @@
//! Metrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::{Kind, Input, InputScope, InputDyn, InputMetric};
use core::{Value, Flush};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::input::{InputKind, Input, InputScope, InputDyn, InputMetric};
use core::{MetricValue, Flush};
use core::metrics;
use cache::cache_in::CachedInput;
use core::error;
use ::{ Labels};
use core::label::Labels;
use std::sync::Arc;
use std::sync::mpsc;
@ -90,7 +90,7 @@ impl Input for InputQueue {
/// Async commands should be of no concerns to applications.
pub enum InputQueueCmd {
/// Send metric write
Write(InputMetric, Value, Labels),
Write(InputMetric, MetricValue, Labels),
/// Send metric flush
Flush(Arc<InputScope + Send + Sync + 'static>),
}
@ -121,8 +121,8 @@ impl WithAttributes for InputQueueScope {
}
impl InputScope for InputQueueScope {
fn new_metric(&self, name: Name, kind:Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let target_metric = self.target.new_metric(name, kind);
let sender = self.sender.clone();
InputMetric::new(move |value, mut labels| {

View File

@ -1,16 +1,16 @@
//! Queue metrics for write on a separate thread,
//! RawMetrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
//!
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::{Kind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::input::{InputKind, Input, InputScope, InputMetric};
use core::output::{OutputDyn, OutputScope, OutputMetric, Output};
use core::{Value, Flush};
use core::{MetricValue, Flush};
use core::metrics;
use cache::cache_in;
use core::error;
use ::{Labels};
use core::label::Labels;
use std::rc::Rc;
use std::ops;
@ -95,7 +95,7 @@ impl Input for OutputQueue {
/// Async commands should be of no concerns to applications.
pub enum OutputQueueCmd {
/// Send metric write
Write(Arc<OutputMetric>, Value, Labels),
Write(Arc<OutputMetric>, MetricValue, Labels),
/// Send metric flush
Flush(Arc<UnsafeScope>),
}
@ -115,8 +115,8 @@ impl WithAttributes for OutputQueueScope {
}
impl InputScope for OutputQueueScope {
fn new_metric(&self, name: Name, kind:Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let target_metric = Arc::new(self.target.new_metric(name, kind));
let sender = self.sender.clone();
InputMetric::new(move |value, mut labels| {