aggregate.sink() / .source(), better sample in main and README

This commit is contained in:
Francis Lalonde 2017-07-24 16:36:23 -04:00
parent 5a255cfa16
commit fce43013ef
15 changed files with 274 additions and 171 deletions

6
Cargo.lock generated
View File

@ -3,9 +3,10 @@ name = "dipstick"
version = "0.1.0"
dependencies = [
"cached 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"thread-local-object 0.1.0",
"thread-local-object 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -161,8 +162,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "thread-local-object"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"unsafe-any 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -255,6 +256,7 @@ dependencies = [
"checksum scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e50853ff7d0b411f597d2ed9d4347b4a7583f3abced3e24d2b7fbfde492c901"
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum thread-local-object 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7da3caa820d0308c84c8654f6cafd81cc3195d45433311cbe22fcf44fc8be071"
"checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520"
"checksum tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6a20ba4738d283cac7495ca36e045c80c2a8df3e05dd0909b17a06646af5a7ed"
"checksum tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c2c3ce9739f7387a0fa65b5421e81feae92e04d603f008898f4257790ce8c2db"

View File

@ -4,14 +4,14 @@ version = "0.1.0"
authors = ["Francis Lalonde <fralalonde@gmail.com>"]
[dependencies]
# not yet required
#error-chain = "0.10"
lazy_static = "0.2.8"
cached = "0.4.1"
#lazy_static = "0.2.8"
log = "0.3.8"
time = "0.1"
scheduled-executor = "0.3.0"
#thread_local = "0.3"
thread-local-object = { path = '../thread-local-object' }
thread-local-object = "0.1.0"
[features]
bench = []

View File

@ -1,29 +1,60 @@
dipstick
--------
A performant, configurable metrics toolkit for Rust applications.
A configurable, extendable application metrics framework.
Similar to popular logging frameworks, but with counters and timers.
Enables multiple outputs (i.e. Log file _and_ Statsd) from a single set of instruments.
Supports random sampling, local aggregation and periodical publication of collected metric values.
Provides ergonomic Timer, Counter, Gauges and Event instruments, with optional thread-local scoping.
Defined metrics are decoupled from implementation so that recorded values can be sent
transparently to multiple destinations. Current output modules include *Logging* and *Statsd*.
Random sampling or local aggregation can be used to reduce the number of metrics emitted by the app.
Publication of aggregated metrics can be done synchronously (i.e. when a program exits)
or in the background using your favorite scheduler.
Modular backend API for app configuration :
```rust
let metrics_log = LogChannel::new("metrics");
let metrics = DirectDispatch::new(metrics_log);
let counter = metrics.new_count("count_a");
counter.value(1);
// send application metrics to both aggregator and to sampling log
let aggregator = MetricAggregator::new();
let sampling_log = RandomSamplingSink::new(LogSink::new("metrics:"), 0.1);
let dual_sink = DualSink::new(aggregator.sink(), sampling_log);
// schedule aggregated metrics to be sent to statsd every 3 seconds
let statsd = MetricCache::new(StatsdSink::new("localhost:8125", "hello.").unwrap(), 512);
let aggregate_metrics = AggregatePublisher::new(statsd, aggregator.source());
// TODO publisher should provide its own scheduler
CoreExecutor::new().unwrap().schedule_fixed_rate(
Duration::from_secs(3),
Duration::from_secs(3),
move |_| aggregate_metrics.publish()
);
```
Ergonomic frontend API for application code :
```rust
// define application metrics
let mut app_metrics = DirectDispatch::new(dual_sink);
let counter = app_metrics.new_count("counter_a");
let timer = app_metrics.new_timer("timer_b");
let event = app_metrics.new_event("event_c");
let gauge = app_metrics.new_gauge("gauge_d");
loop {
// report some metric values from our "application" loop
counter.value(11);
gauge.value(22);
// use scope to update metrics as one (single log line, single network packet, etc.)
app_metrics.scope(|| {
event.mark();
time!(timer, { sleep(Duration::from_millis(5)); });
});
}
```
## TODO
- scopes
- sampling
- generic publisher / sources
- scope properties
- log templates
- more outputs
- configurable aggregates
- buffers & queued dispatch
- tags
- tests
- bench
- doc
- samples
- tests & benches
- create doc
- lots of sample code

View File

@ -1,2 +1,2 @@
pub mod sink;
pub mod source;
pub mod publish;

View File

@ -1,29 +1,35 @@
//// Aggregate Source
use core::{MetricSink, MetricType, SinkWriter, MetricSource};
use aggregate::sink::{ScoreIterator, AggregateScore};
use core::{MetricSink, MetricType, SinkWriter};
use aggregate::sink::{AggregateSource, AggregateScore};
/// publisher from aggregate metrics to target channel
pub struct AggregateSource<C: MetricSink> {
/// Publisher from aggregate metrics to target channel
pub struct AggregatePublisher<C: MetricSink> {
source: AggregateSource,
target: C,
scores: ScoreIterator,
}
impl <C: MetricSink> AggregateSource<C> {
impl <C: MetricSink> AggregatePublisher<C> {
/// create new publisher from aggregate metrics to target channel
pub fn new(target: C, scores: ScoreIterator) -> AggregateSource<C> {
AggregateSource {target, scores}
/// Create new publisher from aggregate metrics to target channel
pub fn new(target: C, source: AggregateSource) -> AggregatePublisher<C> {
AggregatePublisher{ source, target }
}
}
impl <C: MetricSink> MetricSource for AggregateSource<C> {
impl <C: MetricSink> AggregatePublisher<C> {
/// define and write metrics from aggregated scores to the target channel
fn publish(&self) {
/// Define and write metrics from aggregated scores to the target channel
/// If this is called repeatedly it can be a good idea to use the metric cache
/// to prevent new metrics from being created every time.
pub fn publish(&self) {
let scope = self.target.new_writer();
self.scores.for_each(|metric| {
self.source.for_each(|metric| {
match metric.read_and_reset() {
AggregateScore::NoData => {
// TODO repeat previous frame min/max ?
// TODO update some canary metric ?
},
AggregateScore::Event {hit} => {
let name = format!("{}.hit", &metric.name);
let temp_metric = self.target.define(MetricType::Count, name, 1.0);
@ -60,7 +66,7 @@ impl <C: MetricSink> MetricSource for AggregateSource<C> {
}
}
}
});
})
}
}

View File

@ -1,7 +1,9 @@
use core::{MetricType, RateType, Value, SinkWriter, SinkMetric, MetricSink};
use core::{MetricType, Rate, Value, SinkWriter, SinkMetric, MetricSink};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::usize;
use std::slice::Iter;
#[derive(Debug)]
enum AtomicScore {
@ -9,13 +11,15 @@ enum AtomicScore {
Value { hit: AtomicUsize, sum: AtomicUsize, max: AtomicUsize, min: AtomicUsize },
}
/// consumed aggregated values
/// to-be-consumed aggregated values
#[derive(Debug)]
pub enum AggregateScore {
NoData,
Event { hit: u64 },
Value { hit: u64, sum: u64, max: u64, min: u64 },
}
/// A metric that holds aggregated values
#[derive(Debug)]
pub struct AggregateMetric {
pub m_type: MetricType,
@ -25,7 +29,7 @@ pub struct AggregateMetric {
impl AggregateMetric {
/// add value to score
/// Update scores with value
pub fn write(&self, value: usize) -> () {
match &self.score {
&AtomicScore::Event {ref hit, ..} => {
@ -56,18 +60,22 @@ impl AggregateMetric {
}
}
/// reset aggregate values, save previous values to
/// reset aggregate values, return previous values
pub fn read_and_reset(&self) -> AggregateScore {
match self.score {
AtomicScore::Event {ref hit} =>
AggregateScore::Event {
hit: hit.swap(0, Ordering::Release) as u64 },
AtomicScore::Event {ref hit} => {
let hit = hit.swap(0, Ordering::Release) as u64;
if hit == 0 {AggregateScore::NoData} else { AggregateScore::Event { hit }}
},
AtomicScore::Value {ref hit, ref sum,ref max, ref min} => {
AggregateScore::Value {
hit: hit.swap(0, Ordering::Release) as u64,
sum: sum.swap(0, Ordering::Release) as u64,
max: max.swap(usize::MIN, Ordering::Release) as u64,
min: min.swap(usize::MAX, Ordering::Release) as u64,
let hit = hit.swap(0, Ordering::Release) as u64;
if hit == 0 {AggregateScore::NoData} else {
AggregateScore::Value {
hit,
sum: sum.swap(0, Ordering::Release) as u64,
max: max.swap(usize::MIN, Ordering::Release) as u64,
min: min.swap(usize::MAX, Ordering::Release) as u64,
}
}
}
}
@ -78,8 +86,7 @@ impl SinkMetric for Arc<AggregateMetric> {
}
#[derive(Debug)]
pub struct AggregateWrite {
}
pub struct AggregateWrite ();
impl SinkWriter<Arc<AggregateMetric>> for AggregateWrite {
fn write(&self, metric: &Arc<AggregateMetric>, value: Value) {
@ -87,42 +94,50 @@ impl SinkWriter<Arc<AggregateMetric>> for AggregateWrite {
}
}
#[derive(Debug)]
pub struct ScoreIterator {
metrics: Arc<RwLock<Vec<Arc<AggregateMetric>>>>,
// there can only be one
lazy_static! {
static ref AGGREGATE_WRITE: AggregateWrite = AggregateWrite();
}
impl ScoreIterator {
pub fn for_each<F>(&self, operations: F) where F: Fn(&AggregateMetric) {
for metric in self.metrics.read().unwrap().iter() {
operations(metric);
};
#[derive(Debug)]
pub struct AggregateSource ( Arc<RwLock<Vec<Arc<AggregateMetric>>>> );
impl AggregateSource {
pub fn for_each<F>(&self, ops: F) where F: Fn(&AggregateMetric) {
for metric in self.0.read().unwrap().iter() {
ops(&metric)
}
}
}
#[derive(Debug)]
pub struct AggregateChannel {
write: AggregateWrite,
pub struct MetricAggregator {
metrics: Arc<RwLock<Vec<Arc<AggregateMetric>>>>,
}
impl AggregateChannel {
impl MetricAggregator {
pub fn new() -> AggregateChannel {
AggregateChannel { write: AggregateWrite {}, metrics: Arc::new(RwLock::new(Vec::new())) }
pub fn new() -> MetricAggregator {
MetricAggregator { metrics: Arc::new(RwLock::new(Vec::new())) }
}
pub fn scores(&self) -> ScoreIterator {
ScoreIterator { metrics : self.metrics.clone() }
pub fn source(&self) -> AggregateSource {
AggregateSource( self.metrics.clone() )
}
pub fn sink(&self) -> AggregateSink {
AggregateSink( self.metrics.clone() )
}
}
impl MetricSink for AggregateChannel {
pub struct AggregateSink ( Arc<RwLock<Vec<Arc<AggregateMetric>>>> );
impl MetricSink for AggregateSink {
type Metric = Arc<AggregateMetric>;
type Writer = AggregateWrite;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: RateType) -> Arc<AggregateMetric> {
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> Arc<AggregateMetric> {
let name = name.as_ref().to_string();
let metric = Arc::new(AggregateMetric {
m_type, name, score: match m_type {
@ -136,12 +151,13 @@ impl MetricSink for AggregateChannel {
}
});
self.metrics.write().unwrap().push(metric.clone());
self.0.write().unwrap().push(metric.clone());
metric
}
fn new_writer(&self) -> AggregateWrite {
AggregateWrite{ }
// TODO return AGGREGATE_WRITE
AggregateWrite()
}
}
@ -150,13 +166,13 @@ impl MetricSink for AggregateChannel {
#[cfg(feature="bench")]
mod bench {
use super::AggregateChannel;
use super::MetricAggregator;
use core::{MetricType, MetricSink, SinkWriter};
use test::Bencher;
#[bench]
fn time_bench_write_event(b: &mut Bencher) {
let aggregate = &AggregateChannel::new();
let aggregate = &MetricAggregator::new().sink();
let metric = aggregate.define(MetricType::Event, "event_a", 1.0);
let writer = aggregate.new_writer();
b.iter(|| writer.write(&metric, 1));
@ -165,7 +181,7 @@ mod bench {
#[bench]
fn time_bench_write_count(b: &mut Bencher) {
let aggregate = &AggregateChannel::new();
let aggregate = &MetricAggregator::new().sink();
let metric = aggregate.define(MetricType::Count, "count_a", 1.0);
let writer = aggregate.new_writer();
b.iter(|| writer.write(&metric, 1));
@ -173,14 +189,14 @@ mod bench {
#[bench]
fn time_bench_read_event(b: &mut Bencher) {
let aggregate = &AggregateChannel::new();
let aggregate = &MetricAggregator::new().sink();
let metric = aggregate.define(MetricType::Event, "event_a", 1.0);
b.iter(|| metric.read_and_reset());
}
#[bench]
fn time_bench_read_count(b: &mut Bencher) {
let aggregate = &AggregateChannel::new();
let aggregate = &MetricAggregator::new().sink();
let metric = aggregate.define(MetricType::Count, "count_a", 1.0);
b.iter(|| metric.read_and_reset());
}

View File

@ -1,4 +1,4 @@
use core::{MetricType, RateType, Value, MetricSink, SinkMetric, SinkWriter};
use core::{MetricType, Rate, Value, MetricSink, SinkMetric, SinkWriter};
use cached::{SizedCache, Cached};
use std::sync::Arc;
use std::sync::RwLock;
@ -14,14 +14,12 @@ use std::sync::RwLock;
// one solution might require SinkMetric<PHANTOM> (everywhere!), not tried because it would be HORRIBLE
// for now we use this "wrapping reification" of Arc<> which needs to be allocated on every cache miss or hit
// if you know how to fix it that'd be great
#[derive(Debug)]
pub struct CachedMetric<C: MetricSink> (Arc<C::Metric>);
impl <C: MetricSink> SinkMetric for CachedMetric<C> {}
// WRITER
#[derive(Debug)]
pub struct CachedMetricWriter<C: MetricSink> {
target: C::Writer,
}
@ -32,14 +30,15 @@ impl <C: MetricSink> SinkWriter<CachedMetric<C>> for CachedMetricWriter<C> {
}
}
// SINK
/// A cache to help with ad-hoc defined metrics
/// Does not alter the values of the metrics
pub struct MetricCache<C: MetricSink> {
target: C,
cache: RwLock<SizedCache<String, Arc<C::Metric>>>,
}
impl <C: MetricSink> MetricCache<C> {
/// Build a new metric cache
pub fn new(target: C, cache_size: usize) -> MetricCache<C> {
let cache = RwLock::new(SizedCache::with_capacity(cache_size));
MetricCache { target, cache }
@ -50,17 +49,15 @@ impl <C: MetricSink> MetricSink for MetricCache<C> {
type Metric = CachedMetric<C>;
type Writer = CachedMetricWriter<C>;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: RateType) -> CachedMetric<C> {
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> CachedMetric<C> {
let key = name.as_ref().to_string();
{
let mut cache = self.cache.write().unwrap();
let cached_metric = cache.cache_get(&key);
if let Some(cached_metric) = cached_metric {
println!("cache hit");
return CachedMetric(cached_metric.clone());
}
}
println!("cache miss");
let target_metric = self.target.define(m_type, name, sampling);
let new_metric = Arc::new( target_metric );
let mut cache = self.cache.write().unwrap();

View File

@ -1,4 +1,5 @@
use time;
use std::result::Iter;
//////////////////
// DEFINITIONS
@ -9,11 +10,18 @@ pub type Value = u64;
pub struct TimeType (u64);
impl TimeType {
pub fn now() -> TimeType { TimeType(time::precise_time_ns()) }
pub fn elapsed_ms(self) -> Value { (TimeType::now().0 - self.0) / 1_000_000 }
pub fn now() -> TimeType {
TimeType(time::precise_time_ns())
}
pub fn elapsed_ms(self) -> Value {
(TimeType::now().0 - self.0) / 1_000_000
}
}
pub type RateType = f64;
pub type Rate = f64;
pub const FULL_SAMPLING_RATE: Rate = 1.0;
#[derive(Debug, Copy, Clone)]
pub enum MetricType {
@ -23,13 +31,10 @@ pub enum MetricType {
Time,
}
//////////////////
// CONTRACT
// INSTRUMENTATION (API CONTRACT)
// Application contract
pub trait EventMetric {
fn event(&self);
fn mark(&self);
}
pub trait ValueMetric {
@ -46,10 +51,13 @@ pub trait TimerMetric: ValueMetric {
}
}
/// Identifies a metric dispatch scope.
pub trait MetricScope {
fn set_property<S: AsRef<str>>(&self, key: S, value: S) -> &Self;
// TODO enable free-form scope properties
// fn set_property<S: AsRef<str>>(&self, key: S, value: S) -> &Self;
}
/// Main trait of the metrics API
pub trait MetricDispatch {
type Event: EventMetric;
type Value: ValueMetric;
@ -64,7 +72,10 @@ pub trait MetricDispatch {
fn scope<F>(&mut self, operations: F) where F: Fn(/*&Self::Scope*/);
}
pub trait MetricSource {
/// Metric sources allow a group of metrics to be defined and written as one.
/// Source implementers may get their data from internally aggregated or buffered metrics
/// or they may read existing metrics not defined by the app (OS counters, etc)
pub trait MetricPublisher {
fn publish(&self);
}
@ -80,18 +91,42 @@ macro_rules! time {
}};
}
// SINK
pub trait SinkMetric {}
pub trait SinkWriter<M: SinkMetric>: Send {
fn write(&self, metric: &M, value: Value);
fn flush(&self) {}
}
/// Main trait of the metrics backend API.
/// Defines a component that can be used when setting up a metrics backend stack.
/// Intermediate sinks transform how metrics are defined and written:
/// - Sampling
/// - Dual
/// - Cache
/// Terminal sinks store or propagate metric values to other systems.
/// - Statsd
/// - Log
/// - Aggregate
pub trait MetricSink {
type Metric: SinkMetric;
type Writer: SinkWriter<Self::Metric>;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> Self::Metric;
/// Define a new sink-specific metric that can be used for writing values.
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> Self::Metric;
/// Open a metric writer to write metrics to.
/// Some sinks actually reuse the same writer while others allocate resources for every new writer.
fn new_writer(&self) -> Self::Writer;
}
/// A metric identifier defined by a specific metric sink implementation.
/// Passed back to when writing a metric value
/// May carry state specific to the sink's implementation
pub trait SinkMetric {}
/// A sink-specific target for writing metrics to.
pub trait SinkWriter<M: SinkMetric>: Send {
/// Write a single metric value
fn write(&self, metric: &M, value: Value);
/// Some sinks may have buffering capability.
/// Flushing makes sure all previously written metrics are propagated
/// down the sink chain and to any applicable external outputs.
fn flush(&self) {}
}

View File

@ -1,36 +1,35 @@
use core::{MetricType, Value, SinkWriter, MetricSink, MetricDispatch, EventMetric, ValueMetric, TimerMetric, MetricScope};
use std::sync::Arc;
use std::cell::RefCell;
use thread_local_object::ThreadLocal;
////////////
pub struct DirectEvent<C: MetricSink + 'static> {
/// Base struct for all direct dispatch metrics
struct DirectMetric<C: MetricSink + 'static> {
metric: <C as MetricSink>::Metric,
dispatch_scope: Arc<DirectScope<C>>
}
pub struct DirectValue<C: MetricSink + 'static> {
metric: <C as MetricSink>::Metric,
dispatch_scope: Arc<DirectScope<C>>
}
/// An event marker that dispatches values directly to the metrics backend
pub struct DirectEvent<C: MetricSink + 'static>( DirectMetric<C>);
pub struct DirectTimer<C: MetricSink + 'static> {
metric: <C as MetricSink>::Metric,
dispatch_scope: Arc<DirectScope<C>>
}
/// A gauge or counter that dispatches values directly to the metrics backend
pub struct DirectValue<C: MetricSink + 'static>( DirectMetric<C>);
/// A timer that dispatches values directly to the metrics backend
pub struct DirectTimer<C: MetricSink + 'static>( DirectMetric<C>);
/// A scoped writer
pub struct ScopeWriter<C: MetricSink> {
writer: C::Writer
// properties: hashmap
}
impl <C: MetricSink> MetricScope for ScopeWriter<C> {
fn set_property<S: AsRef<str>>(&self, key: S, value: S) -> &Self {
/* fn set_property<S: AsRef<str>>(&self, key: S, value: S) -> &Self {
self
}
}*/
}
/// The shared scope-selector for all of a single Dispatcher metrics
pub struct DirectScope<C: MetricSink + 'static> {
default_scope: C::Writer,
thread_scope: ThreadLocal<ScopeWriter<C>>,
@ -48,20 +47,20 @@ impl <C: MetricSink> DirectScope<C> {
}
impl <C: MetricSink> EventMetric for DirectEvent<C> {
fn event(&self) {
self.dispatch_scope.value(&self.metric, 1);
fn mark(&self) {
self.0.dispatch_scope.value(&self.0.metric, 1);
}
}
impl <C: MetricSink> ValueMetric for DirectValue<C> {
fn value(&self, value: Value) {
self.dispatch_scope.value(&self.metric, value);
self.0.dispatch_scope.value(&self.0.metric, value);
}
}
impl <C: MetricSink> ValueMetric for DirectTimer<C> {
fn value(&self, value: Value) {
self.dispatch_scope.value(&self.metric,value);
self.0.dispatch_scope.value(&self.0.metric,value);
}
}
@ -69,9 +68,11 @@ impl <C: MetricSink> TimerMetric for DirectTimer<C> {
}
impl <C: MetricSink> MetricScope for DirectScope<C> {
/*
fn set_property<S: AsRef<str>>(&self, key: S, value: S) -> &Self {
self
}
*/
}
pub struct DirectDispatch<C: MetricSink + 'static> {
@ -94,27 +95,28 @@ impl <C: MetricSink> MetricDispatch for DirectDispatch<C> {
fn new_event<S: AsRef<str>>(&self, name: S) -> Self::Event {
let metric = self.target.define(MetricType::Event, name, 1.0);
DirectEvent { metric, dispatch_scope: self.dispatch_scope.clone() }
DirectEvent ( DirectMetric{ metric, dispatch_scope: self.dispatch_scope.clone() })
}
fn new_count<S: AsRef<str>>(&self, name: S) -> Self::Value {
let metric = self.target.define(MetricType::Count, name, 1.0);
DirectValue { metric, dispatch_scope: self.dispatch_scope.clone() }
DirectValue ( DirectMetric { metric, dispatch_scope: self.dispatch_scope.clone() })
}
fn new_timer<S: AsRef<str>>(&self, name: S) -> Self::Timer {
let metric = self.target.define(MetricType::Time, name, 1.0);
DirectTimer { metric, dispatch_scope: self.dispatch_scope.clone() }
DirectTimer ( DirectMetric { metric, dispatch_scope: self.dispatch_scope.clone() })
}
fn new_gauge<S: AsRef<str>>(&self, name: S) -> Self::Value {
let metric = self.target.define(MetricType::Gauge, name, 1.0);
DirectValue { metric, dispatch_scope: self.dispatch_scope.clone() }
DirectValue ( DirectMetric { metric, dispatch_scope: self.dispatch_scope.clone() })
}
fn scope<F>(&mut self, operations: F) where F: Fn(/*&Self::Scope*/) {
let new_writer = self.target.new_writer();
let mut scope = ScopeWriter{ writer: new_writer};
let scope = ScopeWriter{ writer: new_writer};
// TODO this should be done transactionally to make sure scope is always removed() even on panic
self.dispatch_scope.thread_scope.set(scope);
operations();
self.dispatch_scope.thread_scope.remove();
@ -125,16 +127,16 @@ impl <C: MetricSink> MetricDispatch for DirectDispatch<C> {
#[cfg(feature="bench")]
mod bench {
use aggregate::sink::AggregateChannel;
use core::{MetricType, MetricSink, SinkWriter, MetricDispatch, EventMetric};
use aggregate::sink::MetricAggregator;
use core::{MetricDispatch, EventMetric};
use test::Bencher;
#[bench]
fn time_bench_direct_dispatch_event(b: &mut Bencher) {
let aggregate = AggregateChannel::new();
let aggregate = MetricAggregator::new().sink();
let dispatch = super::DirectDispatch::new(aggregate);
let metric = dispatch.new_event("aaa");
b.iter(|| metric.event());
let event = dispatch.new_event("aaa");
b.iter(|| event.mark());
}

View File

@ -1,4 +1,4 @@
use core::{MetricType, RateType, Value, SinkWriter, SinkMetric, MetricSink};
use core::{MetricType, Rate, Value, SinkWriter, SinkMetric, MetricSink};
#[derive(Debug)]
pub struct DualMetric<M1: SinkMetric, M2: SinkMetric> {
@ -37,9 +37,9 @@ impl <C1: MetricSink, C2: MetricSink> MetricSink for DualSink<C1, C2> {
type Metric = DualMetric<C1::Metric, C2::Metric>;
type Writer = DualWriter<C1, C2>;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> DualMetric<C1::Metric, C2::Metric> {
let metric_1 = self.channel_a.define(m_type, &name, sample);
let metric_2 = self.channel_b.define(m_type, &name, sample);
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> DualMetric<C1::Metric, C2::Metric> {
let metric_1 = self.channel_a.define(m_type, &name, sampling);
let metric_2 = self.channel_b.define(m_type, &name, sampling);
DualMetric { metric_1, metric_2 }
}

View File

@ -25,6 +25,9 @@ extern crate thread_local_object;
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
pub mod core;
pub mod dual;
pub mod dispatch;
@ -34,3 +37,4 @@ pub mod statsd;
pub mod mlog;
pub mod pcg32;
pub mod cache;

View File

@ -8,6 +8,9 @@ extern crate time;
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
extern crate scheduled_executor;
extern crate thread_local_object;
@ -30,9 +33,9 @@ use dispatch::DirectDispatch;
use sampling::RandomSamplingSink;
use statsd::StatsdSink;
use mlog::LogSink;
use aggregate::sink::{AggregateChannel};
use aggregate::source::{AggregateSource};
use core::{MetricType, MetricSink, SinkWriter, MetricDispatch, ValueMetric, TimerMetric, MetricSource};
use aggregate::sink::{MetricAggregator};
use aggregate::publish::{AggregatePublisher};
use core::{MetricType, MetricSink, SinkWriter, MetricDispatch, ValueMetric, TimerMetric, EventMetric};
use std::thread::sleep;
use scheduled_executor::{CoreExecutor};
use std::time::Duration;
@ -44,35 +47,42 @@ fn main() {
pub fn sample_scheduled_statsd_aggregation() {
// app metrics aggregate here
let aggregate = AggregateChannel::new();
// SAMPLE METRICS SETUP
// aggregated metrics are collected here
let scores = aggregate.scores();
// send application metrics to both aggregator and to sampling log
let aggregator = MetricAggregator::new();
let sampling_log = RandomSamplingSink::new(LogSink::new("metrics:"), 0.1);
let dual_sink = DualSink::new(aggregator.sink(), sampling_log);
// define some application metrics
let mut app_metrics = DirectDispatch::new(aggregate);
let counter = app_metrics.new_count("counter_a");
let timer = app_metrics.new_timer("timer_a");
// send aggregated metrics to statsd
// schedule aggregated metrics to be sent to statsd every 3 seconds
let statsd = MetricCache::new(StatsdSink::new("localhost:8125", "hello.").unwrap(), 512);
let aggregate_metrics = AggregateSource::new(statsd, scores);
// collect every three seconds
let executor = CoreExecutor::new().unwrap();
executor.schedule_fixed_rate(
let aggregate_metrics = AggregatePublisher::new(statsd, aggregator.source());
// TODO publisher should provide its own scheduler
let exec = CoreExecutor::new().unwrap();
exec.schedule_fixed_rate(
Duration::from_secs(3),
Duration::from_secs(3),
move |_| aggregate_metrics.publish()
);
// generate some metric values
// SAMPLE METRICS USAGE
// define application metrics
let mut app_metrics = DirectDispatch::new(dual_sink);
let counter = app_metrics.new_count("counter_a");
let timer = app_metrics.new_timer("timer_b");
let event = app_metrics.new_event("event_c");
let gauge = app_metrics.new_gauge("gauge_d");
loop {
// report some metric values from our "application" loop
counter.value(11);
gauge.value(22);
// use scope to update metrics as one (single log line, single network packet, etc.)
app_metrics.scope(|| {
counter.value(11);
counter.value(22);
time!(timer, { sleep(Duration::from_millis(10)); });
event.mark();
time!(timer, { sleep(Duration::from_millis(5)); });
});
}

View File

@ -1,4 +1,4 @@
use core::{MetricType, RateType, Value, SinkWriter, SinkMetric, MetricSink};
use core::{MetricType, Rate, Value, SinkWriter, SinkMetric, MetricSink};
//////////// Log Channel
@ -36,7 +36,7 @@ impl MetricSink for LogSink {
type Metric = LogMetric;
type Writer = LogWriter;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> LogMetric {
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> LogMetric {
LogMetric { prefix: format!("{:?}:{}{}", m_type, self.prefix, name.as_ref())}
}

View File

@ -1,4 +1,4 @@
use core::{MetricType, RateType, Value, SinkWriter, SinkMetric, MetricSink};
use core::{MetricType, Rate, Value, SinkWriter, SinkMetric, MetricSink};
use pcg32;
#[derive(Debug)]
@ -26,11 +26,11 @@ impl <C: MetricSink> SinkWriter<RandomSamplingMetric<<C as MetricSink>::Metric>>
#[derive(Debug)]
pub struct RandomSamplingSink<C: MetricSink> {
target: C,
sampling_rate: RateType,
sampling_rate: Rate,
}
impl <C: MetricSink> RandomSamplingSink<C> {
pub fn new(target: C, sampling_rate: RateType) -> RandomSamplingSink<C> {
pub fn new(target: C, sampling_rate: Rate) -> RandomSamplingSink<C> {
RandomSamplingSink { target, sampling_rate}
}
}
@ -40,7 +40,7 @@ impl <C: MetricSink> MetricSink for RandomSamplingSink<C> {
type Writer = RandomSamplingWriter<C>;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> RandomSamplingMetric<C::Metric> {
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> RandomSamplingMetric<C::Metric> {
let pm = self.target.define(m_type, name, self.sampling_rate);
RandomSamplingMetric { target: pm, int_sampling_rate: pcg32::to_int_rate(self.sampling_rate) }
}

View File

@ -1,4 +1,4 @@
use core::{MetricType, RateType, Value, SinkWriter, SinkMetric, MetricSink};
use core::{MetricType, Rate, Value, SinkWriter, SinkMetric, MetricSink, FULL_SAMPLING_RATE};
use std::net::UdpSocket;
use std::io::Result;
use std::cell::RefCell;
@ -27,7 +27,7 @@ pub struct StatsdWriter {
fn flush(payload: &mut String, socket: &UdpSocket) {
// TODO check for and report any send() error
debug!("statsd sending {} bytes", payload.len());
socket.send(payload.as_bytes());
socket.send(payload.as_bytes()).unwrap();
payload.clear();
}
@ -100,7 +100,7 @@ impl MetricSink for StatsdSink {
type Metric = StatsdMetric;
type Writer = StatsdWriter;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> StatsdMetric {
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> StatsdMetric {
let mut prefix = String::with_capacity(32);
prefix.push_str(&self.prefix);
prefix.push_str(name.as_ref());
@ -114,9 +114,9 @@ impl MetricSink for StatsdSink {
MetricType::Time => "ms"
});
if sample < 1.0 {
if sampling < FULL_SAMPLING_RATE {
suffix.push('@');
suffix.push_str(&sample.to_string());
suffix.push_str(&sampling.to_string());
}
StatsdMetric {prefix, suffix}