Add publish strategies

This commit is contained in:
Francis Lalonde 2017-09-28 18:17:16 -04:00
parent 9f54443487
commit 83d9a795c1
21 changed files with 459 additions and 222 deletions

31
Cargo.lock generated
View File

@ -1,16 +1,16 @@
[root]
name = "raw_sink"
version = "0.4.5-alpha.0"
version = "0.0.0"
dependencies = [
"dipstick 0.4.4-alpha.0",
"dipstick 0.4.5-alpha.0",
"scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "aggregate"
name = "aggregate_print"
version = "0.0.0"
dependencies = [
"dipstick 0.4.4-alpha.0",
"dipstick 0.4.5-alpha.0",
"scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -83,7 +83,7 @@ dependencies = [
name = "counter_timer_gauge"
version = "0.0.0"
dependencies = [
"dipstick 0.4.4-alpha.0",
"dipstick 0.4.5-alpha.0",
"scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -92,6 +92,14 @@ name = "custom_derive"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "custom_publish"
version = "0.0.0"
dependencies = [
"dipstick 0.4.5-alpha.0",
"scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "dbghelp-sys"
version = "0.2.0"
@ -103,10 +111,11 @@ dependencies = [
[[package]]
name = "dipstick"
version = "0.4.4-alpha.0"
version = "0.4.5-alpha.0"
dependencies = [
"cached 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"num 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
@ -152,6 +161,11 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lazy_static"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "lazycell"
version = "0.5.1"
@ -217,7 +231,7 @@ dependencies = [
name = "multi_print"
version = "0.0.0"
dependencies = [
"dipstick 0.4.4-alpha.0",
"dipstick 0.4.5-alpha.0",
"scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -311,7 +325,7 @@ dependencies = [
name = "queued"
version = "0.0.0"
dependencies = [
"dipstick 0.4.4-alpha.0",
"dipstick 0.4.5-alpha.0",
"scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -432,6 +446,7 @@ dependencies = [
"checksum futures-cpupool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "77d49e7de8b91b20d6fda43eea906637eff18b96702eb6b2872df8bfab1ad2b5"
"checksum iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29d062ee61fccdf25be172e70f34c9f6efc597e1fb8f6526e8437b2046ab26be"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c9e5e58fa1a4c3b915a561a78a22ee0cac6ab97dca2504428bc1cb074375f8d5"
"checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b"
"checksum libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)" = "d1419b2939a0bc44b77feb34661583c7546b532b192feab36249ab584b86856c"
"checksum log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "880f77541efa6e5cc74e76910c9884d9859683118839d6a1dc3b11e63512565b"

View File

@ -5,6 +5,7 @@ members = [
"examples/multi_outs/",
"examples/aggregate_print/",
"examples/async_print/",
"examples/custom_publish/",
"examples/raw_log/",
]
@ -33,6 +34,7 @@ log = "0.3.8"
time = "0.1"
num = "0.1.40"
error-chain = "0.10.0"
lazy_static = "0.2.9"
[features]
bench = []

111
README.md
View File

@ -1,11 +1,67 @@
dipstick
--------
# dipstick
[![Build Status](https://travis-ci.org/fralalonde/dipstick.svg?branch=master)](https://travis-ci.org/fralalonde/dipstick)
A fast and modular metrics library decoupling app instrumentation from reporting backend.
A fast and modular metrics toolkit for all Rust applications.
Similar to popular logging frameworks, but with counters and timers.
Can be configured for combined outputs (log + statsd), random sampling, local aggregation of metrics, recurrent background publication, etc.
- Does not bind application code to a single metrics implementation.
- Builds on stable Rust with minimal dependencies.
```rust
use dipstick::*;
let app_metrics = metrics(to_stdout());
app_metrics.counter("my_counter").count(3);
```
Metrics can be sent to multiple outputs at the same time.
```rust
let app_metrics = metrics((to_log("log_this:"), to_statsd("localhost:8125")));
```
Since instruments are decoupled from the backend, outputs can be swapped easily.
Metrics can be aggregated and sent periodically in the background.
```rust
use std::time::Duration;
let (to_aggregate, from_aggregate) = aggregate();
publish_every(Duration::from_secs(10), from_aggregate, to_log("last_ten_secs:"), all_stats);
let app_metrics = metrics(to_aggregate);
```
Use predefined publishing strategies `all_stats`, `summary`, `average` or roll your own.
Metrics can be statistically sampled.
```rust
let app_metrics = metrics(sample(0.001, to_statsd("localhost:8125")));
```
Metrics can be recorded asynchronously.
```rust
let app_metrics = metrics(async(to_stdout()));
```
Metric definitions can be cached to make using ad-hoc metrics faster.
```rust
let app_metrics = metrics(cache(512, to_log()));
app_metrics.gauge(format!("my_gauge_{}", 34)).value(44);
```
Timers can be used multiple ways.
```rust
let timer = app_metrics.timer("my_timer");
time!(timer, {/* slow code here */} );
timer.time(|| {/* slow code here */} );
let start = timer.start();
/* slow code here */
timer.stop(start);
timer.interval_us(123_456);
```
Related metrics can share a namespace.
```rust
let db_metrics = app_metrics.with_prefix("database.");
let db_timer = db_metrics.timer("db_timer");
let db_counter = db_metrics.counter("db_counter");
```
## Design
Dipstick's design goals are to:
@ -14,45 +70,8 @@ Dipstick's design goals are to:
- promote metrics conventions that facilitate app monitoring and maintenance
- stay out of the way in the code and at runtime (ergonomic, fast, resilient)
## Code
Here's an example showing usage of a predefined timer with closure syntax.
Each timer value is :
- Written immediately to the "app_metrics" logger.
- Sent to statsd immediately, one time out of ten (randomly sampled).
```rust
use dipstick::*;
let app_metrics = metrics((
log("app_metrics"),
sample(0.1, statsd("stats:8125"))
));
let timer = app_metrics.timer("timer_b");
let value2 = time!(timer, compute_value2());
```
In this other example, an _ad-hoc_ timer with macro syntax is used.
- Each new timer value is aggregated with the previous values.
- Aggregation tracks count, sum, max and min values (locklessly).
- Aggregated scores are written to log every 10 seconds.
- `cache(sink)` is used to prevent metrics of the same to be created multiple times.
```rust
use dipstick::*;
use std::time::Duration;
let (sink, source) = aggregate();
let app_metrics = metrics(cache(sink));
publish(source, log("last_ten_seconds")).publish_every(Duration::from_secs(10));
let value2 = time!(app_metrics.timer("timer_b"), compute_value2());
```
Other example(s?) can be found in the /examples dir.
## Performance
Predefined timers use a bit more code but are generally faster because their
initialization cost is is only paid once.
Predefined timers use a bit more code but are generally faster because their initialization cost is is only paid once.
Ad-hoc timers are redefined "inline" on each use. They are more flexible, but have more overhead because their init cost is paid on each use.
Defining a metric `cache()` reduces that cost for recurring metrics.
@ -66,13 +85,11 @@ of any kind at this point. See the following list for any potential caveats :
- dispatch scopes
- feature flags
- derive stats
- non-tokio publish scheduler
- microsecond-precision intervals
- time measurement units in metric kind (us, ms, etc.) for naming & scaling
- heartbeat metric on publish
- logger templates
- configurable aggregation
- non-aggregating buffers
- tagged / ad-hoc metrics
- framework glue (rocket, iron, gotham, indicatif, etc.)
- more tests & benchmarks
- complete doc / inline samples

View File

@ -1,5 +1,5 @@
[package]
name = "aggregate"
name = "aggregate_print"
version = "0.0.0"
workspace = "../../"

View File

@ -1,24 +1,28 @@
//! A sample application continuously aggregating metrics and
//! sending the aggregated results to the console every three seconds.
//! A sample application continuously aggregating metrics,
//! printing the summary stats every three seconds and
//! printing complete stats every 10 seconds.
#[macro_use] extern crate dipstick;
extern crate dipstick;
use std::time::Duration;
use dipstick::*;
fn main() {
// send application metrics to both aggregator and to sampling log
let (to_aggregate, from_aggregate) = aggregate();
let (to_quick_aggregate, from_quick_aggregate) = aggregate();
let (to_slow_aggregate, from_slow_aggregate) = aggregate();
let app_metrics = metrics(to_aggregate);
let app_metrics = metrics((to_quick_aggregate, to_slow_aggregate));
// schedule aggregated metrics to be printed every 3 seconds
let to_console = print();
publish_every(Duration::from_secs(3), from_quick_aggregate, to_stdout(), publish::summary);
publish_every(Duration::from_secs(3), from_aggregate, to_console);
publish_every(Duration::from_secs(10), from_slow_aggregate, to_stdout(), publish::all_stats);
let counter = app_metrics.counter("counter_a");
loop {
// add counts forever, non-stop
counter.count(11);
counter.count(12);
counter.count(13);
}
}

View File

@ -12,7 +12,7 @@ use dipstick::core::{Sink, self};
fn main() {
let metrics = metrics(queue(0, print()));
let metrics = metrics(async(0, to_stdout()));
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_b");

View File

@ -8,7 +8,7 @@ use dipstick::*;
fn main() {
// for this demo, print metric values to the console
let app_metrics = metrics(print());
let app_metrics = metrics(to_stdout());
// metrics can be predefined by type and name
let counter = app_metrics.counter("counter_a");

View File

@ -0,0 +1,9 @@
[package]
name = "custom_publish"
version = "0.0.0"
workspace = "../../"
[dependencies]
dipstick = { path = '../../' }
scheduled-executor = "0.3.0"

View File

@ -0,0 +1,49 @@
//! A demonstration of customization of exported aggregated metrics.
//! Using match on origin metric kind or score type to alter publication output.
extern crate dipstick;
use std::time::Duration;
use dipstick::*;
use dipstick::aggregate::ScoreType;
use dipstick::core::Kind;
fn main() {
// send application metrics to both aggregator and to sampling log
let (to_aggregate, from_aggregate) = aggregate();
let app_metrics = metrics(to_aggregate);
// schedule aggregated metrics to be printed every 3 seconds
let to_console = to_stdout();
publish_every(Duration::from_secs(3), from_aggregate, to_console, |kind, name, score|
match kind {
// do not export gauge scores
Kind::Gauge => None,
_ => match score {
// prepend and append to metric name
ScoreType::HitCount(hit) => Some((Kind::Counter, vec!["name customized_with_prefix:", &name, " and a suffix: "], hit)),
// scaling the score value and appending unit to name
ScoreType::SumOfValues(sum) => Some((kind, vec![&name, "_millisecond"], sum * 1000)),
// using the unmodified metric name
ScoreType::AverageValue(avg) => Some((kind, vec![&name], avg)),
_ => None /* do not export min and max */
}
}
);
let counter = app_metrics.counter("counter_a");
let timer = app_metrics.timer("timer_b");
let gauge = app_metrics.gauge("gauge_c");
loop {
counter.count(11);
timer.interval_us(654654);
gauge.value(3534);
}
}

View File

@ -7,9 +7,9 @@ use dipstick::*;
fn main() {
let metrics = metrics(
//! Metric caching allows re-use of the counter, skipping cost of redefining it on each use.
cache(1, (
statsd("localhost:8125", "myapp.").expect("Could not connect to statsd"),
print())));
cache(12, (
to_statsd("localhost:8125", "myapp.").expect("Could not connect to statsd"),
to_stdout())));
loop {
metrics.counter("counter_a").count(123);

View File

@ -13,7 +13,7 @@ fn main() {
pub fn raw_write() {
// setup dual metric channels
let metrics_log = log("metrics");
let metrics_log = to_log("metrics");
// define and send metrics using raw channel API
let counter = metrics_log.new_metric(Kind::Counter, "count_a", FULL_SAMPLING_RATE);

View File

@ -1,7 +1,8 @@
//! Maintain aggregated metrics for deferred reporting,
use core::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::sync::{Arc, RwLock};
use std::usize;
@ -43,31 +44,24 @@ enum InnerScores {
},
}
/// To-be-published snapshot of aggregated score values.
#[derive(Debug, Clone, Copy)]
pub enum ScoresSnapshot {
/// No data was reported (yet) for this metric.
NoData,
/// Simple score for event counters
Event {
/// Number of times the metric was used.
hit: u64
},
/// Score structure for counters, timers and gauges.
Value {
/// Number of times the metric was used.
hit: u64,
/// Sum of metric values reported.
sum: u64,
/// Biggest value reported.
max: u64,
/// Smallest value reported.
min: u64,
},
/// Possibly aggregated scores.
pub enum ScoreType {
/// Number of times the metric was used.
HitCount(u64),
/// Sum of metric values reported.
SumOfValues(u64),
/// Biggest value reported.
MaximumValue(u64),
/// Smallest value reported.
MinimumValue(u64),
/// Approximative average value (hit count / sum, non-atomic)
AverageValue(u64),
}
/// To-be-published snapshot of aggregated score values for a metric.
pub type ScoresSnapshot = Vec<ScoreType>;
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
@ -85,13 +79,13 @@ pub struct MetricScores {
/// Retries until success or clear loss to concurrent update.
#[inline]
fn compare_and_swap<F>(counter: &AtomicUsize, new_value: usize, retry: F) where F: Fn(usize) -> bool {
let mut loaded = counter.load(Ordering::Acquire);
let mut loaded = counter.load(Acquire);
while retry(loaded) {
if counter.compare_and_swap(loaded, new_value, Ordering::Release) == new_value {
if counter.compare_and_swap(loaded, new_value, Release) == new_value {
// success
break;
}
loaded = counter.load(Ordering::Acquire);
loaded = counter.load(Acquire);
}
}
@ -100,39 +94,58 @@ impl MetricScores {
pub fn write(&self, value: usize) -> () {
match &self.score {
&InnerScores::Event { ref hit, .. } => {
hit.fetch_add(1, Ordering::SeqCst);
hit.fetch_add(1, SeqCst);
}
&InnerScores::Value { ref hit, ref sum, ref max, ref min, .. } => {
compare_and_swap(max, value, |loaded| value > loaded);
compare_and_swap(min, value, |loaded| value < loaded);
sum.fetch_add(value, Ordering::Acquire);
sum.fetch_add(value, Acquire);
// TODO report any concurrent updates / resets for measurement of contention
hit.fetch_add(1, Ordering::Acquire);
hit.fetch_add(1, Acquire);
}
}
}
/// reset aggregate values, return previous values
pub fn read_and_reset(&self) -> ScoresSnapshot {
let mut snapshot = Vec::new();
match self.score {
InnerScores::Event { ref hit } => {
match hit.swap(0, Ordering::Release) as u64 {
0 => ScoresSnapshot::NoData,
hit => ScoresSnapshot::Event { hit }
match hit.swap(0, Release) as u64 {
// hit count is the only meaningful metric for markers
// rate could be nice too but we don't time-derived (yet)
hit if hit > 0 => snapshot.push(ScoreType::HitCount(hit)),
_ => {}
}
}
InnerScores::Value { ref hit, ref sum, ref max, ref min, .. } => {
match hit.swap(0, Ordering::Release) as u64 {
0 => ScoresSnapshot::NoData,
hit => ScoresSnapshot::Value {
hit,
sum: sum.swap(0, Ordering::Release) as u64,
max: max.swap(usize::MIN, Ordering::Release) as u64,
min: min.swap(usize::MAX, Ordering::Release) as u64,
}
match hit.swap(0, Release) as u64 {
hit if hit > 0 => {
let sum = sum.swap(0, Release) as u64;
match self.kind {
Kind::Gauge => {
// sum and hit are meaningless for Gauge metrics
},
_ => {
snapshot.push(ScoreType::HitCount(hit));
snapshot.push(ScoreType::SumOfValues(sum));
}
}
// NOTE best-effort averaging
// - hit and sum are not incremented nor read as one
// - integer division is not rounding
// assuming values will still be good enough to be useful
snapshot.push(ScoreType::AverageValue(sum / hit));
snapshot.push(ScoreType::MaximumValue(max.swap(usize::MIN, Release) as u64));
snapshot.push(ScoreType::MinimumValue(min.swap(usize::MAX, Release) as u64));
},
_ => {}
}
}
}
snapshot
}
}
@ -187,6 +200,8 @@ impl AsSink<Aggregate, AggregateSink> for Aggregator {
}
}
/// The type of metric created by the AggregateSink.
/// Each Aggregate
pub type Aggregate = Arc<MetricScores>;
/// A sink where to send metrics for aggregation.

View File

@ -4,6 +4,8 @@
// TODO option to drop metrics when queue full
use core::*;
use selfmetrics::*;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
@ -11,7 +13,7 @@ use std::thread;
/// Cache metrics to prevent them from being re-defined on every use.
/// Use of this should be transparent, this has no effect on the values.
/// Stateful sinks (i.e. Aggregate) may naturally cache their definitions.
pub fn queue<M, S>(queue_size: usize, sink: S) -> MetricQueue<M, S>
pub fn async<M, S>(queue_size: usize, sink: S) -> MetricQueue<M, S>
where M: 'static + Send + Sync, S: Sink<M>
{
let (sender, receiver) = mpsc::sync_channel::<QueueCommand<M>>(queue_size);
@ -27,6 +29,13 @@ pub fn queue<M, S>(queue_size: usize, sink: S) -> MetricQueue<M, S>
MetricQueue { next_sink: sink, sender }
}
lazy_static! {
static ref QUEUE_METRICS: AppMetrics<Aggregate, AggregateSink> =
SELF_METRICS.with_prefix("async.");
static ref SEND_FAILED: Marker<Aggregate> = QUEUE_METRICS.marker("send_failed");
}
/// Thread safe sender to the queue
pub type QueueSender<M> = mpsc::SyncSender<QueueCommand<M>>;
@ -68,7 +77,10 @@ impl<M, S> Sink<Arc<M>> for MetricQueue<M, S> where S: Sink<M>, M: 'static + Sen
sender.send(QueueCommand {
cmd: send_cmd,
next_scope: next_scope.clone(),
}).unwrap_or_else(|e| { /* TODO dropping queue command, record fault in selfstats */})
}).unwrap_or_else(|e| {
SEND_FAILED.mark();
trace!("Async metrics could not be sent: {}", e);
})
})
}
}

View File

@ -43,13 +43,13 @@ pub const FULL_SAMPLING_RATE: Rate = 1.0;
/// Used to differentiate between metric kinds in the backend.
#[derive(Debug, Copy, Clone)]
pub enum Kind {
/// Was one item handled?
/// Handling one item at a time.
Marker,
/// How many items were handled?
/// Handling quantities or multiples.
Counter,
/// How much are we using or do we have left?
/// Reporting instant measurement of a resource at a point in time.
Gauge,
/// How long did this take?
/// Measuring a time interval, internal to the app or provided by an external source.
Timer,
}

View File

@ -28,6 +28,7 @@ extern crate error_chain;
extern crate time;
extern crate cached;
extern crate num;
#[macro_use] extern crate lazy_static;
mod pcg32;
@ -51,8 +52,10 @@ pub mod publish;
pub mod statsd;
pub mod cache;
pub mod multi;
pub mod queue;
pub mod async;
pub mod fnsink;
pub mod schedule;
pub mod selfmetrics;
// input
pub use app::metrics;
@ -61,7 +64,7 @@ pub use app::metrics;
pub use fnsink::make_sink;
// buffering
pub use queue::queue;
pub use async::async;
// transform
pub use cache::cache;
@ -69,8 +72,8 @@ pub use sampling::sample;
// pack + forward
pub use aggregate::aggregate;
pub use publish::{publish, publish_every};
pub use publish::{publish, publish_every, all_stats, summary, average};
// output
pub use output::{log, print};
pub use statsd::statsd;
pub use output::{to_log, to_stdout};
pub use statsd::to_statsd;

View File

@ -1,27 +1,26 @@
//! Standard stateless metric outputs.
use core::*;
use core::Scope;
use fnsink::*;
/// Write metric values to stdout using `println!`.
pub fn print() -> FnSink<String> {
make_sink(|k, n, r| format!("{:?} {} {}", k, n, r),
pub fn to_stdout() -> FnSink<String> {
make_sink(|_, name, _| String::from(name),
|cmd| if let Scope::Write(m, v) = cmd {
println!("{}: {}", m, v)
})
}
/// Write metric values to the standard log using `info!`.
pub fn log<STR: AsRef<str> + 'static + Send + Sync>(prefix: STR) -> FnSink<String> {
make_sink(move |k, n, r| format!("{}{:?} {} {}", prefix.as_ref(), k, n, r),
pub fn to_log<STR: AsRef<str> + 'static + Send + Sync>(prefix: STR) -> FnSink<String> {
make_sink(move |_, name, _| [prefix.as_ref(), name].concat(),
|cmd| if let Scope::Write(m, v) = cmd {
info!("{}: {}", m, v)
})
}
/// Special sink that discards all metric values sent to it.
pub fn void<STR: AsRef<str> + 'static + Send + Sync>(prefix: STR) -> FnSink<String> {
make_sink(move |k, n, r| format!("{}{:?} {} {}", prefix.as_ref(), k, n, r),
|_| {})
pub fn to_void() -> FnSink<String> {
make_sink(move |_, name, _| String::from(name), |_| {})
}
mod test {
@ -29,14 +28,21 @@ mod test {
#[test]
fn sink_print() {
let c = super::print();
let c = super::to_stdout();
let m = c.new_metric(Kind::Marker, "test", 1.0);
c.new_scope()(Scope::Write(&m, 33));
}
#[test]
fn log_print() {
let c = super::log("log prefix");
fn test_to_log() {
let c = super::to_log("log prefix");
let m = c.new_metric(Kind::Marker, "test", 1.0);
c.new_scope()(Scope::Write(&m, 33));
}
#[test]
fn test_to_void() {
let c = super::to_void();
let m = c.new_metric(Kind::Marker, "test", 1.0);
c.new_scope()(Scope::Write(&m, 33));
}

View File

@ -1,94 +1,131 @@
//! Publishes metrics values from a source to a sink.
//!
//! Publishing can be done on request:
//! ```
//! use dipstick::*;
//!
//! let (sink, source) = aggregate();
//! publish(&source, &log("aggregated"));
//! publish(&source, &log("aggregated"), publish::all_stats);
//! ```
//!
//! Publishing can be scheduled to run recurrently.
//! ```
//! use dipstick::*;
//! use std::time::Duration;
//!
//! let (sink, source) = aggregate();
//! let job = publish_every(Duration::from_millis(100), &source, &log("aggregated"), publish::all_stats);
//! // publish will go on until cancelled
//! job.cancel();
//! ```
use core::*;
use aggregate::{AggregateSource, ScoresSnapshot};
use core::Kind::*;
use aggregate::{AggregateSource, ScoreType};
use aggregate::ScoreType::*;
use std::time::Duration;
use std::thread;
fn schedule<F>(every: Duration, operation: F)
where F: Fn() -> () + Send + 'static
{
thread::spawn(move || {
loop {
thread::sleep(every);
// TODO add cancel
operation();
}
});
}
use schedule::{schedule, CancelHandle};
/// Schedules the publisher to run at recurrent intervals
pub fn publish_every<M, S>(duration: Duration, source: AggregateSource, target: S)
where S: Sink<M> + 'static + Send + Sync, M: Send + Sync
pub fn publish_every<E, M, S>(duration: Duration, source: AggregateSource, target: S, export: E) -> CancelHandle
where S: Sink<M> + 'static + Send + Sync, M: Send + Sync, E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static
{
schedule(duration, move || publish(&source, &target))
schedule(duration, move || {
let scope = target.new_scope();
source.for_each(|metric| {
let snapshot = metric.read_and_reset();
if snapshot.is_empty() {
// no data was collected for this period
// TODO repeat previous frame min/max ?
// TODO update some canary metric ?
} else {
for score in snapshot {
if let Some(ex) = export(metric.kind, &metric.name, score) {
let temp_metric = target.new_metric(ex.0, &ex.1.concat(), 1.0);
scope(Scope::Write(&temp_metric, ex.2));
}
}
}
})
})
}
/// Define and write metrics from aggregated scores to the target channel
/// If this is called repeatedly it can be a good idea to use the metric cache
/// to prevent new metrics from being created every time.
pub fn publish<M, S>(source: &AggregateSource, target: &S)
where S: Sink<M>, M: Send + Sync {
pub fn publish<E, M, S>(source: &AggregateSource, target: &S, export: E)
where S: Sink<M>, M: Send + Sync, E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static
{
let scope = target.new_scope();
source.for_each(|metric| {
match metric.read_and_reset() {
ScoresSnapshot::NoData => {
// TODO repeat previous frame min/max ?
// TODO update some canary metric ?
}
ScoresSnapshot::Event { hit } => {
let name = format!("{}.hit", &metric.name);
let temp_metric = target.new_metric(Kind::Counter, &name, 1.0);
scope(Scope::Write(&temp_metric, hit));
}
ScoresSnapshot::Value { hit, sum, max, min } => {
if hit > 0 {
match &metric.kind {
&Kind::Counter |
&Kind::Timer |
&Kind::Gauge => {
// NOTE best-effort averaging
// - hit and sum are not incremented nor read as one
// - integer division is not rounding
// assuming values will still be good enough to be useful
let name = format!("{}.avg", &metric.name);
let temp_metric = target.new_metric(metric.kind, &name, 1.0);
scope(Scope::Write(&temp_metric, sum / hit));
}
_ => (),
}
match &metric.kind {
// do not report gauges sum and hit, they are meaningless
&Kind::Counter |
&Kind::Timer => {
let name = format!("{}.sum", &metric.name);
let temp_metric = target.new_metric(metric.kind, &name, 1.0);
scope(Scope::Write(&temp_metric, sum));
let name = format!("{}.hit", &metric.name);
let temp_metric = target.new_metric(metric.kind, &name, 1.0);
scope(Scope::Write(&temp_metric, hit));
}
_ => (),
}
let name = format!("{}.max", &metric.name);
let temp_metric = target.new_metric(Kind::Gauge, &name, 1.0);
scope(Scope::Write(&temp_metric, max));
let name = format!("{}.min", &metric.name);
let temp_metric = target.new_metric(Kind::Gauge, &name, 1.0);
scope(Scope::Write(&temp_metric, min));
let snapshot = metric.read_and_reset();
if snapshot.is_empty() {
// no data was collected for this period
// TODO repeat previous frame min/max ?
// TODO update some canary metric ?
} else {
for score in snapshot {
if let Some(ex) = export(metric.kind, &metric.name, score) {
let temp_metric = target.new_metric(ex.0, &ex.1.concat(), 1.0);
scope(Scope::Write(&temp_metric, ex.2));
}
}
}
})
}
/// A predefined export strategy reporting all aggregated stats for all metric types.
/// Resulting stats are named by appending a short suffix to each metric's name.
pub fn all_stats(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&str>, Value)> {
match score {
HitCount(hit) => Some((Counter, vec![name, ".hit"], hit)),
SumOfValues(sum) => Some((kind, vec![name, ".sum"], sum)),
AverageValue(avg) => Some((kind, vec![name, ".avg"], avg)),
MaximumValue(max) => Some((Gauge, vec![name, ".max"], max)),
MinimumValue(min) => Some((Gauge, vec![name, ".min"], min)),
}
}
/// A predefined export strategy reporting the average value for every non-marker metric.
/// Marker metrics export their hit count instead.
///
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
pub fn average(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&str>, Value)> {
match kind {
Marker => match score {
HitCount(hit) => Some((Counter, vec![name], hit)),
_ => None
},
_ => {
match score {
AverageValue(avg) => Some((Gauge, vec![name], avg)),
_ => None
}
}
}
}
/// A predefined single-stat-per-metric export strategy:
/// - Timers and Counters each export their sums
/// - Markers each export their hit count
/// - Gauges each export their average
///
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
pub fn summary(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&str>, Value)> {
match kind {
Marker => match score {
HitCount(hit) => Some((Counter, vec![name], hit)),
_ => None
},
Counter | Timer => match score {
SumOfValues(sum) => Some((kind, vec![name], sum)),
_ => None
},
Gauge => match score {
AverageValue(avg) => Some((Gauge, vec![name], avg)),
_ => None
},
}
}

View File

@ -1,11 +0,0 @@
use std::time::Duration;
use std::thread;
pub fn schedule(every: Duration, operation: F) -> JoinHandle<()>
where F: Fn() -> ()
{
thread::spawn(|| {
thread::sleep(every);
operation
})
}

46
src/schedule.rs Normal file
View File

@ -0,0 +1,46 @@
//! Task scheduling facilities.
use std::time::Duration;
use std::thread;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
/// A handle that allows cancelling a scheduled task.
#[derive(Clone)]
pub struct CancelHandle (Arc<AtomicBool>);
impl CancelHandle {
fn new() -> CancelHandle {
CancelHandle(Arc::new(AtomicBool::new(false)))
}
/// Signals the task to stop.
pub fn cancel(&self) {
self.0.store(true, SeqCst);
}
fn is_cancelled(&self) -> bool {
self.0.load(SeqCst)
}
}
/// Schedule a task to run periodically.
/// Starts a new thread for every task.
pub fn schedule<F>(every: Duration, operation: F) -> CancelHandle
where F: Fn() -> () + Send + 'static
{
let handle = CancelHandle::new();
let inner_handle = handle.clone();
thread::spawn(move || {
loop {
thread::sleep(every);
if inner_handle.is_cancelled() {
break
}
operation();
}
});
handle
}

20
src/selfmetrics.rs Normal file
View File

@ -0,0 +1,20 @@
//! Internal Dipstick metrics.
//! Collect statistics about various metrics modules at runtime.
//! Stats can can be obtained for publication from `selfstats::SOURCE`.
pub use app::*;
pub use aggregate::*;
lazy_static! {
/// Central metric storage
static ref AGGREGATE: (AggregateSink, AggregateSource) = aggregate();
/// Source of dipstick inner metrics, for eventual publication.
pub static ref SOURCE: AggregateSource = AGGREGATE.1.clone();
/// Application metrics are collected to the aggregator
pub static ref SELF_METRICS: AppMetrics<Aggregate, AggregateSink> =
metrics(AGGREGATE.0.clone()).with_prefix("dipstick.");
}

View File

@ -1,15 +1,15 @@
//! Send metrics to a statsd server.
use ::core::*;
use ::error;
use core::*;
use error;
use selfmetrics::*;
use std::net::UdpSocket;
use std::sync::{Arc,RwLock};
pub use std::net::ToSocketAddrs;
/// Send metrics to a statsd server at the address and port provided.
pub fn statsd<STR, ADDR>(address: ADDR, prefix: STR) -> error::Result<StatsdSink>
pub fn to_statsd<STR, ADDR>(address: ADDR, prefix: STR) -> error::Result<StatsdSink>
where STR: Into<String>, ADDR: ToSocketAddrs
{
let socket = Arc::new(UdpSocket::bind("0.0.0.0:0")?); // NB: CLOEXEC by default
@ -23,6 +23,14 @@ pub fn statsd<STR, ADDR>(address: ADDR, prefix: STR) -> error::Result<StatsdSink
})
}
lazy_static! {
static ref STATSD_METRICS: AppMetrics<Aggregate, AggregateSink> =
SELF_METRICS.with_prefix("statsd.");
static ref SEND_ERR: Marker<Aggregate> = STATSD_METRICS.marker("send_failed");
static ref SENT_BYTES: Counter<Aggregate> = STATSD_METRICS.counter("sent_bytes");
}
/// Key of a statsd metric.
#[derive(Debug)]
pub struct StatsdMetric {
@ -34,12 +42,13 @@ pub struct StatsdMetric {
/// Use a safe maximum size for UDP to prevent fragmentation.
const MAX_UDP_PAYLOAD: usize = 576;
/// Wrapped buffer & socket as one so that any remainding data can be flushed on Drop.
/// Wrapped string buffer & socket as one.
struct ScopeBuffer {
str: String,
socket: Arc<UdpSocket>,
}
/// Any remaining buffered data is flushed on Drop.
impl Drop for ScopeBuffer {
fn drop(&mut self) {
self.flush()
@ -49,11 +58,15 @@ impl Drop for ScopeBuffer {
impl ScopeBuffer {
fn flush(&mut self) {
debug!("statsd sending {} bytes", self.str.len());
// TODO check for and report any send() error
match self.socket.send(self.str.as_bytes()) {
Ok(size) => { /* TODO update selfstats */ },
Err(e) => { /* TODO metric faults */ }
Ok(size) => {
SENT_BYTES.count(size);
trace!("Sent {} bytes to statsd", self.str.len());
},
Err(e) => {
SEND_ERR.mark();
debug!("Failed to send packet to statsd: {}", e);
}
};
self.str.clear();
}