Before Aggregate refactor

This commit is contained in:
Francis Lalonde 2017-12-11 11:25:53 -05:00
parent 2efb905369
commit c8aa0d2a90
5 changed files with 122 additions and 167 deletions

View File

@ -25,7 +25,7 @@ fn main() {
_ => {
match score {
// prepend and append to metric name
ScoreType::HitCount(hit) => Some((
ScoreType::Count(hit) => Some((
Kind::Counter,
vec![
"name customized_with_prefix:",
@ -36,12 +36,12 @@ fn main() {
)),
// scaling the score value and appending unit to name
ScoreType::SumOfValues(sum) => Some(
ScoreType::Sum(sum) => Some(
(kind, vec![&name, "_millisecond"], sum * 1000),
),
// using the unmodified metric name
ScoreType::AverageValue(avg) => Some((kind, vec![&name], avg.round() as u64)),
ScoreType::Mean(avg) => Some((kind, vec![&name], avg.round() as u64)),
_ => None, /* do not export min and max */
}
}

View File

@ -2,9 +2,7 @@
use std::collections::HashMap;
use core::*;
use core::Kind::*;
use std::sync::{Arc, RwLock};
use self::ScoreType::*;
use scores::*;
/// Aggregate metrics in memory.
@ -25,88 +23,16 @@ pub fn aggregate() -> (AggregateSink, AggregateSource) {
(agg.as_sink(), agg.as_source())
}
#[derive(Debug, Clone, Copy)]
/// Possibly aggregated scores.
pub enum ScoreType {
/// Number of times the metric was used.
HitCount(u64),
/// Sum of metric values reported.
SumOfValues(u64),
/// Biggest value reported.
MaximumValue(u64),
/// Smallest value reported.
MinimumValue(u64),
/// Approximative average value (hit count / sum, non-atomic)
AverageValue(f64),
/// Approximative mean rate (hit count / period length in seconds, non-atomic)
MeanRate(f64),
}
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
pub struct MetricScores {
/// The kind of metric.
pub kind: Kind,
/// The metric's name.
pub name: String,
scores: Scoreboard,
}
impl MetricScores {
/// Update aggregated values
pub fn write(&self, value: Value) {
self.scores.update(value)
}
/// Reset aggregate values, return previous values
/// To-be-published snapshot of aggregated score values for a metric.
pub fn read_and_reset(&self) -> Vec<ScoreType> {
let values = self.scores.reset();
// if hit count is zero, then no values were recorded.
if values.hit_count() == 0 {
return vec![]
}
let mut snapshot = Vec::new();
match self.kind {
Marker => {
snapshot.push(HitCount(values.hit_count()));
snapshot.push(MeanRate(values.mean_rate()))
},
Gauge => {
snapshot.push(MaximumValue(values.max()));
snapshot.push(MinimumValue(values.min()));
snapshot.push(AverageValue(values.average()));
},
Timer | Counter => {
snapshot.push(HitCount(values.hit_count()));
snapshot.push(SumOfValues(values.sum()));
snapshot.push(MaximumValue(values.max()));
snapshot.push(MinimumValue(values.min()));
snapshot.push(AverageValue(values.average()));
snapshot.push(MeanRate(values.mean_rate()))
},
}
snapshot
}
}
/// Enumerate the metrics being aggregated and their scores.
#[derive(Debug, Clone)]
pub struct AggregateSource(Arc<RwLock<HashMap<String, Arc<MetricScores>>>>);
pub struct AggregateSource(Arc<RwLock<HashMap<String, Arc<Scoreboard>>>>);
impl AggregateSource {
/// Iterate over every aggregated metric.
// TODO impl Iterator
pub fn for_each<F>(&self, ops: F)
where
F: Fn(&MetricScores),
F: Fn(&Scoreboard),
{
for metric in self.0.read().unwrap().values() {
ops(&metric)
@ -133,7 +59,7 @@ impl AggregateSource {
/// a shared list of metrics for enumeration when used as source.
#[derive(Debug, Clone)]
pub struct Aggregator {
metrics: Arc<RwLock<HashMap<String, Arc<MetricScores>>>>,
metrics: Arc<RwLock<HashMap<String, Arc<Scoreboard>>>>,
}
impl Aggregator {
@ -172,7 +98,7 @@ impl AsSink for Aggregator {
/// The type of metric created by the AggregateSink.
/// Each Aggregate
pub type Aggregate = Arc<MetricScores>;
pub type Aggregate = Arc<Scoreboard>;
/// A sink where to send metrics for aggregation.
/// The parameters of aggregation may be set upon creation.
@ -185,17 +111,16 @@ impl Sink<Aggregate> for AggregateSink {
#[allow(unused_variables)]
fn new_metric(&self, kind: Kind, name: &str, sampling: Rate) -> Aggregate {
self.0.write().unwrap().entry(name.to_string()).or_insert_with(||
Arc::new(MetricScores {
kind,
name: name.to_string(),
scores: Scoreboard::new()
})).clone()
Arc::new(
Scoreboard::new(kind, name.to_string())
)
).clone()
}
#[allow(unused_variables)]
fn new_scope(&self, auto_flush: bool) -> ScopeFn<Aggregate> {
Arc::new(|cmd| match cmd {
Scope::Write(metric, value) => metric.write(value),
Scope::Write(metric, value) => metric.update(value),
Scope::Flush => {}
})
}
@ -228,14 +153,14 @@ mod bench {
fn time_bench_read_event(b: &mut test::Bencher) {
let (sink, _source) = aggregate();
let metric = sink.new_metric(Marker, &"marker_a", 1.0);
b.iter(|| test::black_box(metric.read_and_reset()));
b.iter(|| test::black_box(metric.reset()));
}
#[bench]
fn time_bench_read_count(b: &mut test::Bencher) {
let (sink, _source) = aggregate();
let metric = sink.new_metric(Counter, &"count_a", 1.0);
b.iter(|| test::black_box(metric.read_and_reset()));
b.iter(|| test::black_box(metric.reset()));
}
}

View File

@ -159,6 +159,8 @@ mod sample;
pub use sample::*;
mod scores;
pub use scores::*;
mod aggregate;
pub use aggregate::*;

View File

@ -21,8 +21,9 @@
use core::*;
use core::Kind::*;
use aggregate::{AggregateSource, ScoreType};
use aggregate::ScoreType::*;
use aggregate::AggregateSource;
use scores::ScoreType;
use scores::ScoreType::*;
use std::time::Duration;
use schedule::{schedule, CancelHandle};
@ -53,7 +54,7 @@ where
{
let publish_scope_fn = target.new_scope(false);
source.for_each(|metric| {
let snapshot = metric.read_and_reset();
let snapshot = metric.reset();
if snapshot.is_empty() {
// no data was collected for this period
// TODO repeat previous frame min/max ?
@ -76,12 +77,12 @@ where
/// Resulting stats are named by appending a short suffix to each metric's name.
pub fn all_stats(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&str>, Value)> {
match score {
HitCount(hit) => Some((Counter, vec![name, ".hit"], hit)),
SumOfValues(sum) => Some((kind, vec![name, ".sum"], sum)),
AverageValue(avg) => Some((kind, vec![name, ".avg"], avg.round() as Value)),
MaximumValue(max) => Some((Gauge, vec![name, ".max"], max)),
MinimumValue(min) => Some((Gauge, vec![name, ".min"], min)),
MeanRate(rate) => Some((Gauge, vec![name, ".rate"], rate.round() as Value))
Count(hit) => Some((Counter, vec![name, ".count"], hit)),
Sum(sum) => Some((kind, vec![name, ".sum"], sum)),
Mean(mean) => Some((kind, vec![name, ".mean"], mean.round() as Value)),
Max(max) => Some((Gauge, vec![name, ".max"], max)),
Min(min) => Some((Gauge, vec![name, ".min"], min)),
Rate(rate) => Some((Gauge, vec![name, ".rate"], rate.round() as Value))
}
}
@ -94,13 +95,13 @@ pub fn average(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&s
match kind {
Marker => {
match score {
HitCount(hit) => Some((Counter, vec![name], hit)),
Count(count) => Some((Counter, vec![name], count)),
_ => None,
}
}
_ => {
match score {
AverageValue(avg) => Some((Gauge, vec![name], avg.round() as Value)),
Mean(avg) => Some((Gauge, vec![name], avg.round() as Value)),
_ => None,
}
}
@ -118,19 +119,19 @@ pub fn summary(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&s
match kind {
Marker => {
match score {
HitCount(hit) => Some((Counter, vec![name], hit)),
Count(count) => Some((Counter, vec![name], count)),
_ => None,
}
}
Counter | Timer => {
match score {
SumOfValues(sum) => Some((kind, vec![name], sum)),
Sum(sum) => Some((kind, vec![name], sum)),
_ => None,
}
}
Gauge => {
match score {
AverageValue(avg) => Some((Gauge, vec![name], avg.round() as Value)),
Mean(mean) => Some((Gauge, vec![name], mean.round() as Value)),
_ => None,
}
}

View File

@ -2,18 +2,50 @@ use time;
use std::mem;
use core::*;
use core::Kind::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::usize;
use self::ScoreType::*;
#[derive(Debug, Clone, Copy)]
/// Possibly aggregated scores.
pub enum ScoreType {
/// Number of times the metric was used.
Count(u64),
/// Sum of metric values reported.
Sum(u64),
/// Biggest value reported.
Max(u64),
/// Smallest value reported.
Min(u64),
/// Approximative average value (hit count / sum, non-atomic)
Mean(f64),
/// Approximative mean rate (hit count / period length in seconds, non-atomic)
Rate(f64),
}
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
pub struct Scoreboard {
/// The kind of metric.
pub kind: Kind,
/// The metric's name.
pub name: String,
scores: [AtomicUsize; 5]
}
impl Scoreboard {
pub fn new() -> Self {
/// Create a new Scoreboard to track summary values of a metric
pub fn new(kind: Kind, name: String) -> Self {
Scoreboard {
kind,
name,
scores: unsafe { mem::transmute(Scoreboard::blank()) }
}
}
@ -23,7 +55,19 @@ impl Scoreboard {
[time::precise_time_ns() as usize, 0, 0, usize::MIN, usize::MAX]
}
pub fn reset(&self) -> Snapshot {
/// Update scores with new value
pub fn update(&self, value: Value) -> () {
// TODO report any concurrent updates / resets for measurement of contention
let value = value as usize;
self.scores[1].fetch_add(1, Acquire);
self.scores[2].fetch_add(value, Acquire);
swap_if_more(&self.scores[3], value);
swap_if_less(&self.scores[4], value);
}
/// Reset aggregate values, return previous values
/// To-be-published snapshot of aggregated score values for a metric.
pub fn reset(&self) -> Vec<ScoreType> {
let mut scores = Scoreboard::blank();
let now = scores[0];
@ -31,73 +75,56 @@ impl Scoreboard {
scores[i] = self.scores[i].swap(scores[i], Release);
}
scores[0] = now - scores[0];
Snapshot { scores }
}
let duration_seconds = (now - scores[0]) as f64 / 1_000_000_000.0;
/// Update scores with new value
pub fn update(&self, value: Value) -> () {
// TODO report any concurrent updates / resets for measurement of contention
let value = value as usize;
self.scores[1].fetch_add(1, Acquire);
self.scores[2].fetch_add(value, Acquire);
Scoreboard::swap_if_more(&self.scores[3], value);
Scoreboard::swap_if_less(&self.scores[4], value);
}
/// Spinlock until success or clear loss to concurrent update.
#[inline]
fn swap_if_more(counter: &AtomicUsize, new_value: usize) {
let mut current = counter.load(Acquire);
while current < new_value {
if counter.compare_and_swap(current, new_value, Release) == new_value { break }
current = counter.load(Acquire);
// if hit count is zero, then no values were recorded.
if scores[1] == 0 {
return vec![]
}
let mut snapshot = Vec::new();
match self.kind {
Marker => {
snapshot.push(Count(scores[1] as u64));
snapshot.push(Rate(scores[2] as f64 / duration_seconds))
},
Gauge => {
snapshot.push(Max(scores[3] as u64));
snapshot.push(Min(scores[4] as u64));
snapshot.push(Mean(scores[2] as f64 / scores[1] as f64));
},
Timer | Counter => {
snapshot.push(Count(scores[1] as u64));
snapshot.push(Sum(scores[2] as u64));
snapshot.push(Max(scores[3] as u64));
snapshot.push(Min(scores[4] as u64));
snapshot.push(Mean(scores[2] as f64 / scores[1] as f64));
snapshot.push(Rate(scores[2] as f64 / duration_seconds))
},
}
snapshot
}
/// Spinlock until success or clear loss to concurrent update.
#[inline]
fn swap_if_less(counter: &AtomicUsize, new_value: usize) {
let mut current = counter.load(Acquire);
while current > new_value {
if counter.compare_and_swap(current, new_value, Release) == new_value { break }
current = counter.load(Acquire);
}
}
/// Spinlock until success or clear loss to concurrent update.
#[inline]
fn swap_if_more(counter: &AtomicUsize, new_value: usize) {
let mut current = counter.load(Acquire);
while current < new_value {
if counter.compare_and_swap(current, new_value, Release) == new_value { break }
current = counter.load(Acquire);
}
}
pub struct Snapshot {
scores: [usize; 5],
/// Spinlock until success or clear loss to concurrent update.
#[inline]
fn swap_if_less(counter: &AtomicUsize, new_value: usize) {
let mut current = counter.load(Acquire);
while current > new_value {
if counter.compare_and_swap(current, new_value, Release) == new_value { break }
current = counter.load(Acquire);
}
}
impl Snapshot {
pub fn duration_ns(&self) -> Value {
self.scores[0] as Value
}
pub fn hit_count(&self) -> Value {
self.scores[1] as Value
}
pub fn sum(&self) -> Value {
self.scores[2] as Value
}
pub fn max(&self) -> Value {
self.scores[3] as Value
}
pub fn min(&self) -> Value {
self.scores[4] as Value
}
pub fn average(&self) -> f64 {
self.sum() as f64 / self.hit_count() as f64
}
pub fn mean_rate(&self) -> f64 {
self.sum() as f64 / (self.duration_ns() as f64 / 1_000_000_000.0)
}
}