Add MeanRate to aggregator stats, refactor scoreboard, Average is f64

Francis Lalonde 2017-12-08 13:18:59 -05:00
parent 1a897b971c
commit 372f380ce9
6 changed files with 155 additions and 130 deletions

View File

@ -18,8 +18,9 @@ Run benchmarks with `cargo +nightly bench --features bench`.
Although already usable, Dipstick is still under heavy development and makes no guarantees
of any kind at this point. See the following list for any potential caveats:
- META turn TODOs into GitHub issues
- fix rustdocs https://docs.rs/dipstick/0.4.15/dipstick/
- generic publisher / sources
- feature flags
- add feature flags
- time measurement units in metric kind (us, ms, etc.) for naming & scaling
- heartbeat metric on publish
- logger templates

View File

@ -41,7 +41,7 @@ fn main() {
),
// using the unmodified metric name
ScoreType::AverageValue(avg) => Some((kind, vec![&name], avg)),
ScoreType::AverageValue(avg) => Some((kind, vec![&name], avg.round() as u64)),
_ => None, /* do not export min and max */
}
}

View File

@ -2,10 +2,10 @@
use std::collections::HashMap;
use core::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use core::Kind::*;
use std::sync::{Arc, RwLock};
use std::usize;
use self::ScoreType::*;
use scores::*;
/// Aggregate metrics in memory.
/// Depending on the type of metric, count, sum, minimum and maximum of values will be tracked.
@ -25,24 +25,6 @@ pub fn aggregate() -> (AggregateSink, AggregateSource) {
(agg.as_sink(), agg.as_source())
}
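
For orientation, here is a minimal usage sketch of the sink/source pair returned by `aggregate()`, assembled only from the APIs visible in this diff (`new_metric`, `new_scope`, `for_each`, `read_and_reset`); the metric name and the crate-root re-exports are assumptions:

// minimal sketch, assuming the dipstick 0.4 aggregate API shown in this file;
// "request_count" is an illustrative name
use dipstick::*;

fn aggregate_example() {
    let (sink, source) = aggregate();
    let metric = sink.new_metric(Kind::Counter, "request_count", 1.0);
    let scope = sink.new_scope(false);
    scope(Scope::Write(&metric, 3)); // record a single value of 3

    source.for_each(|scores| {
        // drain and reset this metric's aggregated scores
        for score in scores.read_and_reset() {
            println!("{} -> {:?}", scores.name, score);
        }
    });
}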
/// Core aggregation structure for a single metric.
/// Hit count is maintained for all types.
/// If hit count is zero, then no values were recorded.
#[derive(Debug)]
enum InnerScores {
/// Event metrics need not record more than a hit count.
Event { hit: AtomicUsize },
/// Value metrics keep track of key highlights.
Value {
hit: AtomicUsize,
sum: AtomicUsize,
max: AtomicUsize,
min: AtomicUsize,
},
}
#[derive(Debug, Clone, Copy)]
/// Possibly aggregated scores.
pub enum ScoreType {
@ -55,12 +37,11 @@ pub enum ScoreType {
/// Smallest value reported.
MinimumValue(u64),
/// Approximate average value (sum / hit count, non-atomic)
AverageValue(u64),
AverageValue(f64),
/// Approximate mean rate (hit count / period length in seconds, non-atomic)
MeanRate(f64),
}
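
As a quick back-of-the-envelope check of the two derived scores (all numbers below are made up): four hits summing to 10 over a two second window give an average of 2.5 and a mean rate of 2.0 hits per second.

// illustrative arithmetic only, mirroring the formulas used in read_and_reset below
let hit_count: u64 = 4;
let sum: u64 = 10;
let elapsed_ns: u64 = 2_000_000_000; // 2 seconds between resets

let average = sum as f64 / hit_count as f64;                              // AverageValue(2.5)
let mean_rate = hit_count as f64 / (elapsed_ns as f64 / 1_000_000_000.0); // MeanRate(2.0)
assert_eq!(average, 2.5);
assert_eq!(mean_rate, 2.0);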
/// To-be-published snapshot of aggregated score values for a metric.
pub type ScoresSnapshot = Vec<ScoreType>;
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
@ -71,97 +52,47 @@ pub struct MetricScores {
/// The metric's name.
pub name: String,
score: InnerScores,
}
/// Spinlock update of max and min values.
/// Retries until success or clear loss to concurrent update.
#[inline]
fn compare_and_swap<F>(counter: &AtomicUsize, new_value: usize, retry: F)
where
F: Fn(usize) -> bool,
{
let mut loaded = counter.load(Acquire);
while retry(loaded) {
if counter.compare_and_swap(loaded, new_value, Release) == new_value {
// success
break;
}
loaded = counter.load(Acquire);
}
scores: Scoreboard,
}
impl MetricScores {
/// Update scores with new value
pub fn write(&self, value: usize) -> () {
match &self.score {
&InnerScores::Event { ref hit, .. } => {
hit.fetch_add(1, SeqCst);
}
&InnerScores::Value {
ref hit,
ref sum,
ref max,
ref min,
..
} => {
compare_and_swap(max, value, |loaded| value > loaded);
compare_and_swap(min, value, |loaded| value < loaded);
sum.fetch_add(value, Acquire);
// TODO report any concurrent updates / resets for measurement of contention
hit.fetch_add(1, Acquire);
}
}
/// Update aggregated values
pub fn write(&self, value: Value) {
self.scores.update(value)
}
/// reset aggregate values, return previous values
pub fn read_and_reset(&self) -> ScoresSnapshot {
/// Reset aggregated values, returning a to-be-published snapshot of the previous scores.
pub fn read_and_reset(&self) -> Vec<ScoreType> {
let (values, now) = self.scores.reset();
// if hit count is zero, then no values were recorded.
if values.hit_count() == 0 { return vec![] }
let mut snapshot = Vec::new();
match self.score {
InnerScores::Event { ref hit } => {
match hit.swap(0, Release) as u64 {
// hit count is the only meaningful metric for markers
// rate would be nice too but we don't do time-derived metrics (yet)
hit if hit > 0 => snapshot.push(ScoreType::HitCount(hit)),
_ => {}
}
}
InnerScores::Value {
ref hit,
ref sum,
ref max,
ref min,
..
} => {
match hit.swap(0, Release) as u64 {
hit if hit > 0 => {
let sum = sum.swap(0, Release) as u64;
// elapsed time since the previous reset, in seconds
let mean_rate = values.hit_count() as f64 /
((now - values.start_time_ns()) as f64 / 1_000_000_000.0);
match self.kind {
Marker => {
snapshot.push(HitCount(values.hit_count()));
snapshot.push(MeanRate(mean_rate))
},
Gauge => {
snapshot.push(MaximumValue(values.max()));
snapshot.push(MinimumValue(values.min()));
},
Timer | Counter => {
snapshot.push(HitCount(values.hit_count()));
snapshot.push(SumOfValues(values.sum()));
match self.kind {
Kind::Gauge => {
// sum and hit are meaningless for Gauge metrics
}
_ => {
snapshot.push(ScoreType::HitCount(hit));
snapshot.push(ScoreType::SumOfValues(sum));
}
}
// NOTE best-effort averaging
// - hit and sum are not incremented nor read as one
// - integer division does not round
// assuming values will still be good enough to be useful
snapshot.push(ScoreType::AverageValue(sum / hit));
snapshot.push(ScoreType::MaximumValue(
max.swap(usize::MIN, Release) as u64,
));
snapshot.push(ScoreType::MinimumValue(
min.swap(usize::MAX, Release) as u64,
));
}
_ => {}
}
}
snapshot.push(MaximumValue(values.max()));
snapshot.push(MinimumValue(values.min()));
// NOTE the following derived metrics are computed as a best effort across atomics
// NO GUARANTEES
snapshot.push(AverageValue(values.sum() as f64 / values.hit_count() as f64));
snapshot.push(MeanRate(mean_rate))
},
}
snapshot
}
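
A hedged test sketch of what `read_and_reset()` yields per metric kind, based on the match arms above; the test name and metric name are made up:

#[test]
fn counter_snapshot_sketch() {
    // Marker  -> [HitCount, MeanRate]
    // Gauge   -> [MaximumValue, MinimumValue]
    // Counter -> [HitCount, SumOfValues, MaximumValue, MinimumValue, AverageValue, MeanRate]
    let (sink, _source) = aggregate();
    let counter = sink.new_metric(Kind::Counter, "sketch_counter", 1.0);
    counter.write(7);
    counter.write(3);
    let snapshot = counter.read_and_reset();
    assert_eq!(snapshot.len(), 6); // all six score types are published for a counter
}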
@ -173,6 +104,7 @@ pub struct AggregateSource(Arc<RwLock<HashMap<String, Arc<MetricScores>>>>);
impl AggregateSource {
/// Iterate over every aggregated metric.
// TODO impl Iterator
pub fn for_each<F>(&self, ops: F)
where
F: Fn(&MetricScores),
@ -249,6 +181,7 @@ pub type Aggregate = Arc<MetricScores>;
#[derive(Debug, Clone)]
pub struct AggregateSink(Arc<RwLock<HashMap<String, Aggregate>>>);
impl Sink<Aggregate> for AggregateSink {
#[allow(unused_variables)]
fn new_metric(&self, kind: Kind, name: &str, sampling: Rate) -> Aggregate {
@ -256,38 +189,29 @@ impl Sink<Aggregate> for AggregateSink {
Arc::new(MetricScores {
kind,
name: name.to_string(),
score: match kind {
Kind::Marker => InnerScores::Event { hit: AtomicUsize::new(0) },
_ => InnerScores::Value {
hit: AtomicUsize::new(0),
sum: AtomicUsize::new(0),
max: AtomicUsize::new(usize::MIN),
min: AtomicUsize::new(usize::MAX),
},
}
scores: Scoreboard::new()
})).clone()
}
#[allow(unused_variables)]
fn new_scope(&self, auto_flush: bool) -> ScopeFn<Aggregate> {
Arc::new(|cmd| match cmd {
Scope::Write(metric, value) => metric.write(value as usize),
Scope::Write(metric, value) => metric.write(value),
Scope::Flush => {}
})
}
}
#[cfg(feature = "bench")]
mod microbench {
mod bench {
use super::*;
use ::*;
use test;
#[bench]
fn time_bench_write_event(b: &mut test::Bencher) {
let (sink, _source) = aggregate();
let metric = sink.new_metric(Kind::Marker, &"event_a", 1.0);
let metric = sink.new_metric(Marker, &"event_a", 1.0);
let scope = sink.new_scope(false);
b.iter(|| test::black_box(scope(Scope::Write(&metric, 1))));
}
@ -296,7 +220,7 @@ mod microbench {
#[bench]
fn time_bench_write_count(b: &mut test::Bencher) {
let (sink, _source) = aggregate();
let metric = sink.new_metric(Kind::Counter, &"count_a", 1.0);
let metric = sink.new_metric(Counter, &"count_a", 1.0);
let scope = sink.new_scope(false);
b.iter(|| test::black_box(scope(Scope::Write(&metric, 1))));
}
@ -304,14 +228,14 @@ mod microbench {
#[bench]
fn time_bench_read_event(b: &mut test::Bencher) {
let (sink, _source) = aggregate();
let metric = sink.new_metric(Kind::Marker, &"marker_a", 1.0);
let metric = sink.new_metric(Marker, &"marker_a", 1.0);
b.iter(|| test::black_box(metric.read_and_reset()));
}
#[bench]
fn time_bench_read_count(b: &mut test::Bencher) {
let (sink, _source) = aggregate();
let metric = sink.new_metric(Kind::Counter, &"count_a", 1.0);
let metric = sink.new_metric(Counter, &"count_a", 1.0);
b.iter(|| test::black_box(metric.read_and_reset()));
}

View File

@ -158,6 +158,7 @@ pub use scope_metrics::*;
mod sample;
pub use sample::*;
mod scores;
mod aggregate;
pub use aggregate::*;

View File

@ -78,9 +78,10 @@ pub fn all_stats(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<
match score {
HitCount(hit) => Some((Counter, vec![name, ".hit"], hit)),
SumOfValues(sum) => Some((kind, vec![name, ".sum"], sum)),
AverageValue(avg) => Some((kind, vec![name, ".avg"], avg)),
AverageValue(avg) => Some((kind, vec![name, ".avg"], avg.round() as Value)),
MaximumValue(max) => Some((Gauge, vec![name, ".max"], max)),
MinimumValue(min) => Some((Gauge, vec![name, ".min"], min)),
MeanRate(rate) => Some((Gauge, vec![name, ".rate"], rate.round() as Value))
}
}
@ -99,7 +100,7 @@ pub fn average(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&s
}
_ => {
match score {
AverageValue(avg) => Some((Gauge, vec![name], avg)),
AverageValue(avg) => Some((Gauge, vec![name], avg.round() as Value)),
_ => None,
}
}
@ -129,7 +130,7 @@ pub fn summary(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&s
}
Gauge => {
match score {
AverageValue(avg) => Some((Gauge, vec![name], avg)),
AverageValue(avg) => Some((Gauge, vec![name], avg.round() as Value)),
_ => None,
}
}
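
The published `Value` remains an integer, so the new floating-point scores are rounded at export time as shown above; a tiny sketch of what that conversion does (numbers illustrative, assuming `Value` is an unsigned 64-bit integer):

// rounding sketch for the f64 -> Value conversions used in all_stats / average / summary
let avg: f64 = 2.5;
let rate: f64 = 0.4;
assert_eq!(avg.round() as u64, 3);  // AverageValue(2.5) is published as 3
assert_eq!(rate.round() as u64, 0); // MeanRate(0.4) is published as 0: sub-hertz rates vanish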

src/scores.rs (new file, 98 lines)
View File

@ -0,0 +1,98 @@
use time;
use std::mem;
use core::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::usize;
/// Lock-free scoreboard of aggregated values for a single metric.
#[derive(Debug)]
pub struct Scoreboard {
// slots: [0] start time (ns), [1] hit count, [2] sum of values, [3] max, [4] min
scores: [AtomicUsize; 5]
}
impl Scoreboard {
pub fn new() -> Self {
// an array of AtomicUsize cannot be copy-initialized, build each slot explicitly
Scoreboard {
scores: [
AtomicUsize::new(time::precise_time_ns() as usize), // start time (ns)
AtomicUsize::new(0), // hit count
AtomicUsize::new(0), // sum of values
AtomicUsize::new(usize::MIN), // max
AtomicUsize::new(usize::MAX), // min
]
}
}
pub fn reset(&self) -> (Snapshot, u64) {
let now = time::precise_time_ns() as usize;
let mut snapshot = Snapshot { scores: [0; 5] };
// SNAPSHOTS OF ATOMICS IN PROGRESS, HANG TIGHT
// capture the previous values while resetting each slot:
// start time becomes `now`, counters go back to zero, max/min to their neutral extremes
snapshot.scores[0] = self.scores[0].swap(now, Release);
snapshot.scores[1] = self.scores[1].swap(0, Release);
snapshot.scores[2] = self.scores[2].swap(0, Release);
snapshot.scores[3] = self.scores[3].swap(usize::MIN, Release);
snapshot.scores[4] = self.scores[4].swap(usize::MAX, Release);
// END OF ATOMICS SNAPSHOT, YOU CAN RELAX NOW
(snapshot, now as u64)
}
/// Update scores with a new value.
pub fn update(&self, value: Value) {
// TODO report any concurrent updates / resets for measurement of contention
let value = value as usize;
self.scores[1].fetch_add(1, Acquire); // hit count
self.scores[2].fetch_add(value, Acquire); // sum of values
Scoreboard::swap_if_more(&self.scores[3], value); // max
Scoreboard::swap_if_less(&self.scores[4], value); // min
}
/// Spinlock until success or clear loss to a concurrent update.
#[inline]
fn swap_if_more(counter: &AtomicUsize, new_value: usize) {
let mut current = counter.load(Acquire);
while current < new_value {
let prev = counter.compare_and_swap(current, new_value, Release);
if prev == current { break } // success, our value is in place
current = prev; // lost the race, retry against the winning value
}
}
/// Spinlock until success or clear loss to a concurrent update.
#[inline]
fn swap_if_less(counter: &AtomicUsize, new_value: usize) {
let mut current = counter.load(Acquire);
while current > new_value {
let prev = counter.compare_and_swap(current, new_value, Release);
if prev == current { break } // success, our value is in place
current = prev; // lost the race, retry against the winning value
}
}
}
/// Frozen scoreboard values captured at reset time.
pub struct Snapshot {
// same slot layout as Scoreboard: start time, hit count, sum, max, min
scores: [usize; 5]
}
impl Snapshot {
pub fn start_time_ns(&self) -> Value {
self.scores[0] as Value
}
pub fn hit_count(&self) -> Value {
self.scores[1] as Value
}
pub fn sum(&self) -> Value {
self.scores[2] as Value
}
pub fn max(&self) -> Value {
self.scores[3] as Value
}
pub fn min(&self) -> Value {
self.scores[4] as Value
}
}
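
Finally, a small sanity sketch exercising the scoreboard end to end (test module name and values are illustrative only):

#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn min_max_tracking_sketch() {
        let board = Scoreboard::new();
        board.update(5);
        board.update(2);
        board.update(9);
        let (snapshot, _reset_time_ns) = board.reset();
        assert_eq!(snapshot.hit_count(), 3);
        assert_eq!(snapshot.sum(), 16);
        assert_eq!(snapshot.max(), 9);
        assert_eq!(snapshot.min(), 2);
    }
}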