Merge pull request #29 from fralalonde/superdoc

Superdoc
This commit is contained in:
Francis Lalonde 2018-11-12 08:42:19 -05:00
commit 0b66ffdbfd
59 changed files with 796 additions and 702 deletions

2
.gitignore vendored
View File

@ -2,3 +2,5 @@ target/
**/*.rs.bk
Cargo.lock
src/prometheus_proto.rs
.idea
cmake-*

262
HANDBOOK.md Executable file
View File

@ -0,0 +1,262 @@
# The dipstick handbook
This handbook's purpose is to get you started instrumenting your apps with Dipstick
and give an idea of what's possible.
# Background
Dipstick was born of the desire to build a metrics library that would allow selecting from,
switching between and combining multiple backends.
Such a design has multiple benefits:
- simplified instrumentation
- flexible configuration
- easier metrics testing
Because of its Rust nature, performance, safety and ergonomics are also prime concerns.
## API Overview
Dipstick's API is split between _input_ and _output_ layers.
The input layer provides named metrics such as counters and timers to be used by the application.
The output layer controls how metric values will be recorded and emitted by the configured backend(s).
Input and output layers are decoupled, making code instrumentation independent of output configuration.
Intermediates can also be added between input and output for features or performance characteristics.
Although this handbook covers input before output, an application can certainly be set up the other way around.
For more details, consult the [docs](https://docs.rs/dipstick/).
## Metrics Input
A metrics library's first job is to help a program collect measurements about its operations.
Dipstick provides a restricted but robust set of _four_ instrument types, taking a stance against
making an application's functional code decide which statistics should be tracked for each defined metric.
This helps enforce contracts with downstream metrics systems and keeps code free of configuration elements.
#### Counter
Count the number of elements processed, e.g. the number of bytes received.
#### Marker
A monotonic counter, e.g. to record the processing of individual events.
Markers have different default aggregated statistics than counters.
Being value-less also makes for a safer API, preventing values other than 1 from being recorded.
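For example, counting how _much_ versus marking how _often_, in the style of the timer example below (the metric names here are illustrative):

```rust,skt-run
let app_metrics = metric_scope(to_stdout());
// a counter records a quantity, e.g. bytes received
let bytes_received = app_metrics.counter("bytes_received");
bytes_received.count(1024);
// a marker records a single occurrence, no value needed
let requests = app_metrics.marker("requests");
requests.mark();
```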
#### Timer
Measure an operation's duration.
Usable through the `time!` macro, the closure form, or explicit calls to `start()` and `stop()`.
While timers' internal precision is in nanoseconds, their accuracy depends on the platform's OS and hardware.
Timers' default output format is milliseconds, but this can be scaled up or down.
```rust,skt-run
let app_metrics = metric_scope(to_stdout());
let timer = app_metrics.timer("my_timer");
time!(timer, {/* slow code here */} );
timer.time(|| {/* slow code here */} );
let start = timer.start();
/* slow code here */
timer.stop(start);
timer.interval_us(123_456);
```
#### Gauge
An instant observation of a resource's value.
Observation of gauges is neither automatic nor tied to the flushing of metrics;
it must be scheduled independently or triggered explicitly from the code.
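For instance (a sketch in the style of the other examples; the gauge name is illustrative):

```rust,skt-run
let app_metrics = metric_scope(to_stdout());
let connections = app_metrics.gauge("connection_count");
// the observation is made explicitly, e.g. from a periodic task
connections.value(37);
```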
### Names
Each metric must be given a name upon creation.
Names are opaque to the application and are used only to identify the metrics upon output.
Names may be prepended with a namespace by each configured backend.
Aggregated statistics may also append identifiers to the metric's name.
Names should exclude characters that can interfere with namespaces, separators and output protocols.
A good convention is to stick with lowercase alphanumeric identifiers of less than 12 characters.
```rust,skt-run
let app_metrics = metric_scope(to_stdout());
let db_metrics = app_metrics.add_prefix("database");
let _db_timer = db_metrics.timer("db_timer");
let _db_counter = db_metrics.counter("db_counter");
```
### Labels
Some backends (such as Prometheus) allow "tagging" the metrics with labels to provide additional context,
such as the URL or HTTP method requested from a web server.
Dipstick offers the thread-local ThreadLabel and global AppLabel context maps to transparently carry
metadata to the backends configured to use them.
Notes about labels:
- Using labels may incur a significant runtime cost because
of the additional implicit parameter that has to be carried around.
- Labels' runtime cost may be even higher if async queuing is used
since the current context has to be persisted across threads.
- While internally supported, single metric labels are not yet part of the input API.
If this is important to you, consider using dynamically defined metrics or open a GitHub issue!
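A sketch of carrying context through the label maps, assuming `AppLabel::set` and `ThreadLabel::set` as the setters (the key and value names are illustrative):

```rust,skt-run
// visible to all threads until changed
AppLabel::set("environment", "production");
// visible only to the current thread
ThreadLabel::set("http_method", "GET");
// backends configured for labels pick these up transparently on write
let app_metrics = metric_scope(to_stdout());
app_metrics.counter("requests").count(1);
```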
### Static vs dynamic metrics
Metric inputs are usually set up statically upon application startup.
```rust,skt-plain
#[macro_use]
extern crate dipstick;
use dipstick::*;
metrics!("my_app" => {
COUNTER_A: Counter = "counter_a";
});
fn main() {
route_aggregate_metrics(to_stdout());
COUNTER_A.count(11);
}
```
The static metric definition macro is just a `lazy_static!` wrapper.
### Dynamic metrics
If necessary, metrics can also be defined "dynamically", with a possibly new name for every value.
This is more flexible but has a higher runtime cost, which may be alleviated with caching.
```rust,skt-run
let user_name = "john_day";
let app_metrics = to_log().with_cache(512);
app_metrics.gauge(format!("gauge_for_user_{}", user_name)).value(44);
```
## Metrics Output
A metrics library's second job is to help a program emit metric values that can be used by downstream systems.
Dipstick provides an assortment of drivers for network or local metrics output.
Multiple outputs can be used at a time, each with its own configuration.
### Types
These output types are provided; some are extensible, and you may write your own if you need to.
#### Stream
Write values to any `Write` trait implementer, including files, stderr and stdout.
#### Log
Write values to the log using the log crate.
#### Map
Insert metric values into a map.
#### Statsd
Send metrics to a remote host over UDP using the statsd format.
#### Graphite
Send metrics to a remote host over TCP using the graphite format.
#### TODO Prometheus
Send metrics to a remote host over TCP using the Prometheus JSON or ProtoBuf format.
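Constructing a few of these outputs, using the constructors that appear in the examples of this changeset (the addresses are placeholders):

```rust,skt-run
let stdout_metrics = Stream::write_to(io::stdout()).input();
let graphite_metrics = Graphite::send_to("localhost:2003").expect("Connected").input();
let statsd_metrics = Statsd::send_to("localhost:8125").expect("Connected").input();
```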
### Attributes
Attributes change an output's behavior.
#### Prefixes
Outputs can be given prefixes.
Prefixes are prepended to the metric names emitted by an output.
With network outputs, a typical use of prefixes is to identify the network host,
environment and application that metrics originate from.
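For example, identifying the machine and application on a Graphite output, as in one of the examples updated in this changeset:

```rust,skt-run
let graphite = Graphite::send_to("localhost:2003").expect("Socket")
    .add_prefix("machine1").add_prefix("my_app");
```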
#### Formatting
Stream and Log outputs have configurable formatting that enables usage of custom templates.
Other outputs, such as Graphite, have a fixed format because they're intended to be processed by a downstream system.
#### Buffering
Most outputs provide optional buffering, which can be used to optimize throughput at the expense of higher latency.
If enabled, buffering is usually best-effort, safely limiting the amount of memory used by the metrics.
#### Sampling
Some outputs such as statsd also have the ability to sample metrics.
If enabled, sampling is done using PCG32, a fast random number generator with good statistical properties.
```rust,skt-fail
let _app_metrics = to_statsd("server:8125")?.with_sampling_rate(0.01);
```
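The sampling decision itself is cheap. A self-contained sketch of the technique (not Dipstick's actual implementation): draw a PCG32 value and keep the sample when it falls below the rate threshold.

```rust
/// Minimal PCG32 step (O'Neill's constants): advances `state` and
/// returns a uniformly distributed 32-bit value.
fn pcg32_next(state: &mut u64) -> u32 {
    let old = *state;
    *state = old
        .wrapping_mul(6_364_136_223_846_793_005)
        .wrapping_add(1_442_695_040_888_963_407);
    // output function: xorshift-high then random rotation
    let xorshifted = (((old >> 18) ^ old) >> 27) as u32;
    let rot = (old >> 59) as u32;
    xorshifted.rotate_right(rot)
}

/// Keep a value with probability `rate` (0.0 ..= 1.0).
fn should_sample(state: &mut u64, rate: f64) -> bool {
    (pcg32_next(state) as f64) < rate * (u32::MAX as f64)
}

fn main() {
    let mut state = 42_u64;
    let kept = (0..100_000).filter(|_| should_sample(&mut state, 0.01)).count();
    println!("kept {} of 100000 at 1% sampling", kept);
}
```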
## Intermediates
### Proxy
Because the input's actual _implementation_ depends on the output configuration,
it is necessary to create an output channel before defining any metrics.
This is often not possible because metrics configuration could be dynamic (e.g. loaded from a file),
which might happen after the static initialization phase in which metrics are defined.
To get around this catch-22, Dipstick provides a Proxy which acts as intermediate output,
allowing redirection to the effective output after it has been set up.
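A sketch taken from the proxy example updated in this changeset:

```rust,skt-run
// metrics can be defined against the proxy before any output exists
let event = Proxy::default().marker("a");
// once the effective output is known, route the proxy to it
let bucket = AtomicBucket::new();
Proxy::default().set_target(bucket.clone());
```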
### Bucket
Another intermediate output is the Bucket, which can be used to aggregate metric values.
Bucket-aggregated values can be used to compute statistics, which are then flushed out to the configured output.
Bucket aggregation is performed locklessly and is very fast.
Count, Sum, Min, Max and Mean are tracked where they make sense, depending on the metric type.
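From the bucket examples updated in this changeset, a minimal aggregation setup:

```rust,skt-run
let metrics = AtomicBucket::new().add_prefix("test");
// aggregated statistics are flushed to the assigned target
metrics.set_flush_target(Stream::write_to(io::stdout()));
metrics.flush_every(Duration::from_secs(3));
```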
#### Preset bucket statistics
Published statistics can be selected with presets such as `all_stats` (see previous example),
`summary` or `average`.
#### Custom bucket statistics
For more control over published statistics, provide your own strategy:
```rust,skt-run
metrics(aggregate());
set_default_aggregate_fn(|_kind, name, score|
match score {
ScoreType::Count(count) =>
Some((Kind::Counter, vec![name, ".per_thousand"], count / 1000)),
_ => None
});
```
#### Scheduled publication
Aggregate metrics and schedule them to be published periodically in the background:
```rust,skt-run
use std::time::Duration;
let app_metrics = metric_scope(aggregate());
route_aggregate_metrics(to_stdout());
app_metrics.flush_every(Duration::from_secs(3));
```
### Multi
Like Constructicons, multiple metrics outputs can assemble, creating a unified facade that transparently dispatches
input metrics to each constituent output.
```rust,skt-fail,no_run
let _app_metrics = metric_scope((
to_stdout(),
to_statsd("localhost:8125")?.with_namespace(&["my", "app"])
));
```
### Queue
Metrics can be recorded asynchronously:
```rust,skt-run
let _app_metrics = metric_scope(to_stdout().queue(64));
```
The async queue uses a Rust channel and a standalone thread.
If the queue ever fills up under heavy load, the behavior reverts to blocking (rather than dropping metrics).
## Facilities

View File

@ -11,7 +11,8 @@ minimal impact on applications and a choice of output to downstream systems.
Dipstick is a toolkit to help all sorts of application collect and send out metrics.
As such, it needs a bit of set up to suit one's needs.
Skimming through the handbook [handbook](https://github.com/fralalonde/dipstick/tree/master/handbook)
Skimming through the [handbook](https://github.com/fralalonde/dipstick/tree/master/HANDBOOK.md)
and many [examples](https://github.com/fralalonde/dipstick/tree/master/examples)
should help you get an idea of the possible configurations.
In short, dipstick-enabled apps _can_:
@ -31,7 +32,8 @@ For convenience, dipstick builds on stable Rust with minimal, feature-gated depe
### Non-goals
For performance reasons, dipstick will not
Dipstick's focus is on metrics collection (input) and forwarding (output).
Although it will happily track aggregated statistics, for the sake of simplicity and performance Dipstick will not
- plot graphs
- send alerts
- track histograms
@ -77,4 +79,3 @@ dipstick = "0.7.0"
## License
Dipstick is licensed under the terms of the Apache 2.0 and MIT license.

View File

@ -10,7 +10,7 @@ use protoc_rust as protoc;
fn main() {
// generates handbook tests for `README.md`.
#[cfg(feature="skeptic")]
skeptic::generate_doc_tests(&["README.md"]);
skeptic::generate_doc_tests(&["README.md", "HANDBOOK.md"]);
#[cfg(feature="proto, prometheus")]
protoc::run(protoc::Args {

View File

@ -20,7 +20,7 @@ fn main() {
app_metrics.counter("just_once").count(4);
// metric names can be prepended with a common prefix
let prefixed_metrics = app_metrics.add_naming("subsystem");
let prefixed_metrics = app_metrics.add_prefix("subsystem");
let event = prefixed_metrics.marker("event_c");
let gauge = prefixed_metrics.gauge("gauge_d");

View File

@ -11,7 +11,7 @@ use std::env::args;
use std::str::FromStr;
fn main() {
let bucket = Bucket::new();
let bucket = AtomicBucket::new();
let event = bucket.marker("a");
let args = &mut args();
args.next();

View File

@ -13,7 +13,7 @@ use std::str::FromStr;
fn main() {
let event = Proxy::default().marker("a");
let bucket = Bucket::new();
let bucket = AtomicBucket::new();
Proxy::default().set_target(bucket.clone());

View File

@ -11,7 +11,7 @@ use std::env::args;
use std::str::FromStr;
fn main() {
let bucket = Bucket::new();
let bucket = AtomicBucket::new();
let queue = InputQueueScope::wrap(bucket.clone(), 10000);
let event = queue.marker("a");
let args = &mut args();

View File

@ -7,11 +7,11 @@ use std::time::Duration;
use dipstick::*;
fn main() {
let bucket = Bucket::new().add_naming("test");
let bucket = AtomicBucket::new().add_prefix("test");
// Bucket::set_default_output(to_stdout());
bucket.set_target(Graphite::send_to("localhost:2003").expect("Socket")
.add_naming("machine1").add_naming("application"));
bucket.set_flush_target(Graphite::send_to("localhost:2003").expect("Socket")
.add_prefix("machine1").add_prefix("application"));
bucket.flush_every(Duration::from_secs(3));

View File

@ -8,10 +8,10 @@ use std::io;
use dipstick::*;
fn main() {
let metrics = Bucket::new().add_naming("test");
let metrics = AtomicBucket::new().add_prefix("test");
// Bucket::set_default_output(to_stdout());
metrics.set_target(Stream::write_to(io::stdout()));
metrics.set_flush_target(Stream::write_to(io::stdout()));
metrics.flush_every(Duration::from_secs(3));

View File

@ -10,8 +10,8 @@ use std::thread::sleep;
fn main() {
let bucket = Bucket::new();
Bucket::set_default_target(Stream::write_to(io::stdout()));
let bucket = AtomicBucket::new();
AtomicBucket::set_default_target(Stream::write_to(io::stdout()));
let persistent_marker = bucket.marker("persistent");

View File

@ -9,8 +9,8 @@ use std::io;
fn main() {
let app_metrics = Bucket::new();
app_metrics.set_target(Stream::write_to(io::stdout()));
let app_metrics = AtomicBucket::new();
app_metrics.set_flush_target(Stream::write_to(io::stdout()));
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -8,7 +8,7 @@ use std::io;
use dipstick::*;
fn main() {
let metrics = Stream::write_to(io::stdout()).cached(5).input().add_naming("cache");
let metrics = Stream::write_to(io::stdout()).cached(5).input().add_prefix("cache");
loop {
// report some ad-hoc metric values from our "application" loop

View File

@ -17,25 +17,25 @@ metrics!{
fn main() {
let one_minute = Bucket::new();
let one_minute = AtomicBucket::new();
one_minute.flush_every(Duration::from_secs(60));
let five_minutes = Bucket::new();
let five_minutes = AtomicBucket::new();
five_minutes.flush_every(Duration::from_secs(300));
let fifteen_minutes = Bucket::new();
let fifteen_minutes = AtomicBucket::new();
fifteen_minutes.flush_every(Duration::from_secs(900));
let all_buckets = MultiInputScope::new()
.add_target(one_minute)
.add_target(five_minutes)
.add_target(fifteen_minutes)
.add_naming("machine_name");
.add_prefix("machine_name");
// send application metrics to aggregator
Proxy::default().set_target(all_buckets);
Bucket::set_default_target(Stream::write_to(io::stdout()));
Bucket::set_default_stats(stats_all);
AtomicBucket::set_default_target(Stream::write_to(io::stdout()));
AtomicBucket::set_default_stats(stats_all);
loop {
COUNTER.count(17);

View File

@ -8,19 +8,19 @@ use dipstick::*;
fn main() {
fn custom_statistics(
kind: Kind,
mut name: Name,
kind: InputKind,
mut name: MetricName,
score: ScoreType,
) -> Option<(Kind, Name, Value)> {
) -> Option<(InputKind, MetricName, MetricValue)> {
match (kind, score) {
// do not export gauge scores
(Kind::Gauge, _) => None,
(InputKind::Gauge, _) => None,
// prepend and append to metric name
(_, ScoreType::Count(count)) => {
if let Some(last) = name.pop_back() {
Some((
Kind::Counter,
InputKind::Counter,
name.append("customized_add_prefix")
.append(format!("{}_and_a_suffix", last)),
count,
@ -42,10 +42,10 @@ fn main() {
}
// send application metrics to aggregator
Bucket::set_default_target(Stream::stderr());
Bucket::set_default_stats(custom_statistics);
AtomicBucket::set_default_target(Stream::stderr());
AtomicBucket::set_default_stats(custom_statistics);
let app_metrics = Bucket::new();
let app_metrics = AtomicBucket::new();
// schedule aggregated metrics to be printed every 3 seconds
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -10,7 +10,7 @@ fn main() {
let metrics =
Graphite::send_to("localhost:2003")
.expect("Connected")
.add_naming("my_app")
.add_prefix("my_app")
.input();
loop {

View File

@ -2,7 +2,7 @@
extern crate dipstick;
use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Naming};
use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Prefixed};
use std::time::Duration;
use std::io;
@ -15,9 +15,9 @@ fn main() {
// will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix.
let same_type_metrics = MultiInput::input()
.add_target(Stream::write_to(io::stdout()).add_naming("yeah"))
.add_target(Stream::write_to(io::stdout()).add_naming("ouch"))
.add_naming("cool")
.add_target(Stream::write_to(io::stdout()).add_prefix("yeah"))
.add_target(Stream::write_to(io::stdout()).add_prefix("ouch"))
.add_prefix("cool")
.input();
loop {

View File

@ -15,13 +15,13 @@ fn main() {
// will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix.
let same_type_metrics = MultiOutput::output()
.add_target(Stream::write_to(io::stderr()).add_naming("out_1"))
.add_target(Stream::write_to(io::stderr()).add_naming("out_2"))
.add_naming("out_both").input();
.add_target(Stream::write_to(io::stderr()).add_prefix("out_1"))
.add_target(Stream::write_to(io::stderr()).add_prefix("out_2"))
.add_prefix("out_both").input();
loop {
different_type_metrics.new_metric("counter_a".into(), Kind::Counter).write(123, labels![]);
same_type_metrics.new_metric("timer_a".into(), Kind::Timer).write(6677, labels![]);
different_type_metrics.new_metric("counter_a".into(), InputKind::Counter).write(123, labels![]);
same_type_metrics.new_metric("timer_a".into(), InputKind::Timer).write(6677, labels![]);
std::thread::sleep(Duration::from_millis(400));
}
}

View File

@ -4,12 +4,12 @@ extern crate dipstick;
use std::thread::sleep;
use std::time::Duration;
use dipstick::{Proxy, Stream, InputScope, Input, Naming};
use dipstick::{Proxy, Stream, InputScope, Input, Prefixed};
fn main() {
let root_proxy = Proxy::default();
let sub = root_proxy.add_naming("sub");
let sub = root_proxy.add_prefix("sub");
let count1 = root_proxy.counter("counter_a");
@ -22,12 +22,12 @@ fn main() {
count2.count(2);
// route every metric from the root to stdout with prefix "root"
root_proxy.set_target(stdout.add_naming("root"));
root_proxy.set_target(stdout.add_prefix("root"));
count1.count(3);
count2.count(4);
// route metrics from "sub" to stdout with prefix "mutant"
sub.set_target(stdout.add_naming("mutant"));
sub.set_target(stdout.add_prefix("mutant"));
count1.count(5);
count2.count(6);

View File

@ -17,7 +17,7 @@ pub fn raw_write() {
// define and send metrics using raw channel API
let counter = metrics_log.new_metric(
"count_a".into(),
dipstick::Kind::Counter,
dipstick::InputKind::Counter,
);
counter.write(1, labels![]);
}

View File

@ -1,6 +1,5 @@
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
//extern crate badlog;
extern crate dipstick;
use dipstick::*;
@ -11,7 +10,7 @@ fn main() {
Statsd::send_to("localhost:8125")
.expect("Connected")
// .with_sampling(Sampling::Random(0.2))
.add_naming("my_app")
.add_prefix("my_app")
.input();
let counter = metrics.counter("counter_a");

View File

@ -1,6 +1,5 @@
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
//extern crate badlog;
extern crate dipstick;
use dipstick::*;
@ -11,7 +10,7 @@ fn main() {
Statsd::send_to("localhost:8125")
.expect("Connected")
.sampled(Sampling::Random(0.2))
.add_naming("my_app")
.add_prefix("my_app")
.input();
let counter = metrics.counter("counter_a");

View File

@ -1,18 +1,19 @@
//! A sample application asynchronously printing metrics to stdout.
//! Print metrics to stderr with custom formatter including a label.
extern crate dipstick;
use std::thread::sleep;
use std::time::Duration;
use dipstick::{Stream, InputScope, Input, Formatting, AppLabel,
Name, Kind, LineTemplate, LineFormat, LineOp, LabelOp};
MetricName, InputKind, LineTemplate, LineFormat, LineOp, LabelOp};
/// Generates template like "$METRIC $value $label_value["abc"]\n"
struct MyFormat;
impl LineFormat for MyFormat {
fn template(&self, name: &Name, _kind: Kind) -> LineTemplate {
fn template(&self, name: &MetricName, _kind: InputKind) -> LineTemplate {
vec![
LineOp::Literal(format!("{} ", name.join(".")).into()),
LineOp::Literal(format!("{} ", name.join(".")).to_uppercase().into()),
LineOp::ValueAsText,
LineOp::Literal(" ".into()),
LineOp::LabelExists("abc".into(),

View File

@ -1,28 +0,0 @@
# the dipstick handbook
**IN PROGRESS**
## table of contents
## introduction
## static metrics macro
For speed and easier maintenance, metrics are usually defined statically:
```rust,skt-plain
#[macro_use]
extern crate dipstick;
use dipstick::*;
metrics!("my_app" => {
COUNTER_A: Counter = "counter_a";
});
fn main() {
route_aggregate_metrics(to_stdout());
COUNTER_A.count(11);
}
```
Metric definition macros are just `lazy_static!` wrappers.

View File

@ -1,54 +0,0 @@
# input
## namespace
Related metrics can share a namespace:
```rust,skt-run
let app_metrics = metric_scope(to_stdout());
let db_metrics = app_metrics.add_prefix("database");
let _db_timer = db_metrics.timer("db_timer");
let _db_counter = db_metrics.counter("db_counter");
```
## proxy
## counter
## marker
## timer
Timers can be used multiple ways:
```rust,skt-run
let app_metrics = metric_scope(to_stdout());
let timer = app_metrics.timer("my_timer");
time!(timer, {/* slow code here */} );
timer.time(|| {/* slow code here */} );
let start = timer.start();
/* slow code here */
timer.stop(start);
timer.interval_us(123_456);
```
## gauge
## ad-hoc metrics
Where necessary, metrics can also be defined _ad-hoc_ (or "inline"):
```rust,skt-run
let user_name = "john_day";
let app_metrics = metric_scope(to_log()).with_cache(512);
app_metrics.gauge(format!("gauge_for_user_{}", user_name)).value(44);
```
## ad-hoc metrics cache
Defining a cache is optional but will speed up re-definition of common ad-hoc metrics.
## local vs global scopes

View File

@ -1,38 +0,0 @@
# outputs
## statsd
## graphite
## text
## logging
## prometheus
## combination
Send metrics to multiple outputs:
```rust,skt-fail,no_run
let _app_metrics = metric_scope((
to_stdout(),
to_statsd("localhost:8125")?.with_namespace(&["my", "app"])
));
```
## buffering
## sampling
Apply statistical sampling to metrics:
```rust,skt-fail
let _app_metrics = to_statsd("server:8125")?.with_sampling_rate(0.01);
```
A fast random algorithm (PCG32) is used to pick samples.
Outputs can use sample rate to expand or format published data.

View File

@ -1,36 +0,0 @@
# aggregation
## bucket
Aggregation is performed locklessly and is very fast.
Count, sum, min, max and average are tracked where they make sense.
## schedule
Aggregate metrics and schedule to be periodical publication in the background:
```rust,skt-run
use std::time::Duration;
let app_metrics = metric_scope(aggregate());
route_aggregate_metrics(to_stdout());
app_metrics.flush_every(Duration::from_secs(3));
```
## preset statistics
Published statistics can be selected with presets such as `all_stats` (see previous example),
`summary`, `average`.
## custom statistics
For more control over published statistics, provide your own strategy:
```rust,skt-run
metrics(aggregate());
set_default_aggregate_fn(|_kind, name, score|
match score {
ScoreType::Count(count) =>
Some((Kind::Counter, vec![name, ".per_thousand"], count / 1000)),
_ => None
});
```

View File

@ -1,12 +0,0 @@
# concurrency concerns
## locking
## queueing
Metrics can be recorded asynchronously:
```rust,skt-run
let _app_metrics = metric_scope(to_stdout().with_async_queue(64));
```
The async queue uses a Rust channel and a standalone thread.
The current behavior is to block when full.

View File

@ -1,2 +0,0 @@
pub mod bucket;
pub mod scores;

View File

@ -1,174 +0,0 @@
use core::input::Kind;
use core::Value;
use std::mem;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::usize;
use self::ScoreType::*;
#[derive(Debug, Clone, Copy)]
/// Possibly aggregated scores.
pub enum ScoreType {
/// Number of times the metric was used.
Count(u64),
/// Sum of metric values reported.
Sum(u64),
/// Biggest value reported.
Max(u64),
/// Smallest value reported.
Min(u64),
/// Average value (hit count / sum, non-atomic)
Mean(f64),
/// Mean rate (hit count / period length in seconds, non-atomic)
Rate(f64),
}
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
pub struct Scoreboard {
/// The kind of metric
kind: Kind,
/// The actual recorded metric scores
scores: [AtomicUsize; 4],
}
impl Scoreboard {
/// Create a new Scoreboard to track summary values of a metric
pub fn new(kind: Kind) -> Self {
Scoreboard {
kind,
scores: unsafe { mem::transmute(Scoreboard::blank()) },
}
}
/// Returns the metric's kind.
pub fn metric_kind(&self) -> Kind {
self.kind
}
#[inline]
fn blank() -> [usize; 4] {
[0, 0, usize::MIN, usize::MAX]
}
/// Update scores with new value
pub fn update(&self, value: Value) -> () {
// TODO report any concurrent updates / resets for measurement of contention
let value = value as usize;
self.scores[0].fetch_add(1, AcqRel);
match self.kind {
Kind::Marker => {}
_ => {
// optimization - these fields are unused for Marker stats
self.scores[1].fetch_add(value, AcqRel);
swap_if(&self.scores[2], value, |new, current| new > current);
swap_if(&self.scores[3], value, |new, current| new < current);
}
}
}
/// Reset scores to zero, return previous values
fn snapshot(&self, scores: &mut [usize; 4]) -> bool {
// NOTE copy timestamp, count AND sum _before_ testing for data to reduce concurrent discrepancies
scores[0] = self.scores[0].swap(0, AcqRel);
scores[1] = self.scores[1].swap(0, AcqRel);
// if hit count is zero, then no values were recorded.
if scores[0] == 0 {
return false;
}
scores[2] = self.scores[2].swap(usize::MIN, AcqRel);
scores[3] = self.scores[3].swap(usize::MAX, AcqRel);
true
}
/// Map raw scores (if any) to applicable statistics
pub fn reset(&self, duration_seconds: f64) -> Option<Vec<ScoreType>> {
let mut scores = Scoreboard::blank();
if self.snapshot(&mut scores) {
let mut snapshot = Vec::new();
match self.kind {
Kind::Marker => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
Kind::Gauge => {
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
}
Kind::Timer => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
// timer rate uses the COUNT of timer calls per second (not SUM)
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
Kind::Counter => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
// counter rate uses the SUM of values per second (e.g. to get bytes/s)
snapshot.push(Rate(scores[1] as f64 / duration_seconds))
}
}
Some(snapshot)
} else {
None
}
}
}
/// Spinlock until success or clear loss to concurrent update.
#[inline]
fn swap_if(counter: &AtomicUsize, new_value: usize, compare: fn(usize, usize) -> bool) {
let mut current = counter.load(Acquire);
while compare(new_value, current) {
if counter.compare_and_swap(current, new_value, Release) == new_value {
// update successful
break;
}
// race detected, retry
current = counter.load(Acquire);
}
}
#[cfg(feature = "bench")]
mod bench {
use core::input::Kind;
use super::*;
use test;
#[bench]
fn update_marker(b: &mut test::Bencher) {
let metric = Scoreboard::new(Kind::Marker);
b.iter(|| test::black_box(metric.update(1)));
}
#[bench]
fn update_count(b: &mut test::Bencher) {
let metric = Scoreboard::new(Kind::Counter);
b.iter(|| test::black_box(metric.update(4)));
}
#[bench]
fn empty_snapshot(b: &mut test::Bencher) {
let metric = Scoreboard::new(Kind::Counter);
let scores = &mut Scoreboard::blank();
b.iter(|| test::black_box(metric.snapshot(scores)));
}
}

View File

@ -1,21 +1,26 @@
//! Maintain aggregated metrics for deferred reporting,
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::{Name};
use core::input::{Kind, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::{MetricName};
use core::input::{InputKind, InputScope, InputMetric};
use core::output::{OutputDyn, OutputScope, OutputMetric, Output, output_none};
use core::clock::TimeHandle;
use core::{Value, Flush};
use aggregate::scores::{Scoreboard, ScoreType};
use core::{MetricValue, Flush};
use bucket::{ScoreType, stats_summary};
use bucket::ScoreType::*;
use core::error;
use std::mem;
use std::usize;
use std::collections::BTreeMap;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::sync::{Arc, RwLock};
use std::fmt;
use std::borrow::Borrow;
/// A function type to transform aggregated scores into publishable statistics.
pub type StatsFn = Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static;
pub type StatsFn = Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static;
fn initial_stats() -> &'static StatsFn {
&stats_summary
@ -34,21 +39,21 @@ lazy_static! {
/// Central aggregation structure.
/// Maintains a list of metrics for enumeration when used as source.
#[derive(Debug, Clone)]
pub struct Bucket {
pub struct AtomicBucket {
attributes: Attributes,
inner: Arc<RwLock<InnerBucket>>,
inner: Arc<RwLock<InnerAtomicBucket>>,
}
struct InnerBucket {
metrics: BTreeMap<Name, Arc<Scoreboard>>,
struct InnerAtomicBucket {
metrics: BTreeMap<MetricName, Arc<AtomicScores>>,
period_start: TimeHandle,
stats: Option<Arc<Fn(Kind, Name, ScoreType)
-> Option<(Kind, Name, Value)> + Send + Sync + 'static>>,
stats: Option<Arc<Fn(InputKind, MetricName, ScoreType)
-> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static>>,
output: Option<Arc<OutputDyn + Send + Sync + 'static>>,
publish_metadata: bool,
}
impl fmt::Debug for InnerBucket {
impl fmt::Debug for InnerAtomicBucket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "metrics: {:?}", self.metrics)?;
write!(f, "period_start: {:?}", self.period_start)
@ -56,10 +61,10 @@ impl fmt::Debug for InnerBucket {
}
lazy_static! {
static ref PERIOD_LENGTH: Name = "_period_length".into();
static ref PERIOD_LENGTH: MetricName = "_period_length".into();
}
impl InnerBucket {
impl InnerAtomicBucket {
pub fn flush(&mut self) -> error::Result<()> {
let stats_fn = match self.stats {
@ -90,13 +95,13 @@ impl InnerBucket {
/// Take a snapshot of aggregated values and reset them.
/// Compute stats on captured values using assigned or default stats function.
/// Write stats to assigned or default output.
pub fn flush_to(&mut self, publish_scope: &OutputScope, stats_fn: &StatsFn) -> error::Result<()> {
pub fn flush_to(&mut self, target: &OutputScope, stats: &StatsFn) -> error::Result<()> {
let now = TimeHandle::now();
let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0;
self.period_start = now;
let mut snapshot: Vec<(&Name, Kind, Vec<ScoreType>)> = self.metrics.iter()
let mut snapshot: Vec<(&MetricName, InputKind, Vec<ScoreType>)> = self.metrics.iter()
.flat_map(|(name, scores)| if let Some(values) = scores.reset(duration_seconds) {
Some((name, scores.metric_kind(), values))
} else {
@ -112,39 +117,41 @@ impl InnerBucket {
} else {
// TODO add switch for metadata such as PERIOD_LENGTH
if self.publish_metadata {
snapshot.push((&PERIOD_LENGTH, Kind::Timer, vec![ScoreType::Sum((duration_seconds * 1000.0) as u64)]));
snapshot.push((&PERIOD_LENGTH, InputKind::Timer, vec![Sum((duration_seconds * 1000.0) as u64)]));
}
for metric in snapshot {
for score in metric.2 {
let filtered = (stats_fn)(metric.1, metric.0.clone(), score);
let filtered = stats(metric.1, metric.0.clone(), score);
if let Some((kind, name, value)) = filtered {
let metric: OutputMetric = publish_scope.new_metric(name, kind);
let metric: OutputMetric = target.new_metric(name, kind);
// TODO provide some bucket context through labels?
metric.write(value, labels![])
}
}
}
publish_scope.flush()
target.flush()
}
}
}
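The flush sequence above — snapshot each metric's scores, reset the period, then map scores to stats and write them out — can be sketched over plain data structures (the names and shapes here are illustrative stand-ins, not the dipstick types):

```rust
use std::collections::BTreeMap;

// A reduced model of the bucket flush: each metric holds raw recorded
// values; flushing computes a per-second rate over the elapsed period
// and resets the values for the next period.
fn flush(metrics: &mut BTreeMap<String, Vec<u64>>, duration_seconds: f64) -> Vec<(String, f64)> {
    let mut stats = Vec::new();
    for (name, values) in metrics.iter_mut() {
        if values.is_empty() {
            continue; // metrics with no recorded values are skipped
        }
        let rate = values.len() as f64 / duration_seconds;
        stats.push((name.clone(), rate));
        values.clear(); // reset: the next flush covers a fresh period
    }
    stats
}
```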
impl<S: AsRef<str>> From<S> for Bucket {
fn from(name: S) -> Bucket {
Bucket::new().add_naming(name.as_ref())
impl<S: AsRef<str>> From<S> for AtomicBucket {
fn from(name: S) -> AtomicBucket {
AtomicBucket::new().add_prefix(name.as_ref())
}
}
impl Bucket {
/// Build a new metric aggregation
pub fn new() -> Bucket {
Bucket {
impl AtomicBucket {
/// Build a new atomic bucket.
pub fn new() -> AtomicBucket {
AtomicBucket {
attributes: Attributes::default(),
inner: Arc::new(RwLock::new(InnerBucket {
inner: Arc::new(RwLock::new(InnerAtomicBucket {
metrics: BTreeMap::new(),
period_start: TimeHandle::now(),
stats: None,
output: None,
// TODO add API toggle for metadata publish
publish_metadata: false,
}))
}
@ -153,7 +160,7 @@ impl Bucket {
/// Set the default aggregated metrics statistics generator.
pub fn set_default_stats<F>(func: F)
where
F: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static
{
*DEFAULT_AGGREGATE_STATS.write().unwrap() = Arc::new(func)
}
@ -176,7 +183,7 @@ impl Bucket {
/// Set the default aggregated metrics statistics generator.
pub fn set_stats<F>(&self, func: F)
where
F: Fn(Kind, Name, ScoreType) -> Option<(Kind, Name, Value)> + Send + Sync + 'static
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static
{
self.inner.write().expect("Aggregator").stats = Some(Arc::new(func))
}
@ -187,7 +194,7 @@ impl Bucket {
}
/// Install a new receiver for all aggregated metrics, replacing any previous receiver.
pub fn set_target(&self, new_config: impl Output + Send + Sync + 'static) {
pub fn set_flush_target(&self, new_config: impl Output + Send + Sync + 'static) {
self.inner.write().expect("Aggregator").output = Some(Arc::new(new_config))
}
@ -204,21 +211,21 @@ impl Bucket {
}
impl InputScope for Bucket {
/// Lookup or create a scoreboard for the requested metric.
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let scoreb = self.inner
impl InputScope for AtomicBucket {
/// Lookup or create scores for the requested metric.
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let scores = self.inner
.write()
.expect("Aggregator")
.metrics
.entry(self.naming_append(name))
.or_insert_with(|| Arc::new(Scoreboard::new(kind)))
.entry(self.prefix_append(name))
.or_insert_with(|| Arc::new(AtomicScores::new(kind)))
.clone();
InputMetric::new(move |value, _labels| scoreb.update(value))
InputMetric::new(move |value, _labels| scores.update(value))
}
}
impl Flush for Bucket {
impl Flush for AtomicBucket {
/// Collect and reset aggregated data.
/// Publish statistics
fn flush(&self) -> error::Result<()> {
@ -227,64 +234,127 @@ impl Flush for Bucket {
}
}
impl WithAttributes for Bucket {
impl WithAttributes for AtomicBucket {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
/// A predefined export strategy reporting all aggregated stats for all metric types.
/// Resulting stats are named by appending a short suffix to each metric's name.
#[allow(dead_code)]
pub fn stats_all(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match score {
ScoreType::Count(hit) => Some((Kind::Counter, name.make_name("count"), hit)),
ScoreType::Sum(sum) => Some((kind, name.make_name("sum"), sum)),
ScoreType::Mean(mean) => Some((kind, name.make_name("mean"), mean.round() as Value)),
ScoreType::Max(max) => Some((Kind::Gauge, name.make_name("max"), max)),
ScoreType::Min(min) => Some((Kind::Gauge, name.make_name("min"), min)),
ScoreType::Rate(rate) => Some((Kind::Gauge, name.make_name("rate"), rate.round() as Value)),
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
struct AtomicScores {
/// The kind of metric
kind: InputKind,
/// The actual recorded metric scores
scores: [AtomicUsize; 4],
}
impl AtomicScores {
/// Create new scores to track summary values of a metric
pub fn new(kind: InputKind) -> Self {
AtomicScores {
kind,
scores: unsafe { mem::transmute(AtomicScores::blank()) },
}
}
/// Returns the metric's kind.
pub fn metric_kind(&self) -> InputKind {
self.kind
}
#[inline]
fn blank() -> [usize; 4] {
[0, 0, usize::MIN, usize::MAX]
}
/// Update scores with new value
pub fn update(&self, value: MetricValue) {
// TODO report any concurrent updates / resets for measurement of contention
let value = value as usize;
self.scores[0].fetch_add(1, AcqRel);
match self.kind {
InputKind::Marker => {}
_ => {
// optimization - these fields are unused for Marker stats
self.scores[1].fetch_add(value, AcqRel);
swap_if(&self.scores[2], value, |new, current| new > current);
swap_if(&self.scores[3], value, |new, current| new < current);
}
}
}
/// Reset scores to zero, return previous values
fn snapshot(&self, scores: &mut [usize; 4]) -> bool {
// NOTE copy count AND sum _before_ testing for data to reduce concurrent discrepancies
scores[0] = self.scores[0].swap(0, AcqRel);
scores[1] = self.scores[1].swap(0, AcqRel);
// if hit count is zero, then no values were recorded.
if scores[0] == 0 {
return false;
}
scores[2] = self.scores[2].swap(usize::MIN, AcqRel);
scores[3] = self.scores[3].swap(usize::MAX, AcqRel);
true
}
/// Map raw scores (if any) to applicable statistics
pub fn reset(&self, duration_seconds: f64) -> Option<Vec<ScoreType>> {
let mut scores = AtomicScores::blank();
if self.snapshot(&mut scores) {
let mut snapshot = Vec::new();
match self.kind {
InputKind::Marker => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
InputKind::Gauge => {
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
}
InputKind::Timer => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
// timer rate uses the COUNT of timer calls per second (not SUM)
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
InputKind::Counter => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
// counter rate uses the SUM of values per second (e.g. to get bytes/s)
snapshot.push(Rate(scores[1] as f64 / duration_seconds))
}
}
Some(snapshot)
} else {
None
}
}
}
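The scores array is laid out as `[hit count, sum, max, min]`, so deriving statistics from a snapshot is plain arithmetic. A standalone sketch for the counter case (not the dipstick API itself):

```rust
// Derive (mean, rate) for a counter from a raw scores snapshot laid
// out as [hit count, sum, max, min], over a period of `duration_seconds`.
// Note the counter rate divides the SUM (e.g. bytes/s), while a timer's
// rate would divide the COUNT (calls/s).
fn counter_mean_and_rate(scores: [usize; 4], duration_seconds: f64) -> (f64, f64) {
    let mean = scores[1] as f64 / scores[0] as f64;
    let rate = scores[1] as f64 / duration_seconds;
    (mean, rate)
}
```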
/// A predefined export strategy reporting the average value for every non-marker metric.
/// Marker metrics export their hit count instead.
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn stats_average(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match kind {
Kind::Marker => match score {
ScoreType::Count(count) => Some((Kind::Counter, name, count)),
_ => None,
},
_ => match score {
ScoreType::Mean(avg) => Some((Kind::Gauge, name, avg.round() as Value)),
_ => None,
},
}
}
/// A predefined single-stat-per-metric export strategy:
/// - Timers and Counters each export their sums
/// - Markers each export their hit count
/// - Gauges each export their average
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn stats_summary(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match kind {
Kind::Marker => match score {
ScoreType::Count(count) => Some((Kind::Counter, name, count)),
_ => None,
},
Kind::Counter | Kind::Timer => match score {
ScoreType::Sum(sum) => Some((kind, name, sum)),
_ => None,
},
Kind::Gauge => match score {
ScoreType::Mean(mean) => Some((Kind::Gauge, name, mean.round() as Value)),
_ => None,
},
/// Spinlock until success or clear loss to concurrent update.
#[inline]
fn swap_if(counter: &AtomicUsize, new_value: usize, compare: fn(usize, usize) -> bool) {
let mut current = counter.load(Acquire);
while compare(new_value, current) {
if counter.compare_and_swap(current, new_value, Release) == current {
// update successful
break;
}
// race detected, retry
current = counter.load(Acquire);
}
}
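The same compare-and-swap loop can be written with `compare_exchange_weak`, which hands back the observed value on failure so no separate reload is needed. A standalone sketch of the atomic-max update using std atomics:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Atomically raise `counter` to `new_value` if it is larger, retrying
// while concurrent updates keep the comparison true.
fn update_max(counter: &AtomicUsize, new_value: usize) {
    let mut current = counter.load(Ordering::Acquire);
    while new_value > current {
        match counter.compare_exchange_weak(current, new_value, Ordering::Release, Ordering::Acquire) {
            Ok(_) => break,                      // stored our value
            Err(observed) => current = observed, // raced: retry against what we saw
        }
    }
}
```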
@ -294,17 +364,36 @@ mod bench {
use test;
use super::*;
#[bench]
fn update_marker(b: &mut test::Bencher) {
let metric = AtomicScores::new(InputKind::Marker);
b.iter(|| test::black_box(metric.update(1)));
}
#[bench]
fn update_count(b: &mut test::Bencher) {
let metric = AtomicScores::new(InputKind::Counter);
b.iter(|| test::black_box(metric.update(4)));
}
#[bench]
fn empty_snapshot(b: &mut test::Bencher) {
let metric = AtomicScores::new(InputKind::Counter);
let scores = &mut AtomicScores::blank();
b.iter(|| test::black_box(metric.snapshot(scores)));
}
#[bench]
fn aggregate_marker(b: &mut test::Bencher) {
let sink = Bucket::new();
let metric = sink.new_metric("event_a".into(), Kind::Marker);
let sink = AtomicBucket::new();
let metric = sink.new_metric("event_a".into(), InputKind::Marker);
b.iter(|| test::black_box(metric.write(1, labels![])));
}
#[bench]
fn aggregate_counter(b: &mut test::Bencher) {
let sink = Bucket::new();
let metric = sink.new_metric("count_a".into(), Kind::Counter);
let sink = AtomicBucket::new();
let metric = sink.new_metric("count_a".into(), InputKind::Counter);
b.iter(|| test::black_box(metric.write(1, labels![])));
}
@ -313,16 +402,18 @@ mod bench {
#[cfg(test)]
mod test {
use super::*;
use bucket::{stats_all, stats_average, stats_summary};
use core::clock::{mock_clock_advance, mock_clock_reset};
use output::map::StatsMap;
use std::time::Duration;
use std::collections::BTreeMap;
fn make_stats(stats_fn: &StatsFn) -> BTreeMap<String, Value> {
fn make_stats(stats_fn: &StatsFn) -> BTreeMap<String, MetricValue> {
mock_clock_reset();
let metrics = Bucket::new().add_naming("test");
let metrics = AtomicBucket::new().add_prefix("test");
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_a");

src/bucket/mod.rs Executable file

@ -0,0 +1,84 @@
pub mod atomic;
use core::input::InputKind;
use core::MetricValue;
use core::name::{MetricName};
/// Possibly aggregated scores.
#[derive(Debug, Clone, Copy)]
pub enum ScoreType {
/// Number of times the metric was used.
Count(u64),
/// Sum of metric values reported.
Sum(u64),
/// Biggest value reported.
Max(u64),
/// Smallest value reported.
Min(u64),
/// Average value (sum / hit count, non-atomic)
Mean(f64),
/// Mean rate (hit count / period length in seconds, non-atomic)
Rate(f64),
}
/// A predefined export strategy reporting all aggregated stats for all metric types.
/// Resulting stats are named by appending a short suffix to each metric's name.
#[allow(dead_code)]
pub fn stats_all(kind: InputKind, name: MetricName, score: ScoreType)
-> Option<(InputKind, MetricName, MetricValue)>
{
match score {
ScoreType::Count(hit) => Some((InputKind::Counter, name.make_name("count"), hit)),
ScoreType::Sum(sum) => Some((kind, name.make_name("sum"), sum)),
ScoreType::Mean(mean) => Some((kind, name.make_name("mean"), mean.round() as MetricValue)),
ScoreType::Max(max) => Some((InputKind::Gauge, name.make_name("max"), max)),
ScoreType::Min(min) => Some((InputKind::Gauge, name.make_name("min"), min)),
ScoreType::Rate(rate) => Some((InputKind::Gauge, name.make_name("rate"), rate.round() as MetricValue)),
}
}
/// A predefined export strategy reporting the average value for every non-marker metric.
/// Marker metrics export their hit count instead.
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn stats_average(kind: InputKind, name: MetricName, score: ScoreType)
-> Option<(InputKind, MetricName, MetricValue)>
{
match kind {
InputKind::Marker => match score {
ScoreType::Count(count) => Some((InputKind::Counter, name, count)),
_ => None,
},
_ => match score {
ScoreType::Mean(avg) => Some((InputKind::Gauge, name, avg.round() as MetricValue)),
_ => None,
},
}
}
/// A predefined single-stat-per-metric export strategy:
/// - Timers and Counters each export their sums
/// - Markers each export their hit count
/// - Gauges each export their average
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
#[allow(dead_code)]
pub fn stats_summary(kind: InputKind, name: MetricName, score: ScoreType)
-> Option<(InputKind, MetricName, MetricValue)>
{
match kind {
InputKind::Marker => match score {
ScoreType::Count(count) => Some((InputKind::Counter, name, count)),
_ => None,
},
InputKind::Counter | InputKind::Timer => match score {
ScoreType::Sum(sum) => Some((kind, name, sum)),
_ => None,
},
InputKind::Gauge => match score {
ScoreType::Mean(mean) => Some((InputKind::Gauge, name, mean.round() as MetricValue)),
_ => None,
},
}
}
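A miniature, self-contained version of the one-stat-per-kind selection above, with kinds reduced to strings and scores to a small enum (illustrative stand-ins for the dipstick types):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Score {
    Count(u64),
    Sum(u64),
    Mean(f64),
}

// Keep exactly one stat per metric kind, dropping the rest,
// mirroring the shape of `stats_summary`.
fn summary(kind: &str, score: Score) -> Option<Score> {
    match (kind, score) {
        ("marker", Score::Count(c)) => Some(Score::Count(c)),
        ("counter", Score::Sum(s)) | ("timer", Score::Sum(s)) => Some(Score::Sum(s)),
        ("gauge", Score::Mean(m)) => Some(Score::Mean(m.round())),
        _ => None, // every other (kind, score) pair is filtered out
    }
}
```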

src/cache/cache_in.rs vendored

@ -1,9 +1,9 @@
//! Cache metric definitions.
use core::Flush;
use core::input::{Kind, Input, InputScope, InputMetric, InputDyn};
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::{InputKind, Input, InputScope, InputMetric, InputDyn};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use cache::lru_cache as lru;
use core::error;
@ -24,7 +24,7 @@ pub trait CachedInput: Input + Send + Sync + 'static + Sized {
pub struct InputCache {
attributes: Attributes,
target: Arc<InputDyn + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, InputMetric>>>,
cache: Arc<RwLock<lru::LRUCache<MetricName, InputMetric>>>,
}
impl InputCache {
@ -61,7 +61,7 @@ impl Input for InputCache {
pub struct InputScopeCache {
attributes: Attributes,
target: Arc<InputScope + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, InputMetric>>>,
cache: Arc<RwLock<lru::LRUCache<MetricName, InputMetric>>>,
}
impl WithAttributes for InputScopeCache {
@ -70,8 +70,8 @@ impl WithAttributes for InputScopeCache {
}
impl InputScope for InputScopeCache {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let lookup = {
self.cache.write().expect("Cache Lock").get(&name).cloned()
};


@ -1,10 +1,10 @@
//! Cache metric definitions.
use core::Flush;
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope, OutputDyn};
use core::input::Kind;
use core::input::InputKind;
use cache::lru_cache as lru;
use core::error;
@ -26,7 +26,7 @@ pub trait CachedOutput: Output + Send + Sync + 'static + Sized {
pub struct OutputCache {
attributes: Attributes,
target: Arc<OutputDyn + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, OutputMetric>>>,
cache: Arc<RwLock<lru::LRUCache<MetricName, OutputMetric>>>,
}
impl OutputCache {
@ -63,7 +63,7 @@ impl Output for OutputCache {
pub struct OutputScopeCache {
attributes: Attributes,
target: Rc<OutputScope + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, OutputMetric>>>,
cache: Arc<RwLock<lru::LRUCache<MetricName, OutputMetric>>>,
}
impl WithAttributes for OutputScopeCache {
@ -72,8 +72,8 @@ impl WithAttributes for OutputScopeCache {
}
impl OutputScope for OutputScopeCache {
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = self.prefix_append(name);
let lookup = {
self.cache.write().expect("Cache Lock").get(&name).cloned()
};


@ -1,6 +1,6 @@
//! A fixed-size cache with LRU expiration criteria.
//! Stored values will be held onto as long as there is space.
//! When space runs out, the oldest unused value will get evicted to make room for a new value.
use std::hash::Hash;
use std::collections::HashMap;


@ -1,7 +1,7 @@
use std::sync::Arc;
use std::collections::{HashMap};
use core::name::{NameParts, Name};
use core::name::{NameParts, MetricName};
/// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method.
#[derive(Debug, Clone, Copy)]
@ -43,7 +43,7 @@ pub trait WithAttributes: Clone {
// TODO replace with fields-in-traits if ever stabilized (https://github.com/nikomatsakis/fields-in-traits-rfc)
fn mut_attributes(&mut self) -> &mut Attributes;
/// Utility method. Clone a component and mutate its attributes at once.
/// Clone the component and mutate its attributes at once.
fn with_attributes<F: Fn(&mut Attributes)>(&self, edit: F) -> Self {
let mut cloned = self.clone();
(edit)(cloned.mut_attributes());
@ -52,21 +52,21 @@ pub trait WithAttributes: Clone {
}
/// Name operations support.
pub trait Naming {
pub trait Prefixed {
/// Returns namespace of component.
fn get_naming(&self) -> &NameParts;
fn get_prefixes(&self) -> &NameParts;
/// Join namespace and prepend in newly defined metrics.
fn add_naming<S: Into<String>>(&self, name: S) -> Self;
/// Extend the namespace metrics will be defined in.
fn add_prefix<S: Into<String>>(&self, name: S) -> Self;
/// Append any name parts to the name's namespace.
fn naming_append<S: Into<Name>>(&self, name: S) -> Name {
name.into().append(self.get_naming().clone())
fn prefix_append<S: Into<MetricName>>(&self, name: S) -> MetricName {
name.into().append(self.get_prefixes().clone())
}
/// Prepend any name parts to the name's namespace.
fn naming_prepend<S: Into<Name>>(&self, name: S) -> Name {
name.into().prepend(self.get_naming().clone())
fn prefix_prepend<S: Into<MetricName>>(&self, name: S) -> MetricName {
name.into().prepend(self.get_prefixes().clone())
}
}
@ -80,16 +80,16 @@ pub trait Label {
}
impl<T: WithAttributes> Naming for T {
impl<T: WithAttributes> Prefixed for T {
/// Returns namespace of component.
fn get_naming(&self) -> &NameParts {
fn get_prefixes(&self) -> &NameParts {
&self.get_attributes().naming
}
/// Adds a name part to any existing naming.
/// Return a clone of the component with the updated naming.
fn add_naming<S: Into<String>>(&self, name: S) -> Self {
fn add_prefix<S: Into<String>>(&self, name: S) -> Self {
let name = name.into();
self.with_attributes(|new_attr| new_attr.naming.push_back(name.clone()))
}
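Prefix handling reduces to list operations on name parts: `prefix_append` places the component's accumulated prefixes before the metric's own name. A standalone sketch over plain strings (the separator and names are illustrative, not dipstick's rendering):

```rust
// Build a fully qualified metric name by placing the component's
// prefix parts before the metric's own parts, then joining.
fn prefix_append(prefixes: &[&str], name: &[&str]) -> String {
    let mut parts: Vec<&str> = prefixes.to_vec();
    parts.extend_from_slice(name);
    parts.join(".")
}
```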


@ -6,7 +6,7 @@ use std::time::Duration;
use std::time::Instant;
use core::Value;
use core::MetricValue;
#[derive(Debug, Copy, Clone)]
/// A handle to the start time of a timer.
@ -21,13 +21,13 @@ impl TimeHandle {
}
/// Get the elapsed time in microseconds since TimeHandle was obtained.
pub fn elapsed_us(self) -> Value {
pub fn elapsed_us(self) -> MetricValue {
let duration = now() - self.0;
duration.as_secs() * 1_000_000 + duration.subsec_micros() as Value
duration.as_secs() * 1_000_000 + duration.subsec_micros() as MetricValue
}
/// Get the elapsed time in microseconds since TimeHandle was obtained.
pub fn elapsed_ms(self) -> Value {
pub fn elapsed_ms(self) -> MetricValue {
self.elapsed_us() / 1000
}
}
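The microsecond arithmetic can be checked in isolation with `std::time` — same formula as above: whole seconds scaled up, plus the sub-second microseconds:

```rust
use std::time::Duration;

// Convert an elapsed Duration to whole microseconds, as
// `TimeHandle::elapsed_us` does, then derive milliseconds from it.
fn duration_us(duration: Duration) -> u64 {
    duration.as_secs() * 1_000_000 + u64::from(duration.subsec_micros())
}

fn duration_ms(duration: Duration) -> u64 {
    duration_us(duration) / 1000
}
```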


@ -1,7 +1,7 @@
use core::clock::TimeHandle;
use core::{Value, Flush};
use core::name::Name;
use ::{Labels};
use core::{MetricValue, Flush};
use core::name::MetricName;
use core::label::Labels;
use std::sync::Arc;
use std::fmt;
@ -36,26 +36,26 @@ impl<T: Input + Send + Sync + 'static> InputDyn for T {
pub trait InputScope: Flush {
/// Define a generic metric of the specified type.
/// It is preferable to use counter() / marker() / timer() / gauge() methods.
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric;
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric;
/// Define a counter.
fn counter(&self, name: &str) -> Counter {
self.new_metric(name.into(), Kind::Counter).into()
self.new_metric(name.into(), InputKind::Counter).into()
}
/// Define a marker.
fn marker(&self, name: &str) -> Marker {
self.new_metric(name.into(), Kind::Marker).into()
self.new_metric(name.into(), InputKind::Marker).into()
}
/// Define a timer.
fn timer(&self, name: &str) -> Timer {
self.new_metric(name.into(), Kind::Timer).into()
self.new_metric(name.into(), InputKind::Timer).into()
}
/// Define a gauge.
fn gauge(&self, name: &str) -> Gauge {
self.new_metric(name.into(), Kind::Gauge).into()
self.new_metric(name.into(), InputKind::Gauge).into()
}
}
@ -63,7 +63,7 @@ pub trait InputScope: Flush {
/// A metric is actually a function that knows to write a metric value to a metric output.
#[derive(Clone)]
pub struct InputMetric {
inner: Arc<Fn(Value, Labels) + Send + Sync>
inner: Arc<Fn(MetricValue, Labels) + Send + Sync>
}
impl fmt::Debug for InputMetric {
@ -74,20 +74,20 @@ impl fmt::Debug for InputMetric {
impl InputMetric {
/// Utility constructor
pub fn new<F: Fn(Value, Labels) + Send + Sync + 'static>(metric: F) -> InputMetric {
pub fn new<F: Fn(MetricValue, Labels) + Send + Sync + 'static>(metric: F) -> InputMetric {
InputMetric { inner: Arc::new(metric) }
}
/// Collect a new value for this metric.
#[inline]
pub fn write(&self, value: Value, labels: Labels) {
pub fn write(&self, value: MetricValue, labels: Labels) {
(self.inner)(value, labels)
}
}
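An `InputMetric` is essentially a shared closure that knows where to write a value. The pattern can be shown with std types alone; the vector store here is a stand-in for a real backend:

```rust
use std::sync::{Arc, Mutex};

// A metric as a write closure: each call appends the value to a
// shared store, the way `InputMetric::write` forwards to its inner Fn.
fn make_metric(store: Arc<Mutex<Vec<u64>>>) -> Arc<dyn Fn(u64) + Send + Sync> {
    Arc::new(move |value| store.lock().expect("metric store").push(value))
}
```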
/// Used to differentiate between metric kinds in the backend.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Kind {
pub enum InputKind {
/// Handling one item at a time.
Marker,
/// Handling quantities or multiples.
@ -98,15 +98,15 @@ pub enum Kind {
Timer,
}
/// Used by the metrics! macro to obtain the Kind from the stringified type.
impl<'a> From<&'a str> for Kind {
fn from(s: &str) -> Kind {
/// Used by the metrics! macro to obtain the InputKind from the stringified type.
impl<'a> From<&'a str> for InputKind {
fn from(s: &str) -> InputKind {
match s {
"Marker" => Kind::Marker,
"Counter" => Kind::Counter,
"Gauge" => Kind::Gauge,
"Timer" => Kind::Timer,
_ => panic!("No Kind '{}' defined", s)
"Marker" => InputKind::Marker,
"Counter" => InputKind::Counter,
"Gauge" => InputKind::Gauge,
"Timer" => InputKind::Timer,
_ => panic!("No InputKind '{}' defined", s)
}
}
}


@ -195,7 +195,7 @@ pub mod test {
ThreadLabel::set("abc", "123");
assert_eq!(Arc::new("123".into()), labels!().lookup("abc").unwrap());
ThreadLabel::unset("abc");
assert_eq!(Arc::new("456".into()), labels!().lookup("abc").unwrap());
assert_eq!(Arc::new("456".into()), labels!().lookup("abc").expect("AppLabel Value"));
AppLabel::unset("abc");
assert_eq!(false, labels!().lookup("abc").is_some());
}


@ -3,7 +3,7 @@
//! This is also kept in a separate module because it is not to be exposed outside of the crate.
use core::input::{Marker, InputScope, Counter};
use core::attributes::Naming;
use core::attributes::Prefixed;
use core::proxy::Proxy;
metrics!{


@ -13,7 +13,7 @@ pub mod scheduler;
pub mod metrics;
/// Base type for recorded metric values.
pub type Value = u64;
pub type MetricValue = u64;
/// Both InputScope and OutputScope share the ability to flush the recorded data.
pub trait Flush {
@ -31,10 +31,9 @@ pub mod test {
#[test]
fn test_to_void() {
let c = void::Void::metrics().input();
let m = c.new_metric("test".into(), input::Kind::Marker);
let m = c.new_metric("test".into(), input::InputKind::Marker);
m.write(33, labels![]);
}
}
#[cfg(feature = "bench")]
@ -42,7 +41,7 @@ pub mod bench {
use super::input::*;
use super::clock::*;
use super::super::aggregate::bucket::*;
use super::super::bucket::atomic::*;
use test;
#[bench]
@ -52,7 +51,7 @@ pub mod bench {
#[bench]
fn time_bench_direct_dispatch_event(b: &mut test::Bencher) {
let metrics = Bucket::new();
let metrics = AtomicBucket::new();
let marker = metrics.marker("aaa");
b.iter(|| test::black_box(marker.mark()));
}


@ -28,15 +28,15 @@ impl NameParts {
}
/// Make a name in this namespace
pub fn make_name<S: Into<String>>(&self, leaf: S) -> Name {
pub fn make_name<S: Into<String>>(&self, leaf: S) -> MetricName {
let mut nodes = self.clone();
nodes.push_back(leaf.into());
Name { nodes }
MetricName { nodes }
}
/// Extract a copy of the last name part
/// Panics if empty
pub fn short(&self) -> Name {
pub fn short(&self) -> MetricName {
self.back().expect("Short metric name").clone().into()
}
}
@ -70,11 +70,11 @@ impl DerefMut for NameParts {
/// The name of a metric, including the concatenated possible namespaces in which it was defined.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct Name {
pub struct MetricName {
nodes: NameParts,
}
impl Name {
impl MetricName {
/// Prepend to the existing namespace.
pub fn prepend<S: Into<NameParts>>(mut self, namespace: S) -> Self {
@ -101,20 +101,20 @@ impl Name {
}
}
impl<S: Into<String>> From<S> for Name {
impl<S: Into<String>> From<S> for MetricName {
fn from(name: S) -> Self {
Name { nodes: NameParts::from(name) }
MetricName { nodes: NameParts::from(name) }
}
}
impl Deref for Name {
impl Deref for MetricName {
type Target = NameParts;
fn deref(&self) -> &Self::Target {
&self.nodes
}
}
impl DerefMut for Name {
impl DerefMut for MetricName {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.nodes
}


@ -1,7 +1,7 @@
use core::input::{InputScope, InputMetric, Input, Kind};
use core::input::{InputScope, InputMetric, Input, InputKind};
use core::output::{Output, OutputScope};
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::Flush;
use core::error;
use std::rc::Rc;
@ -9,7 +9,7 @@ use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::ops;
/// Provide thread-safe locking to RawScope implementers.
/// Synchronous thread-safety for metric output using basic locking.
#[derive(Clone)]
pub struct LockingScopeBox {
attributes: Attributes,
@ -23,8 +23,8 @@ impl WithAttributes for LockingScopeBox {
impl InputScope for LockingScopeBox {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let raw_metric = self.inner.lock().expect("RawScope Lock").new_metric(name, kind);
let mutex = self.inner.clone();
InputMetric::new(move |value, labels| {


@ -1,8 +1,8 @@
use core::{Flush, Value};
use core::input::Kind;
use core::name::Name;
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::name::MetricName;
use core::void::Void;
use ::{Labels};
use core::label::Labels;
use std::rc::Rc;
@ -10,26 +10,26 @@ use std::rc::Rc;
pub trait OutputScope: Flush {
/// Define a raw metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric;
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric;
}
/// Output metrics are not thread safe.
#[derive(Clone)]
pub struct OutputMetric {
inner: Rc<Fn(Value, Labels)>
inner: Rc<Fn(MetricValue, Labels)>
}
impl OutputMetric {
/// Utility constructor
pub fn new<F: Fn(Value, Labels) + 'static>(metric: F) -> OutputMetric {
pub fn new<F: Fn(MetricValue, Labels) + 'static>(metric: F) -> OutputMetric {
OutputMetric { inner: Rc::new(metric) }
}
/// Some may prefer the `metric.write(value)` form to the `(metric)(value)` form.
/// This shouldn't matter as metrics should be of type Counter, Marker, etc.
#[inline]
pub fn write(&self, value: Value, labels: Labels) {
pub fn write(&self, value: MetricValue, labels: Labels) {
(self.inner)(value, labels)
}
}


@ -1,9 +1,9 @@
//! Decouple metric definition from configuration with trait objects.
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::{Name, NameParts};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::{MetricName, NameParts};
use core::Flush;
use core::input::{Kind, InputMetric, InputScope};
use core::input::{InputKind, InputMetric, InputScope};
use core::void::VOID_INPUT;
use core::error;
@ -26,7 +26,7 @@ lazy_static! {
struct ProxyMetric {
// basic info for this metric, needed to recreate new corresponding trait object if target changes
name: NameParts,
kind: Kind,
kind: InputKind,
// the metric trait object to proxy metric values to
// the second part can be up to namespace.len() + 1 if this metric was individually targeted
@ -174,13 +174,13 @@ impl Proxy {
/// Replace target for this proxy and it's children.
pub fn set_target<T: InputScope + Send + Sync + 'static>(&self, target: T) {
let mut inner = self.inner.write().expect("Dispatch Lock");
inner.set_target(self.get_naming(), Arc::new(target));
inner.set_target(self.get_prefixes(), Arc::new(target));
}
/// Replace target for this proxy and it's children.
pub fn unset_target(&self) {
let mut inner = self.inner.write().expect("Dispatch Lock");
inner.unset_target(self.get_naming());
inner.unset_target(self.get_prefixes());
}
/// Replace target for this proxy and it's children.
@ -197,19 +197,19 @@ impl Proxy {
impl<S: AsRef<str>> From<S> for Proxy {
fn from(name: S) -> Proxy {
Proxy::new().add_naming(name.as_ref())
Proxy::new().add_prefix(name.as_ref())
}
}
impl InputScope for Proxy {
/// Lookup or create a proxy stub for the requested metric.
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name: Name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name: MetricName = self.prefix_append(name);
let mut inner = self.inner.write().expect("Dispatch Lock");
let proxy = inner
.metrics
.get(&name)
// TODO validate that Kind matches existing
// TODO validate that InputKind matches existing
.and_then(|proxy_ref| Weak::upgrade(proxy_ref))
.unwrap_or_else(|| {
let namespace = &*name;
@ -235,7 +235,7 @@ impl InputScope for Proxy {
impl Flush for Proxy {
fn flush(&self) -> error::Result<()> {
self.inner.write().expect("Dispatch Lock").flush(self.get_naming())
self.inner.write().expect("Dispatch Lock").flush(self.get_prefixes())
}
}
@ -249,11 +249,11 @@ mod bench {
use super::*;
use test;
use aggregate::bucket::Bucket;
use bucket::atomic::AtomicBucket;
#[bench]
fn proxy_marker_to_aggregate(b: &mut test::Bencher) {
ROOT_PROXY.set_target(Bucket::new());
ROOT_PROXY.set_target(AtomicBucket::new());
let metric = ROOT_PROXY.marker("event_a");
b.iter(|| test::black_box(metric.mark()));
}


@ -1,6 +1,6 @@
use core::output::{Output, OutputScope, OutputMetric};
use core::name::Name;
use core::input::{Kind, InputDyn, InputScope};
use core::name::MetricName;
use core::input::{InputKind, InputDyn, InputScope};
use core::Flush;
use std::sync::Arc;
@ -40,7 +40,7 @@ impl Output for Void {
}
impl OutputScope for VoidOutput {
fn new_metric(&self, _name: Name, _kind: Kind) -> OutputMetric {
fn new_metric(&self, _name: MetricName, _kind: InputKind) -> OutputMetric {
OutputMetric::new(|_value, _labels| {})
}
}


@ -27,10 +27,10 @@ mod macros;
pub use macros::*;
mod core;
pub use core::{Flush, Value};
pub use core::attributes::{Naming, Sampling, Sampled, Buffered, Buffering};
pub use core::name::{Name, NameParts};
pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, Kind};
pub use core::{Flush, MetricValue};
pub use core::attributes::{Prefixed, Sampling, Sampled, Buffered, Buffering};
pub use core::name::{MetricName, NameParts};
pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, InputKind};
pub use core::output::{Output, OutputDyn, OutputScope, OutputMetric};
pub use core::scheduler::{ScheduleFlush, CancelHandle};
pub use core::out_lock::{LockingScopeBox};
@@ -51,9 +51,9 @@ pub use output::statsd::{Statsd, StatsdScope, StatsdMetric};
pub use output::map::{StatsMap};
pub use output::log::{Log, LogScope};
mod aggregate;
pub use aggregate::bucket::{Bucket, stats_all, stats_average, stats_summary};
pub use aggregate::scores::{ScoreType, Scoreboard};
mod bucket;
pub use bucket::{ScoreType, stats_all, stats_average, stats_summary};
pub use bucket::atomic::{AtomicBucket};
mod cache;
pub use cache::cache_in::CachedInput;


@@ -130,27 +130,27 @@ macro_rules! metrics {
// SUB BRANCH NODE - public identifier
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
lazy_static! { $(#[$attr])* pub static ref $IDENT = $WITH.add_naming($e); }
lazy_static! { $(#[$attr])* pub static ref $IDENT = $WITH.add_prefix($e); }
metrics!( @internal $IDENT; $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE - private identifier
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
lazy_static! { $(#[$attr])* static ref $IDENT = $WITH.add_naming($e); }
lazy_static! { $(#[$attr])* static ref $IDENT = $WITH.add_prefix($e); }
metrics!( @internal $IDENT; $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE (not yet)
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
metrics!( @internal $WITH.add_naming($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH.add_prefix($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE (not yet)
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
metrics!( @internal $WITH.add_naming($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH.add_prefix($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};


@@ -1,9 +1,9 @@
//! Dispatch metrics to multiple sinks.
use core::Flush;
use core::input::{Kind, Input, InputScope, InputMetric, InputDyn};
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::{InputKind, Input, InputScope, InputMetric, InputDyn};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::error;
use std::sync::Arc;
@@ -75,8 +75,8 @@ impl MultiInputScope {
}
impl InputScope for MultiInputScope {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = &self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = &self.prefix_append(name);
let metrics: Vec<InputMetric> = self.scopes.iter()
.map(move |scope| scope.new_metric(name.clone(), kind))
.collect();


@@ -1,9 +1,9 @@
//! Dispatch metrics to multiple sinks.
use core::Flush;
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::Kind;
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::input::InputKind;
use core::output::{Output, OutputMetric, OutputScope, OutputDyn};
use core::error;
@@ -76,8 +76,8 @@ impl MultiOutputScope {
}
impl OutputScope for MultiOutputScope {
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let name = &self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = &self.prefix_append(name);
let metrics: Vec<OutputMetric> = self.scopes.iter()
.map(move |scope| scope.new_metric(name.clone(), kind))
.collect();


@@ -1,6 +1,6 @@
use core::name::Name;
use core::input::Kind;
use core::Value;
use core::name::MetricName;
use core::input::InputKind;
use core::MetricValue;
use self::LineOp::*;
use std::io;
@@ -15,7 +15,7 @@ pub enum LineOp {
/// Print metric value as text.
ValueAsText,
/// Print metric value, divided by the given scale, as text.
ScaledValueAsText(Value),
ScaledValueAsText(f64),
/// Print the newline character.
NewLine,
}
@@ -43,7 +43,7 @@ impl From<Vec<LineOp>> for LineTemplate {
impl LineTemplate {
/// Template execution applies commands in turn, writing to the output.
pub fn print<L>(&self, output: &mut io::Write, value: Value, lookup: L) -> Result<(), io::Error>
pub fn print<L>(&self, output: &mut io::Write, value: MetricValue, lookup: L) -> Result<(), io::Error>
where L: Fn(&str) -> Option<Arc<String>>
{
for cmd in &self.ops {
@@ -51,7 +51,7 @@ impl LineTemplate {
Literal(src) => output.write_all(src.as_ref())?,
ValueAsText => output.write_all(format!("{}", value).as_ref())?,
ScaledValueAsText(scale) => {
let scaled = value / scale;
let scaled = value as f64 / scale;
output.write_all(format!("{}", scaled).as_ref())?
},
NewLine => writeln!(output)?,
@@ -85,7 +85,7 @@ pub trait Formatting {
pub trait LineFormat: Send + Sync {
/// Prepare a template for output of metric values.
fn template(&self, name: &Name, kind: Kind) -> LineTemplate;
fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate;
}
/// A simple metric output format of "MetricName {Value}"
@@ -96,7 +96,7 @@ pub struct SimpleFormat {
}
impl LineFormat for SimpleFormat {
fn template(&self, name: &Name, _kind: Kind) -> LineTemplate {
fn template(&self, name: &MetricName, _kind: InputKind) -> LineTemplate {
let mut header = name.join(".");
header.push(' ');
LineTemplate {
@@ -112,12 +112,12 @@ impl LineFormat for SimpleFormat {
#[cfg(test)]
pub mod test {
use super::*;
use ::Labels;
use core::label::Labels;
pub struct TestFormat;
impl LineFormat for TestFormat {
fn template(&self, name: &Name, kind: Kind) -> LineTemplate {
fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate {
let mut header: String = format!("{:?}", kind);
header.push('/');
header.push_str(&name.join("."));
@@ -127,7 +127,7 @@ pub mod test {
Literal(header.into()),
ValueAsText,
Literal(" ".into()),
ScaledValueAsText(1000),
ScaledValueAsText(1000.0),
Literal(" ".into()),
LabelExists("test_key".into(), vec![
LabelOp::LabelKey,
@@ -143,9 +143,9 @@ pub mod test {
fn print_label_exists() {
let labels: Labels = labels!("test_key" => "456");
let format = TestFormat {};
let mut name = Name::from("abc");
let mut name = MetricName::from("abc");
name = name.prepend("xyz");
let template = format.template(&name, Kind::Counter);
let template = format.template(&name, InputKind::Counter);
let mut out = vec![];
template.print(&mut out, 123000, |key| labels.lookup(key)).unwrap();
assert_eq!("Counter/xyz.abc 123000 123 test_key=456\n", String::from_utf8(out).unwrap());
@@ -154,9 +154,9 @@ pub mod test {
#[test]
fn print_label_not_exists() {
let format = TestFormat {};
let mut name = Name::from("abc");
let mut name = MetricName::from("abc");
name = name.prepend("xyz");
let template = format.template(&name, Kind::Counter);
let template = format.template(&name, InputKind::Counter);
let mut out = vec![];
template.print(&mut out, 123000, |_key| None).unwrap();
assert_eq!("Counter/xyz.abc 123000 123 \n", String::from_utf8(out).unwrap());


@@ -1,9 +1,9 @@
//! Send metrics to a graphite server.
use core::attributes::{Buffered, Attributes, WithAttributes, Naming};
use core::name::Name;
use core::{Flush, Value};
use core::input::Kind;
use core::attributes::{Buffered, Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
@@ -71,13 +71,13 @@ pub struct GraphiteScope {
impl OutputScope for GraphiteScope {
/// Define a metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let mut prefix = self.naming_prepend(name).join(".");
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let mut prefix = self.prefix_prepend(name).join(".");
prefix.push(' ');
let scale = match kind {
// timers are in µs, but we give graphite milliseconds
Kind::Timer => 1000,
InputKind::Timer => 1000,
_ => 1,
};
@@ -99,7 +99,7 @@ impl Flush for GraphiteScope {
}
impl GraphiteScope {
fn print(&self, metric: &GraphiteMetric, value: Value) {
fn print(&self, metric: &GraphiteMetric, value: MetricValue) {
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
@@ -195,7 +195,7 @@ mod bench {
#[bench]
pub fn immediate_graphite(b: &mut test::Bencher) {
let sd = Graphite::send_to("localhost:2003").unwrap().input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
@@ -204,7 +204,7 @@ mod bench {
pub fn buffering_graphite(b: &mut test::Bencher) {
let sd = Graphite::send_to("localhost:2003").unwrap()
.buffered(Buffering::BufferSize(65465)).input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}


@@ -1,7 +1,7 @@
use core::{Flush};
use core::input::{Kind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Buffered, Naming};
use core::name::Name;
use core::input::{InputKind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Buffered, Prefixed};
use core::name::MetricName;
use core::error;
use cache::cache_in;
use queue::queue_in;
@@ -75,8 +75,8 @@ impl queue_in::QueuedInput for Log {}
impl cache_in::CachedInput for Log {}
impl InputScope for LogScope {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let template = self.output.format.template(&name, kind);
@@ -136,7 +136,7 @@ mod test {
#[test]
fn test_to_log() {
let c = super::Log::log_to().input();
let m = c.new_metric("test".into(), Kind::Marker);
let m = c.new_metric("test".into(), InputKind::Marker);
m.write(33, labels![]);
}


@@ -1,6 +1,6 @@
use core::{Flush, Value};
use core::input::Kind;
use core::name::Name;
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::name::MetricName;
use core::output::{OutputMetric, OutputScope};
use std::rc::Rc;
@@ -11,11 +11,11 @@ use std::collections::BTreeMap;
/// Every received value for a metric replaces the previous one (if any).
#[derive(Clone, Default)]
pub struct StatsMap {
inner: Rc<RefCell<BTreeMap<String, Value>>>,
inner: Rc<RefCell<BTreeMap<String, MetricValue>>>,
}
impl OutputScope for StatsMap {
fn new_metric(&self, name: Name, _kind: Kind) -> OutputMetric {
fn new_metric(&self, name: MetricName, _kind: InputKind) -> OutputMetric {
let write_to = self.inner.clone();
let name: String = name.join(".");
OutputMetric::new(move |value, _labels| {
@@ -26,7 +26,7 @@ impl OutputScope for StatsMap {
impl Flush for StatsMap {}
impl From<StatsMap> for BTreeMap<String, Value> {
impl From<StatsMap> for BTreeMap<String, MetricValue> {
fn from(map: StatsMap) -> Self {
// FIXME this is possibly a full map copy, for nothing.
// into_inner() is what we'd really want here but would require some `unsafe`? don't know how to do this yet.


@@ -5,10 +5,10 @@
//! - Serve metrics with basic HTTP server
//! - Print metrics to a buffer provided by an HTTP framework.
use core::{Flush, Value};
use core::input::{Kind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Naming};
use core::name::Name;
use core::{Flush, MetricValue};
use core::input::{InputKind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope};
use core::error;
@@ -47,8 +47,8 @@ impl PrometheusScope {
impl OutputScope for PrometheusScope {
/// Define a metric of the specified type.
fn new_metric(&self, name: Name, _kind: Kind) -> OutputMetric {
let mut _prefix = self.naming_prepend(name).join(".");
fn new_metric(&self, name: MetricName, _kind: InputKind) -> OutputMetric {
let mut _prefix = self.prefix_prepend(name).join(".");
OutputMetric::new(|_value, _labels| {})
}
}


@@ -1,10 +1,10 @@
//! Send metrics to a statsd server.
use core::attributes::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Naming};
use core::name::Name;
use core::attributes::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Prefixed};
use core::name::MetricName;
use core::pcg32;
use core::{Flush, Value};
use core::input::Kind;
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
@@ -78,21 +78,21 @@ impl Sampled for StatsdScope {}
impl OutputScope for StatsdScope {
/// Define a metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let mut prefix = self.naming_prepend(name).join(".");
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let mut prefix = self.prefix_prepend(name).join(".");
prefix.push(':');
let mut suffix = String::with_capacity(16);
suffix.push('|');
suffix.push_str(match kind {
Kind::Marker | Kind::Counter => "c",
Kind::Gauge => "g",
Kind::Timer => "ms",
InputKind::Marker | InputKind::Counter => "c",
InputKind::Gauge => "g",
InputKind::Timer => "ms",
});
let scale = match kind {
// timers are in µs, statsd wants ms
Kind::Timer => 1000,
InputKind::Timer => 1000,
_ => 1,
};
@@ -127,7 +127,7 @@ impl Flush for StatsdScope {
}
impl StatsdScope {
fn print(&self, metric: &StatsdMetric, value: Value) {
fn print(&self, metric: &StatsdMetric, value: MetricValue) {
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len();
@@ -212,16 +212,16 @@ impl Drop for StatsdScope {
//pub struct StatsdFormat;
//
//impl Format for StatsdFormat {
// fn template(&self, name: &Name, kind: Kind) -> Template {
// fn template(&self, name: &Name, kind: InputKind) -> Template {
// let mut before_value = name.join(".");
// before_value.push(':');
//
// let mut after_value = String::with_capacity(16);
// after_value.push('|');
// after_value.push_str(match kind {
// Kind::Marker | Kind::Counter => "c",
// Kind::Gauge => "g",
// Kind::Timer => "ms",
// InputKind::Marker | InputKind::Counter => "c",
// InputKind::Gauge => "g",
// InputKind::Timer => "ms",
// });
//
// // specify sampling rate if any
@@ -232,7 +232,7 @@ impl Drop for StatsdScope {
// // scale timer values
// let value_text = match kind {
// // timers are in µs, statsd wants ms
// Kind::Timer => ScaledValueAsText(1000),
// InputKind::Timer => ScaledValueAsText(1000),
// _ => ValueAsText,
// };
//
@@ -258,7 +258,7 @@ mod bench {
#[bench]
pub fn immediate_statsd(b: &mut test::Bencher) {
let sd = Statsd::send_to("localhost:2003").unwrap().input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
@@ -267,7 +267,7 @@ mod bench {
pub fn buffering_statsd(b: &mut test::Bencher) {
let sd = Statsd::send_to("localhost:2003").unwrap()
.buffered(Buffering::BufferSize(65465)).input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}


@@ -3,9 +3,9 @@
// TODO parameterize templates
use core::{Flush};
use core::input::Kind;
use core::attributes::{Attributes, WithAttributes, Buffered, Naming};
use core::name::Name;
use core::input::InputKind;
use core::attributes::{Attributes, WithAttributes, Buffered, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope};
use core::error;
@@ -118,8 +118,8 @@ impl<W: Write + Send + Sync + 'static> WithAttributes for TextScope<W> {
impl<W: Write + Send + Sync + 'static> Buffered for TextScope<W> {}
impl<W: Write + Send + Sync + 'static> OutputScope for TextScope<W> {
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = self.prefix_append(name);
let template = self.output.format.template(&name, kind);
let entries = self.entries.clone();
@@ -180,13 +180,13 @@ impl<W: Write + Send + Sync + 'static> Drop for TextScope<W> {
#[cfg(test)]
mod test {
use super::*;
use core::input::Kind;
use core::input::InputKind;
use std::io;
#[test]
fn sink_print() {
let c = super::Stream::write_to(io::stdout()).output();
let m = c.new_metric("test".into(), Kind::Marker);
let m = c.new_metric("test".into(), InputKind::Marker);
m.write(33, labels![]);
}
}


@@ -2,14 +2,14 @@
//! Metrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::{Kind, Input, InputScope, InputDyn, InputMetric};
use core::{Value, Flush};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::input::{InputKind, Input, InputScope, InputDyn, InputMetric};
use core::{MetricValue, Flush};
use core::metrics;
use cache::cache_in::CachedInput;
use core::error;
use ::{ Labels};
use core::label::Labels;
use std::sync::Arc;
use std::sync::mpsc;
@@ -90,7 +90,7 @@ impl Input for InputQueue {
/// Async commands should be of no concerns to applications.
pub enum InputQueueCmd {
/// Send metric write
Write(InputMetric, Value, Labels),
Write(InputMetric, MetricValue, Labels),
/// Send metric flush
Flush(Arc<InputScope + Send + Sync + 'static>),
}
@@ -121,8 +121,8 @@ impl WithAttributes for InputQueueScope {
}
impl InputScope for InputQueueScope {
fn new_metric(&self, name: Name, kind:Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let target_metric = self.target.new_metric(name, kind);
let sender = self.sender.clone();
InputMetric::new(move |value, mut labels| {


@@ -1,16 +1,16 @@
//! Queue metrics for write on a separate thread,
//! RawMetrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
//!
use core::attributes::{Attributes, WithAttributes, Naming};
use core::name::Name;
use core::input::{Kind, Input, InputScope, InputMetric};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::input::{InputKind, Input, InputScope, InputMetric};
use core::output::{OutputDyn, OutputScope, OutputMetric, Output};
use core::{Value, Flush};
use core::{MetricValue, Flush};
use core::metrics;
use cache::cache_in;
use core::error;
use ::{Labels};
use core::label::Labels;
use std::rc::Rc;
use std::ops;
@@ -95,7 +95,7 @@ impl Input for OutputQueue {
/// Async commands should be of no concerns to applications.
pub enum OutputQueueCmd {
/// Send metric write
Write(Arc<OutputMetric>, Value, Labels),
Write(Arc<OutputMetric>, MetricValue, Labels),
/// Send metric flush
Flush(Arc<UnsafeScope>),
}
@@ -115,8 +115,8 @@ impl WithAttributes for OutputQueueScope {
}
impl InputScope for OutputQueueScope {
fn new_metric(&self, name: Name, kind:Kind) -> InputMetric {
let name = self.naming_append(name);
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let target_metric = Arc::new(self.target.new_metric(name, kind));
let sender = self.sender.clone();
InputMetric::new(move |value, mut labels| {