Move to subs, break core

Francis Lalonde 2018-09-14 06:47:31 -04:00
parent 6706f18872
commit 35c90336b7
42 changed files with 3812 additions and 1395 deletions

0 README.md Normal file → Executable file

0 examples/custom_publish.rs Normal file → Executable file

0 examples/raw_log.rs Normal file → Executable file


@ -1,13 +1,12 @@
//! Maintain aggregated metrics for deferred reporting,
//!
use core::{Kind, Value, Name, AddPrefix, output_none, InputScope, InputMetric, WithAttributes, Attributes,
OutputScope, OutputMetric, Output, Flush, OutputDyn};
use clock::TimeHandle;
use core::Kind::*;
use error;
use scores::{ScoreType, Scoreboard};
use scores::ScoreType::*;
use core::component::{Attributes, Name, WithAttributes, AddPrefix};
use core::input::{Kind, InputScope, InputMetric};
use core::output::{OutputDyn, OutputScope, OutputMetric, Output, output_none};
use core::clock::TimeHandle;
use core::{Value, Flush};
use aggregate::scores::{Scoreboard, ScoreType};
use core::error;
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
@ -112,7 +111,7 @@ impl InnerBucket {
} else {
// TODO add switch for metadata such as PERIOD_LENGTH
if self.publish_metadata {
snapshot.push((&PERIOD_LENGTH, Timer, vec![Sum((duration_seconds * 1000.0) as u64)]));
snapshot.push((&PERIOD_LENGTH, Kind::Timer, vec![ScoreType::Sum((duration_seconds * 1000.0) as u64)]));
}
for metric in snapshot {
for score in metric.2 {
@ -237,12 +236,12 @@ impl WithAttributes for Bucket {
#[allow(dead_code)]
pub fn stats_all(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match score {
Count(hit) => Some((Counter, name.concat("count"), hit)),
Sum(sum) => Some((kind, name.concat("sum"), sum)),
Mean(mean) => Some((kind, name.concat("mean"), mean.round() as Value)),
Max(max) => Some((Gauge, name.concat("max"), max)),
Min(min) => Some((Gauge, name.concat("min"), min)),
Rate(rate) => Some((Gauge, name.concat("rate"), rate.round() as Value)),
ScoreType::Count(hit) => Some((Kind::Counter, name.concat("count"), hit)),
ScoreType::Sum(sum) => Some((kind, name.concat("sum"), sum)),
ScoreType::Mean(mean) => Some((kind, name.concat("mean"), mean.round() as Value)),
ScoreType::Max(max) => Some((Kind::Gauge, name.concat("max"), max)),
ScoreType::Min(min) => Some((Kind::Gauge, name.concat("min"), min)),
ScoreType::Rate(rate) => Some((Kind::Gauge, name.concat("rate"), rate.round() as Value)),
}
}
@ -253,12 +252,12 @@ pub fn stats_all(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name
#[allow(dead_code)]
pub fn stats_average(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match kind {
Marker => match score {
Count(count) => Some((Counter, name, count)),
Kind::Marker => match score {
ScoreType::Count(count) => Some((Kind::Counter, name, count)),
_ => None,
},
_ => match score {
Mean(avg) => Some((Gauge, name, avg.round() as Value)),
ScoreType::Mean(avg) => Some((Kind::Gauge, name, avg.round() as Value)),
_ => None,
},
}
@ -273,16 +272,16 @@ pub fn stats_average(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind,
#[allow(dead_code)]
pub fn stats_summary(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind, Name, Value)> {
match kind {
Marker => match score {
Count(count) => Some((Counter, name, count)),
Kind::Marker => match score {
ScoreType::Count(count) => Some((Kind::Counter, name, count)),
_ => None,
},
Counter | Timer => match score {
Sum(sum) => Some((kind, name, sum)),
Kind::Counter | Kind::Timer => match score {
ScoreType::Sum(sum) => Some((kind, name, sum)),
_ => None,
},
Gauge => match score {
Mean(mean) => Some((Gauge, name, mean.round() as Value)),
Kind::Gauge => match score {
ScoreType::Mean(mean) => Some((Kind::Gauge, name, mean.round() as Value)),
_ => None,
},
}
@ -292,8 +291,8 @@ pub fn stats_summary(kind: Kind, name: Name, score: ScoreType) -> Option<(Kind,
mod bench {
use test;
use core::*;
use bucket::Bucket;
use core::clock::*;
use super::*;
#[bench]
fn aggregate_marker(b: &mut test::Bencher) {
@ -313,10 +312,9 @@ mod bench {
#[cfg(test)]
mod test {
use core::*;
use bucket::{Bucket, stats_all, stats_summary, stats_average, StatsFn};
use clock::{mock_clock_advance, mock_clock_reset};
use map::StatsMap;
use super::*;
use core::clock::{mock_clock_advance, mock_clock_reset};
use output::map::StatsMap;
use std::time::Duration;
use std::collections::BTreeMap;

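As a usage sketch of the relocated export filters, assuming Kind, Name, ScoreType and stats_all are in scope as in the test module above, a timer's Max score comes out as a gauge with a ".max" suffix:

fn example_stats_all() {
    let name: Name = "response_time".into();
    // A timer's Max score is exported as a Gauge named "response_time.max".
    let exported = stats_all(Kind::Timer, name, ScoreType::Max(42));
    let rendered = exported.map(|(kind, name, value)| (kind, name.join("."), value));
    assert_eq!(rendered, Some((Kind::Gauge, "response_time.max".to_string(), 42)));
}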
2 src/aggregate/mod.rs Executable file

@ -0,0 +1,2 @@
pub mod bucket;
pub mod scores;


@ -1,8 +1,7 @@
use core::input::Kind;
use core::Value;
use std::mem;
use core::*;
use core::Kind::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::usize;
@ -61,7 +60,7 @@ impl Scoreboard {
let value = value as usize;
self.scores[0].fetch_add(1, AcqRel);
match self.kind {
Marker => {}
Kind::Marker => {}
_ => {
// optimization - these fields are unused for Marker stats
self.scores[1].fetch_add(value, AcqRel);
@ -94,16 +93,16 @@ impl Scoreboard {
let mut snapshot = Vec::new();
match self.kind {
Marker => {
Kind::Marker => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
Gauge => {
Kind::Gauge => {
snapshot.push(Max(scores[2] as u64));
snapshot.push(Min(scores[3] as u64));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
}
Timer => {
Kind::Timer => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
@ -113,7 +112,7 @@ impl Scoreboard {
// timer rate uses the COUNT of timer calls per second (not SUM)
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
}
Counter => {
Kind::Counter => {
snapshot.push(Count(scores[0] as u64));
snapshot.push(Sum(scores[1] as u64));
@ -148,24 +147,26 @@ fn swap_if(counter: &AtomicUsize, new_value: usize, compare: fn(usize, usize) ->
#[cfg(feature = "bench")]
mod bench {
use core::input::Kind;
use super::*;
use test;
#[bench]
fn update_marker(b: &mut test::Bencher) {
let metric = Scoreboard::new(Marker);
let metric = Scoreboard::new(Kind::Marker);
b.iter(|| test::black_box(metric.update(1)));
}
#[bench]
fn update_count(b: &mut test::Bencher) {
let metric = Scoreboard::new(Counter);
let metric = Scoreboard::new(Kind::Counter);
b.iter(|| test::black_box(metric.update(4)));
}
#[bench]
fn empty_snapshot(b: &mut test::Bencher) {
let metric = Scoreboard::new(Counter);
let metric = Scoreboard::new(Kind::Counter);
let scores = &mut Scoreboard::blank();
b.iter(|| test::black_box(metric.snapshot(scores)));
}

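A minimal sketch of feeding a Scoreboard directly, restricted to the calls exercised by the benches above (Scoreboard::new and update); kinds other than Marker also accumulate sum, max and min as per the match on self.kind:

fn example_scoreboard() {
    // A counter-kind scoreboard accumulates both hit count and sum.
    let board = Scoreboard::new(Kind::Counter);
    board.update(4);
    board.update(7);
    // A marker-kind scoreboard only tracks the hit count.
    let marks = Scoreboard::new(Kind::Marker);
    marks.update(1);
}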
93 src/cache/cache_in.rs vendored Executable file

@ -0,0 +1,93 @@
//! Cache metric definitions.
use core::Flush;
use core::input::{Kind, Input, InputScope, InputMetric, InputDyn};
use core::component::{Attributes, WithAttributes, Name, AddPrefix};
use cache::lru_cache as lru;
use core::error;
use std::sync::{Arc, RwLock};
/// Wrap an input with a metric definition cache.
/// This is useless if all metrics are statically declared but can provide performance
/// benefits if some metrics are dynamically defined at runtime.
pub trait CachedInput: Input + Send + Sync + 'static + Sized {
/// Wrap this input with a metric definition cache of the specified size.
fn cached(self, max_size: usize) -> InputCache {
InputCache::wrap(self, max_size)
}
}
/// Input wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct InputCache {
attributes: Attributes,
target: Arc<InputDyn + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, InputMetric>>>,
}
impl InputCache {
/// Wrap the target input with a metric definition cache holding up to max_size entries.
pub fn wrap<OUT: Input + Send + Sync + 'static>(target: OUT, max_size: usize) -> InputCache {
InputCache {
attributes: Attributes::default(),
target: Arc::new(target),
cache: Arc::new(RwLock::new(lru::LRUCache::with_capacity(max_size)))
}
}
}
impl WithAttributes for InputCache {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl Input for InputCache {
type SCOPE = InputScopeCache;
fn input(&self) -> Self::SCOPE {
let target = self.target.input_dyn();
InputScopeCache {
attributes: self.attributes.clone(),
target,
cache: self.cache.clone(),
}
}
}
/// Input wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct InputScopeCache {
attributes: Attributes,
target: Arc<InputScope + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, InputMetric>>>,
}
impl WithAttributes for InputScopeCache {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl InputScope for InputScopeCache {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.qualified_name(name);
let lookup = {
let mut cache = self.cache.write().expect("Cache Lock");
cache.get(&name).map(|found| found.clone())
};
lookup.unwrap_or_else(|| {
let new_metric = self.target.new_metric(name.clone(), kind);
// FIXME (perf) having to take another write lock for a cache miss
let mut cache_miss = self.cache.write().expect("Cache Lock");
cache_miss.insert(name, new_metric.clone());
new_metric
})
}
}
impl Flush for InputScopeCache {
fn flush(&self) -> error::Result<()> {
self.target.flush()
}
}

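As a usage sketch, any input that opts into the new CachedInput trait gains the cached() combinator; only names defined in this file are assumed:

fn cache_metric_definitions<I: CachedInput>(backend: I) -> InputCache {
    // Keep up to 1024 recently defined metrics; older unused definitions are evicted.
    backend.cached(1024)
}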
96 src/cache/cache_out.rs vendored Executable file

@ -0,0 +1,96 @@
//! Cache metric definitions.
use core::Flush;
use core::component::{Attributes, WithAttributes, Name, AddPrefix};
use core::output::{Output, OutputMetric, OutputScope, OutputDyn};
use core::input::Kind;
use cache::lru_cache as lru;
use core::error;
use std::sync::{Arc, RwLock};
use std::rc::Rc;
/// Wrap an output with a metric definition cache.
/// This is useless if all metrics are statically declared but can provide performance
/// benefits if some metrics are dynamically defined at runtime.
pub trait CachedOutput: Output + Send + Sync + 'static + Sized {
/// Wrap this output with a metric definition cache of the specified size.
fn cached(self, max_size: usize) -> OutputCache {
OutputCache::wrap(self, max_size)
}
}
/// Output wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct OutputCache {
attributes: Attributes,
target: Arc<OutputDyn + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, OutputMetric>>>,
}
impl OutputCache {
/// Wrap the target output with a metric definition cache holding up to max_size entries.
pub fn wrap<OUT: Output + Send + Sync + 'static>(target: OUT, max_size: usize) -> OutputCache {
OutputCache {
attributes: Attributes::default(),
target: Arc::new(target),
cache: Arc::new(RwLock::new(lru::LRUCache::with_capacity(max_size)))
}
}
}
impl WithAttributes for OutputCache {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl Output for OutputCache {
type SCOPE = OutputScopeCache;
fn output(&self) -> Self::SCOPE {
let target = self.target.output_dyn();
OutputScopeCache {
attributes: self.attributes.clone(),
target,
cache: self.cache.clone(),
}
}
}
/// Output wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct OutputScopeCache {
attributes: Attributes,
target: Rc<OutputScope + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, OutputMetric>>>,
}
impl WithAttributes for OutputScopeCache {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl OutputScope for OutputScopeCache {
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let name = self.qualified_name(name);
let lookup = {
let mut cache = self.cache.write().expect("Cache Lock");
cache.get(&name).map(|found| found.clone())
};
lookup.unwrap_or_else(|| {
let new_metric: OutputMetric = self.target.new_metric(name.clone(), kind);
// FIXME (perf) having to take another write lock for a cache miss
let mut cache_miss = self.cache.write().expect("Cache Lock");
cache_miss.insert(name, new_metric.clone());
new_metric
})
}
}
impl Flush for OutputScopeCache {
fn flush(&self) -> error::Result<()> {
self.target.flush()
}
}

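The output side can also be wrapped directly through OutputCache::wrap, without going through the CachedOutput extension trait; a sketch using only the signatures shown above:

fn cache_output_definitions<O: Output + Send + Sync + 'static>(target: O) -> OutputCache {
    // Scopes opened from the returned OutputCache share one LRU of metric definitions.
    OutputCache::wrap(target, 512)
}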
170 src/cache/lru_cache.rs vendored Executable file

@ -0,0 +1,170 @@
//! A fixed-size cache with LRU expiration criteria.
//! Stored values will be held onto as long as there is space.
//! When space runs out, the oldest unused value will get evicted to make room for a new value.
use std::hash::Hash;
use std::collections::HashMap;
struct CacheEntry<K, V> {
key: K,
value: Option<V>,
next: Option<usize>,
prev: Option<usize>,
}
/// A fixed-size cache.
pub struct LRUCache<K, V> {
table: HashMap<K, usize>,
entries: Vec<CacheEntry<K, V>>,
first: Option<usize>,
last: Option<usize>,
capacity: usize,
}
impl<K: Clone + Hash + Eq, V> LRUCache<K, V> {
/// Creates a new cache that can hold the specified number of elements.
pub fn with_capacity(size: usize) -> Self {
LRUCache {
table: HashMap::with_capacity(size),
entries: Vec::with_capacity(size),
first: None,
last: None,
capacity: size,
}
}
/// Inserts a key-value pair into the cache and returns the previous value, if any.
/// If there is no room in the cache the oldest item will be removed.
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
if self.table.contains_key(&key) {
self.access(&key);
let entry = &mut self.entries[self.first.unwrap()];
let old = entry.value.take();
entry.value = Some(value);
old
} else {
self.ensure_room();
// Update old head
let idx = self.entries.len();
self.first.map(|e| {
let prev = Some(idx);
self.entries[e].prev = prev;
});
// This is the new head
self.entries.push(CacheEntry {
key: key.clone(),
value: Some(value),
next: self.first,
prev: None,
});
self.first = Some(idx);
self.last = self.last.or(self.first);
self.table.insert(key, idx);
None
}
}
/// Retrieves a reference to the item associated with `key` from the cache
/// without promoting it.
pub fn peek(&mut self, key: &K) -> Option<&V> {
let entries = &self.entries;
self.table
.get(key)
.and_then(move |i| entries[*i].value.as_ref())
}
/// Retrieves a reference to the item associated with `key` from the cache.
pub fn get(&mut self, key: &K) -> Option<&V> {
if self.contains_key(key) {
self.access(key);
}
self.peek(key)
}
/// Returns the number of elements currently in the cache.
pub fn len(&self) -> usize {
self.table.len()
}
/// Promotes the specified key to the top of the cache.
fn access(&mut self, key: &K) {
let i = *self.table.get(key).unwrap();
self.remove_from_list(i);
self.first = Some(i);
}
pub fn contains_key(&mut self, key: &K) -> bool {
self.table.contains_key(key)
}
/// Removes an item from the linked list.
fn remove_from_list(&mut self, i: usize) {
let (prev, next) = {
let entry = &mut self.entries[i];
(entry.prev, entry.next)
};
match (prev, next) {
// Item was in the middle of the list
(Some(j), Some(k)) => {
{
let first = &mut self.entries[j];
first.next = next;
}
let second = &mut self.entries[k];
second.prev = prev;
}
// Item was at the end of the list
(Some(j), None) => {
let first = &mut self.entries[j];
first.next = None;
self.last = prev;
}
// Item was at front
_ => (),
}
}
fn ensure_room(&mut self) {
if self.capacity == self.len() {
self.remove_last();
}
}
/// Removes the oldest item in the cache.
fn remove_last(&mut self) {
if let Some(idx) = self.last {
self.remove_from_list(idx);
let key = &self.entries[idx].key;
self.table.remove(key);
}
if self.last.is_none() {
self.first = None;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn get_and_get_mut_promote() {
let mut cache: LRUCache<&str, _> = LRUCache::with_capacity(2);
cache.insert("foo", 1);
cache.insert("bar", 2);
cache.get(&"foo").unwrap();
cache.insert("baz", 3);
assert!(cache.contains_key(&"foo"));
assert!(cache.contains_key(&"baz"));
assert!(!cache.contains_key(&"bar"));
cache.get(&"foo").unwrap();
cache.insert("qux", 4);
assert!(cache.contains_key(&"foo"));
assert!(cache.contains_key(&"qux"));
assert!(!cache.contains_key(&"baz"));
}
}

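One behaviour worth noting, sketched in the style of the test above: peek() reads without promoting, so a peeked entry remains the first candidate for eviction.

fn peek_does_not_promote() {
    let mut cache: LRUCache<&str, u32> = LRUCache::with_capacity(2);
    cache.insert("a", 1);
    cache.insert("b", 2);
    cache.peek(&"a");      // read without promoting
    cache.insert("c", 3);  // evicts "a", still the least recently used entry
    assert!(!cache.contains_key(&"a"));
    assert!(cache.contains_key(&"b"));
    assert!(cache.contains_key(&"c"));
}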
3 src/cache/mod.rs vendored Executable file

@ -0,0 +1,3 @@
pub mod lru_cache;
pub mod cache_out;
pub mod cache_in;


@ -1,263 +0,0 @@
//! Cache metric definitions.
use core::{Input, InputDyn, WithAttributes, Attributes, InputScope, InputMetric, Name, Flush, Kind, AddPrefix};
use error;
use std::sync::{Arc, RwLock};
/// Wrap an output with a metric definition cache.
/// This is useless if all metrics are statically declared but can provide performance
/// benefits if some metrics are dynamically defined at runtime.
pub trait CachedInput: Input + Send + Sync + 'static + Sized {
/// Wrap this output with an asynchronous dispatch queue of specified length.
fn cached(self, max_size: usize) -> InputCache {
InputCache::wrap(self, max_size)
}
}
/// Output wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct InputCache {
attributes: Attributes,
target: Arc<InputDyn + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, InputMetric>>>,
}
impl InputCache {
/// Wrap scopes with an asynchronous metric write & flush dispatcher.
pub fn wrap<OUT: Input + Send + Sync + 'static>(target: OUT, max_size: usize) -> InputCache {
InputCache {
attributes: Attributes::default(),
target: Arc::new(target),
cache: Arc::new(RwLock::new(lru::LRUCache::with_capacity(max_size)))
}
}
}
impl WithAttributes for InputCache {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl Input for InputCache {
type SCOPE = InputScopeCache;
fn input(&self) -> Self::SCOPE {
let target = self.target.input_dyn();
InputScopeCache {
attributes: self.attributes.clone(),
target,
cache: self.cache.clone(),
}
}
}
/// Input wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct InputScopeCache {
attributes: Attributes,
target: Arc<InputScope + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, InputMetric>>>,
}
impl WithAttributes for InputScopeCache {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl InputScope for InputScopeCache {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.qualified_name(name);
let lookup = {
let mut cache = self.cache.write().expect("Cache Lock");
cache.get(&name).map(|found| found.clone())
};
lookup.unwrap_or_else(|| {
let new_metric = self.target.new_metric(name.clone(), kind);
// FIXME (perf) having to take another write lock for a cache miss
let mut cache_miss = self.cache.write().expect("Cache Lock");
cache_miss.insert(name, new_metric.clone());
new_metric
})
}
}
impl Flush for InputScopeCache {
fn flush(&self) -> error::Result<()> {
self.target.flush()
}
}
mod lru {
//! A fixed-size cache with LRU expiration criteria.
//! Stored values will be held onto as long as there is space.
//! When space runs out, the oldest unused value will get evicted to make room for a new value.
use std::hash::Hash;
use std::collections::HashMap;
struct CacheEntry<K, V> {
key: K,
value: Option<V>,
next: Option<usize>,
prev: Option<usize>,
}
/// A fixed-size cache.
pub struct LRUCache<K, V> {
table: HashMap<K, usize>,
entries: Vec<CacheEntry<K, V>>,
first: Option<usize>,
last: Option<usize>,
capacity: usize,
}
impl<K: Clone + Hash + Eq, V> LRUCache<K, V> {
/// Creates a new cache that can hold the specified number of elements.
pub fn with_capacity(size: usize) -> Self {
LRUCache {
table: HashMap::with_capacity(size),
entries: Vec::with_capacity(size),
first: None,
last: None,
capacity: size,
}
}
/// Inserts a key-value pair into the cache and returns the previous value, if any.
/// If there is no room in the cache the oldest item will be removed.
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
if self.table.contains_key(&key) {
self.access(&key);
let entry = &mut self.entries[self.first.unwrap()];
let old = entry.value.take();
entry.value = Some(value);
old
} else {
self.ensure_room();
// Update old head
let idx = self.entries.len();
self.first.map(|e| {
let prev = Some(idx);
self.entries[e].prev = prev;
});
// This is the new head
self.entries.push(CacheEntry {
key: key.clone(),
value: Some(value),
next: self.first,
prev: None,
});
self.first = Some(idx);
self.last = self.last.or(self.first);
self.table.insert(key, idx);
None
}
}
/// Retrieves a reference to the item associated with `key` from the cache
/// without promoting it.
pub fn peek(&mut self, key: &K) -> Option<&V> {
let entries = &self.entries;
self.table
.get(key)
.and_then(move |i| entries[*i].value.as_ref())
}
/// Retrieves a reference to the item associated with `key` from the cache.
pub fn get(&mut self, key: &K) -> Option<&V> {
if self.contains_key(key) {
self.access(key);
}
self.peek(key)
}
/// Returns the number of elements currently in the cache.
pub fn len(&self) -> usize {
self.table.len()
}
/// Promotes the specified key to the top of the cache.
fn access(&mut self, key: &K) {
let i = *self.table.get(key).unwrap();
self.remove_from_list(i);
self.first = Some(i);
}
pub fn contains_key(&mut self, key: &K) -> bool {
self.table.contains_key(key)
}
/// Removes an item from the linked list.
fn remove_from_list(&mut self, i: usize) {
let (prev, next) = {
let entry = &mut self.entries[i];
(entry.prev, entry.next)
};
match (prev, next) {
// Item was in the middle of the list
(Some(j), Some(k)) => {
{
let first = &mut self.entries[j];
first.next = next;
}
let second = &mut self.entries[k];
second.prev = prev;
}
// Item was at the end of the list
(Some(j), None) => {
let first = &mut self.entries[j];
first.next = None;
self.last = prev;
}
// Item was at front
_ => (),
}
}
fn ensure_room(&mut self) {
if self.capacity == self.len() {
self.remove_last();
}
}
/// Removes the oldest item in the cache.
fn remove_last(&mut self) {
if let Some(idx) = self.last {
self.remove_from_list(idx);
let key = &self.entries[idx].key;
self.table.remove(key);
}
if self.last.is_none() {
self.first = None;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn get_and_get_mut_promote() {
let mut cache: LRUCache<&str, _> = LRUCache::with_capacity(2);
cache.insert("foo", 1);
cache.insert("bar", 2);
cache.get(&"foo").unwrap();
cache.insert("baz", 3);
assert!(cache.contains_key(&"foo"));
assert!(cache.contains_key(&"baz"));
assert!(!cache.contains_key(&"bar"));
cache.get(&"foo").unwrap();
cache.insert("qux", 4);
assert!(cache.contains_key(&"foo"));
assert!(cache.contains_key(&"qux"));
assert!(!cache.contains_key(&"baz"));
}
}
}


@ -1,264 +0,0 @@
//! Cache metric definitions.
use core::{Output, OutputDyn, WithAttributes, Attributes, OutputScope, OutputMetric, Name, Flush, Kind, AddPrefix};
use error;
use std::sync::{Arc, RwLock};
use std::rc::Rc;
/// Wrap an output with a metric definition cache.
/// This is useless if all metrics are statically declared but can provide performance
/// benefits if some metrics are dynamically defined at runtime.
pub trait CachedOutput: Output + Send + Sync + 'static + Sized {
/// Wrap this output with an asynchronous dispatch queue of specified length.
fn cached(self, max_size: usize) -> OutputCache {
OutputCache::wrap(self, max_size)
}
}
/// Output wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct OutputCache {
attributes: Attributes,
target: Arc<OutputDyn + Send + Sync + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, OutputMetric>>>,
}
impl OutputCache {
/// Wrap scopes with an asynchronous metric write & flush dispatcher.
pub fn wrap<OUT: Output + Send + Sync + 'static>(target: OUT, max_size: usize) -> OutputCache {
OutputCache {
attributes: Attributes::default(),
target: Arc::new(target),
cache: Arc::new(RwLock::new(lru::LRUCache::with_capacity(max_size)))
}
}
}
impl WithAttributes for OutputCache {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl Output for OutputCache {
type SCOPE = OutputScopeCache;
fn output(&self) -> Self::SCOPE {
let target = self.target.output_dyn();
OutputScopeCache {
attributes: self.attributes.clone(),
target,
cache: self.cache.clone(),
}
}
}
/// Output wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct OutputScopeCache {
attributes: Attributes,
target: Rc<OutputScope + 'static>,
cache: Arc<RwLock<lru::LRUCache<Name, OutputMetric>>>,
}
impl WithAttributes for OutputScopeCache {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl OutputScope for OutputScopeCache {
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric {
let name = self.qualified_name(name);
let lookup = {
let mut cache = self.cache.write().expect("Cache Lock");
cache.get(&name).map(|found| found.clone())
};
lookup.unwrap_or_else(|| {
let new_metric: OutputMetric = self.target.new_metric(name.clone(), kind);
// FIXME (perf) having to take another write lock for a cache miss
let mut cache_miss = self.cache.write().expect("Cache Lock");
cache_miss.insert(name, new_metric.clone());
new_metric
})
}
}
impl Flush for OutputScopeCache {
fn flush(&self) -> error::Result<()> {
self.target.flush()
}
}
mod lru {
//! A fixed-size cache with LRU expiration criteria.
//! Stored values will be held onto as long as there is space.
//! When space runs out, the oldest unused value will get evicted to make room for a new value.
use std::hash::Hash;
use std::collections::HashMap;
struct CacheEntry<K, V> {
key: K,
value: Option<V>,
next: Option<usize>,
prev: Option<usize>,
}
/// A fixed-size cache.
pub struct LRUCache<K, V> {
table: HashMap<K, usize>,
entries: Vec<CacheEntry<K, V>>,
first: Option<usize>,
last: Option<usize>,
capacity: usize,
}
impl<K: Clone + Hash + Eq, V> LRUCache<K, V> {
/// Creates a new cache that can hold the specified number of elements.
pub fn with_capacity(size: usize) -> Self {
LRUCache {
table: HashMap::with_capacity(size),
entries: Vec::with_capacity(size),
first: None,
last: None,
capacity: size,
}
}
/// Inserts a key-value pair into the cache and returns the previous value, if any.
/// If there is no room in the cache the oldest item will be removed.
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
if self.table.contains_key(&key) {
self.access(&key);
let entry = &mut self.entries[self.first.unwrap()];
let old = entry.value.take();
entry.value = Some(value);
old
} else {
self.ensure_room();
// Update old head
let idx = self.entries.len();
self.first.map(|e| {
let prev = Some(idx);
self.entries[e].prev = prev;
});
// This is the new head
self.entries.push(CacheEntry {
key: key.clone(),
value: Some(value),
next: self.first,
prev: None,
});
self.first = Some(idx);
self.last = self.last.or(self.first);
self.table.insert(key, idx);
None
}
}
/// Retrieves a reference to the item associated with `key` from the cache
/// without promoting it.
pub fn peek(&mut self, key: &K) -> Option<&V> {
let entries = &self.entries;
self.table
.get(key)
.and_then(move |i| entries[*i].value.as_ref())
}
/// Retrieves a reference to the item associated with `key` from the cache.
pub fn get(&mut self, key: &K) -> Option<&V> {
if self.contains_key(key) {
self.access(key);
}
self.peek(key)
}
/// Returns the number of elements currently in the cache.
pub fn len(&self) -> usize {
self.table.len()
}
/// Promotes the specified key to the top of the cache.
fn access(&mut self, key: &K) {
let i = *self.table.get(key).unwrap();
self.remove_from_list(i);
self.first = Some(i);
}
pub fn contains_key(&mut self, key: &K) -> bool {
self.table.contains_key(key)
}
/// Removes an item from the linked list.
fn remove_from_list(&mut self, i: usize) {
let (prev, next) = {
let entry = &mut self.entries[i];
(entry.prev, entry.next)
};
match (prev, next) {
// Item was in the middle of the list
(Some(j), Some(k)) => {
{
let first = &mut self.entries[j];
first.next = next;
}
let second = &mut self.entries[k];
second.prev = prev;
}
// Item was at the end of the list
(Some(j), None) => {
let first = &mut self.entries[j];
first.next = None;
self.last = prev;
}
// Item was at front
_ => (),
}
}
fn ensure_room(&mut self) {
if self.capacity == self.len() {
self.remove_last();
}
}
/// Removes the oldest item in the cache.
fn remove_last(&mut self) {
if let Some(idx) = self.last {
self.remove_from_list(idx);
let key = &self.entries[idx].key;
self.table.remove(key);
}
if self.last.is_none() {
self.first = None;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn get_and_get_mut_promote() {
let mut cache: LRUCache<&str, _> = LRUCache::with_capacity(2);
cache.insert("foo", 1);
cache.insert("bar", 2);
cache.get(&"foo").unwrap();
cache.insert("baz", 3);
assert!(cache.contains_key(&"foo"));
assert!(cache.contains_key(&"baz"));
assert!(!cache.contains_key(&"bar"));
cache.get(&"foo").unwrap();
cache.insert("qux", 4);
assert!(cache.contains_key(&"foo"));
assert!(cache.contains_key(&"qux"));
assert!(!cache.contains_key(&"baz"));
}
}
}


@ -1,686 +0,0 @@
//! Dipstick metrics core types and traits.
//! This is mostly centered around the backend.
//! Application-facing types are in the `app` module.
use clock::TimeHandle;
use error;
use std::sync::{Arc, Mutex};
use std::ops;
use std::rc::Rc;
use std::fmt;
use std::collections::HashMap;
// TODO maybe define an 'AsValue' trait + impl for supported number types, then drop 'num' crate
pub use num::ToPrimitive;
/// Base type for recorded metric values.
pub type Value = u64;
/////// ATTRIBUTES
/// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method.
#[derive(Debug, Clone, Copy)]
pub enum Sampling {
/// Do not sample, use all data.
Full,
/// Floating point sampling rate
/// - 1.0+ records everything
/// - 0.5 records one of two values
/// - 0.0 records nothing
Random(f64)
}
impl Default for Sampling {
fn default() -> Self {
Sampling::Full
}
}
/// A metrics buffering strategy.
/// All strategies other than `Unbuffered` are applied as a best-effort, meaning that the buffer
/// may be flushed at any moment before reaching the limit, for any or no reason in particular.
#[derive(Debug, Clone, Copy)]
pub enum Buffering {
/// No buffering is performed (default).
Unbuffered,
/// A buffer of maximum specified size is used.
BufferSize(usize),
/// Buffer as much as possible.
Unlimited,
}
impl Default for Buffering {
fn default() -> Self {
Buffering::Unbuffered
}
}
/// One struct to rule them all.
/// Possible attributes of metric outputs and scopes.
/// Private trait used by impls of specific With* traits.
/// Not all attributes are used by all structs!
/// This is a design choice to centralize code at the expense of slight waste of memory.
/// Fields have also not been made `pub` to make it easy to change this mechanism.
/// Field access must go through `is_` and `get_` methods declared in sub-traits.
#[derive(Debug, Clone, Default)]
pub struct Attributes {
namespace: Name,
sampling: Sampling,
buffering: Buffering,
}
/// The only trait that requires concrete impl by metric components.
/// Default impl of actual attributes use this to clone & mutate the original component.
/// This trait is _not_ exposed by the lib.
pub trait WithAttributes: Clone {
/// Return attributes for evaluation.
// TODO replace with fields-in-traits if ever stabilized (https://github.com/nikomatsakis/fields-in-traits-rfc)
fn get_attributes(&self) -> &Attributes;
/// Return attributes of component to be mutated after cloning.
// TODO replace with fields-in-traits if ever stabilized (https://github.com/nikomatsakis/fields-in-traits-rfc)
fn mut_attributes(&mut self) -> &mut Attributes;
/// Clone this component and its attributes before returning it.
/// This means one of the attributes will be cloned only to be replaced immediately.
/// But the benefits of a generic solution means we can live with that for a while.
fn with_attributes<F: Fn(&mut Attributes)>(&self, edit: F) -> Self {
let mut cloned = self.clone();
(edit)(cloned.mut_attributes());
cloned
}
}
/// Name operations support.
pub trait AddPrefix {
/// Return the namespace of the component.
fn get_namespace(&self) -> &Name;
/// Join namespace and prepend in newly defined metrics.
fn add_prefix(&self, name: &str) -> Self;
/// Append the specified name to the local namespace and return the concatenated result.
fn qualified_name(&self, metric_name: Name) -> Name;
}
/// Name operations support.
pub trait AddTag {
/// Return the namespace of the component.
fn get_tags(&self) -> &Arc<HashMap<String, String>>;
/// Join namespace and prepend in newly defined metrics.
fn add_tag(&self, name: &str) -> Self;
}
impl<T: WithAttributes> AddPrefix for T {
fn get_namespace(&self) -> &Name {
&self.get_attributes().namespace
}
/// Join namespace and prepend in newly defined metrics.
fn add_prefix(&self, name: &str) -> Self {
self.with_attributes(|new_attr| new_attr.namespace = new_attr.namespace.concat(name))
}
/// Append the specified name to the local namespace and return the concatenated result.
fn qualified_name(&self, name: Name) -> Name {
// FIXME (perf) store name in reverse to prepend with an actual push() to the vec
self.get_attributes().namespace.concat(name)
}
}
/// Apply statistical sampling to collected metrics data.
pub trait Sampled: WithAttributes {
/// Perform random sampling of values according to the specified rate.
fn sampled(&self, sampling: Sampling) -> Self {
self.with_attributes(|new_attr| new_attr.sampling = sampling)
}
/// Get the sampling strategy for this component, if any.
fn get_sampling(&self) -> Sampling {
self.get_attributes().sampling
}
}
/// Determine scope buffering strategy, if supported by output.
/// Changing this only affects scopes opened afterwards.
/// Buffering is done on best effort, meaning flush will occur if buffer capacity is exceeded.
pub trait Buffered: WithAttributes {
/// Return a clone with the specified buffering set.
fn buffered(&self, buffering: Buffering) -> Self {
self.with_attributes(|new_attr| new_attr.buffering = buffering)
}
/// Is this component using buffering?
fn is_buffered(&self) -> bool {
match self.get_buffering() {
Buffering::Unbuffered => false,
_ => true
}
}
/// Return the buffering.
fn get_buffering(&self) -> Buffering {
self.get_attributes().buffering
}
}
/// A namespace for metrics.
/// Does _not_ include the metric's "short" name itself.
/// Can be empty.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Default)]
pub struct Name {
inner: Vec<String>,
}
impl Name {
/// Concatenate with another namespace into a new one.
pub fn concat(&self, name: impl Into<Name>) -> Self {
let mut cloned = self.clone();
cloned.inner.extend_from_slice(&name.into().inner);
cloned
}
/// Returns true if the specified namespace is a subset or is equal to this namespace.
pub fn starts_with(&self, name: &Name) -> bool {
(self.inner.len() >= name.inner.len()) && (name.inner[..] == self.inner[..name.inner.len()])
}
/// Combine name parts into a string.
pub fn join(&self, separator: &str) -> String {
let mut buf = String::with_capacity(64);
let mut i = self.inner.iter();
if let Some(n) = i.next() {
buf.push_str(n.as_ref());
} else {
return "".into()
}
for n in i {
buf.push_str(separator);
buf.push_str(n.as_ref());
}
buf
}
}
impl ops::Deref for Name {
type Target = Vec<String>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl ops::DerefMut for Name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<S: Into<String>> From<S> for Name {
fn from(name: S) -> Name {
let name: String = name.into();
if name.is_empty() {
Name::default()
} else {
Name { inner: vec![name] }
}
}
}
////// INPUT
lazy_static! {
/// The reference instance identifying an uninitialized metric config.
pub static ref NO_METRIC_OUTPUT: Arc<InputDyn + Send + Sync> = Arc::new(Void::metrics());
/// The reference instance identifying an uninitialized metric scope.
pub static ref NO_METRIC_SCOPE: Arc<InputScope + Send + Sync> = NO_METRIC_OUTPUT.input_dyn();
}
/// A function trait that opens a new metric capture scope.
pub trait Input: Send + Sync + 'static + InputDyn {
/// The type of Scope returned by this input.
type SCOPE: InputScope + Send + Sync + 'static;
/// Open a new scope from this output.
fn input(&self) -> Self::SCOPE;
}
/// A function trait that opens a new metric capture scope.
pub trait InputDyn: Send + Sync + 'static {
/// Open a new scope from this output.
fn input_dyn(&self) -> Arc<InputScope + Send + Sync + 'static>;
}
/// Blanket impl of dyn input trait
impl<T: Input + Send + Sync + 'static> InputDyn for T {
fn input_dyn(&self) -> Arc<InputScope + Send + Sync + 'static> {
Arc::new(self.input())
}
}
/// InputScope
/// Define metrics, write values and flush them.
pub trait InputScope: Flush {
/// Define a generic metric of the specified type.
/// It is preferable to use counter() / marker() / timer() / gauge() methods.
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric;
/// Define a counter.
fn counter(&self, name: &str) -> Counter {
self.new_metric(name.into(), Kind::Counter).into()
}
/// Define a marker.
fn marker(&self, name: &str) -> Marker {
self.new_metric(name.into(), Kind::Marker).into()
}
/// Define a timer.
fn timer(&self, name: &str) -> Timer {
self.new_metric(name.into(), Kind::Timer).into()
}
/// Define a gauge.
fn gauge(&self, name: &str) -> Gauge {
self.new_metric(name.into(), Kind::Gauge).into()
}
}
/// A metric is actually a function that knows how to write a metric value to a metric output.
#[derive(Clone)]
pub struct InputMetric {
inner: Arc<Fn(Value) + Send + Sync>
}
impl fmt::Debug for InputMetric {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "InputMetric")
}
}
impl InputMetric {
/// Utility constructor
pub fn new<F: Fn(Value) + Send + Sync + 'static>(metric: F) -> InputMetric {
InputMetric { inner: Arc::new(metric) }
}
/// Collect a new value for this metric.
#[inline]
pub fn write(&self, value: Value) {
(self.inner)(value)
}
}
////// OUTPUT
/// Define metrics, write values and flush them.
pub trait OutputScope: Flush {
/// Define a raw metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric;
}
impl OutputMetric {
/// Utility constructor
pub fn new<F: Fn(Value) + 'static>(metric: F) -> OutputMetric {
OutputMetric { inner: Rc::new(metric) }
}
/// Some may prefer the `metric.write(value)` form to the `(metric)(value)` form.
/// This shouldn't matter as metrics should be of type Counter, Marker, etc.
#[inline]
pub fn write(&self, value: Value) {
(self.inner)(value)
}
}
/// A function trait that opens a new metric capture scope.
pub trait Output: Send + Sync + 'static + OutputDyn {
/// The type of Scope returned by this output.
type SCOPE: OutputScope;
/// Open a new scope from this output.
fn output(&self) -> Self::SCOPE;
}
/// A function trait that opens a new metric capture scope.
pub trait OutputDyn {
/// Open a new scope from this output.
fn output_dyn(&self) -> Rc<OutputScope + 'static>;
}
/// Blanket impl of dyn output trait
impl<T: Output + Send + Sync + 'static> OutputDyn for T {
fn output_dyn(&self) -> Rc<OutputScope + 'static> {
Rc::new(self.output())
}
}
//////// FLUSH
/// Both InputScope and OutputScope share the ability to flush the recorded data.
pub trait Flush {
/// Flush does nothing by default.
fn flush(&self) -> error::Result<()> {
Ok(())
}
}
///////// LOCKING INPUT -> OUTPUT ADAPTER
/// Provide thread-safe locking to RawScope implementers.
#[derive(Clone)]
pub struct LockingScopeBox {
attributes: Attributes,
inner: Arc<Mutex<UnsafeScope>>
}
impl WithAttributes for LockingScopeBox {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl InputScope for LockingScopeBox {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.qualified_name(name);
let raw_metric = self.inner.lock().expect("RawScope Lock").new_metric(name, kind);
let mutex = self.inner.clone();
InputMetric::new(move |value| {
let _guard = mutex.lock().expect("OutputMetric Lock");
raw_metric.write(value)
} )
}
}
impl Flush for LockingScopeBox {
fn flush(&self) -> error::Result<()> {
self.inner.lock().expect("OutputScope Lock").flush()
}
}
/// Blanket impl that provides RawOutputs their dynamic flavor.
impl<T: Output + Send + Sync + 'static> Input for T {
type SCOPE = LockingScopeBox;
fn input(&self) -> Self::SCOPE {
LockingScopeBox {
attributes: Attributes::default(),
inner: Arc::new(Mutex::new(UnsafeScope(self.output_dyn())))
}
}
}
///////// UNSAFE INPUT -> OUTPUT ADAPTER
/// Wrap a RawScope to make it Send + Sync, allowing it to travel the world of threads.
/// Obviously, it should only still be used from a single thread or dragons may occur.
#[derive(Clone)]
pub struct UnsafeScope(Rc<OutputScope + 'static> );
unsafe impl Send for UnsafeScope {}
unsafe impl Sync for UnsafeScope {}
impl UnsafeScope {
/// Wrap a dynamic RawScope to make it Send + Sync.
pub fn new(scope: Rc<OutputScope + 'static>) -> Self {
UnsafeScope(scope)
}
}
impl ops::Deref for UnsafeScope {
type Target = OutputScope + 'static;
fn deref(&self) -> &Self::Target {
Rc::as_ref(&self.0)
}
}
/// Output metrics are not thread safe.
#[derive(Clone)]
pub struct OutputMetric {
inner: Rc<Fn(Value)>
}
impl fmt::Debug for OutputMetric {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Box<Fn(Value)>")
}
}
unsafe impl Send for OutputMetric {}
unsafe impl Sync for OutputMetric {}
////////////// INSTRUMENTS
/// Used to differentiate between metric kinds in the backend.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Kind {
/// Handling one item at a time.
Marker,
/// Handling quantities or multiples.
Counter,
/// Reporting instant measurement of a resource at a point in time.
Gauge,
/// Measuring a time interval, internal to the app or provided by an external source.
Timer,
}
/// Used by the metrics! macro to obtain the Kind from the stringified type.
impl<'a> From<&'a str> for Kind {
fn from(s: &str) -> Kind {
match s {
"Marker" => Kind::Marker,
"Counter" => Kind::Counter,
"Gauge" => Kind::Gauge,
"Timer" => Kind::Timer,
_ => panic!("No Kind '{}' defined", s)
}
}
}
/// A monotonic counter metric.
/// Since value is only ever increased by one, no value parameter is provided,
/// preventing programming errors.
#[derive(Debug, Clone)]
pub struct Marker {
inner: InputMetric,
}
impl Marker {
/// Record a single event occurrence.
pub fn mark(&self) {
self.inner.write(1)
}
}
/// A counter that sends values to the metrics backend
#[derive(Debug, Clone)]
pub struct Counter {
inner: InputMetric,
}
impl Counter {
/// Record a value count.
pub fn count<V: ToPrimitive>(&self, count: V) {
self.inner.write(count.to_u64().unwrap())
}
}
/// A gauge that sends values to the metrics backend
#[derive(Debug, Clone)]
pub struct Gauge {
inner: InputMetric,
}
impl Gauge {
/// Record a value point for this gauge.
pub fn value<V: ToPrimitive>(&self, value: V) {
self.inner.write(value.to_u64().unwrap())
}
}
/// A timer that sends values to the metrics backend
/// Timers can record time intervals in multiple ways:
/// - with the time! macro, which wraps an expression or block with start() and stop() calls.
/// - with the time(Fn) method, which wraps a closure with start() and stop() calls.
/// - with start() and stop() methods wrapping around the operation to time
/// - with the interval_us() method, providing an externally determined microsecond interval
#[derive(Debug, Clone)]
pub struct Timer {
inner: InputMetric,
}
impl Timer {
/// Record a microsecond interval for this timer
/// Can be used in place of start()/stop() if an external time interval source is used
pub fn interval_us<V: ToPrimitive>(&self, interval_us: V) -> V {
self.inner.write(interval_us.to_u64().unwrap());
interval_us
}
/// Obtain an opaque handle to the current time.
/// The handle is passed back to the stop() method to record a time interval.
/// This is actually a convenience method for TimeHandle::now().
/// Beware, handles obtained here are not bound to this specific timer instance
/// _for now_ but might be in the future for safety.
/// If you require safe multi-timer handles, get them through TimeType::now()
pub fn start(&self) -> TimeHandle {
TimeHandle::now()
}
/// Record the time elapsed since the start_time handle was obtained.
/// This call can be performed multiple times using the same handle,
/// reporting distinct time intervals each time.
/// Returns the microsecond interval value that was recorded.
pub fn stop(&self, start_time: TimeHandle) -> u64 {
let elapsed_us = start_time.elapsed_us();
self.interval_us(elapsed_us)
}
/// Record the time taken to execute the provided closure
pub fn time<F: FnOnce() -> R, R>(&self, operations: F) -> R {
let start_time = self.start();
let value: R = operations();
self.stop(start_time);
value
}
}
impl From<InputMetric> for Gauge {
fn from(metric: InputMetric) -> Gauge {
Gauge { inner: metric }
}
}
impl From<InputMetric> for Timer {
fn from(metric: InputMetric) -> Timer {
Timer { inner: metric }
}
}
impl From<InputMetric> for Counter {
fn from(metric: InputMetric) -> Counter {
Counter { inner: metric }
}
}
impl From<InputMetric> for Marker {
fn from(metric: InputMetric) -> Marker {
Marker { inner: metric }
}
}
/// VOID INPUT & OUTPUT
/// Discard metrics output.
#[derive(Clone)]
pub struct Void {}
/// Discard metrics output.
#[derive(Clone)]
pub struct VoidInput {}
/// Discard metrics output.
#[derive(Clone)]
pub struct VoidOutput {}
impl Void {
/// Void metrics builder.
pub fn metrics() -> Self {
Void {}
}
}
impl Output for Void {
type SCOPE = VoidOutput;
fn output(&self) -> VoidOutput {
VoidOutput {}
}
}
impl OutputScope for VoidOutput {
fn new_metric(&self, _name: Name, _kind: Kind) -> OutputMetric {
OutputMetric::new(|_value| {})
}
}
impl Flush for VoidOutput {
}
/// Discard all metric values sent to it.
pub fn output_none() -> Void {
Void {}
}
#[cfg(test)]
mod test {
use core::*;
#[test]
fn test_to_void() {
let c = Void::metrics().input();
let m = c.new_metric("test".into(), Kind::Marker);
m.write(33);
}
}
#[cfg(feature = "bench")]
mod bench {
use core::*;
use clock::TimeHandle;
use test;
use bucket::Bucket;
#[bench]
fn get_instant(b: &mut test::Bencher) {
b.iter(|| test::black_box(TimeHandle::now()));
}
#[bench]
fn time_bench_direct_dispatch_event(b: &mut test::Bencher) {
let metrics = Bucket::new();
let marker = metrics.marker("aaa");
b.iter(|| test::black_box(marker.mark()));
}
}

3 src/clock.rs → src/core/clock.rs Normal file → Executable file

@ -1,5 +1,6 @@
use std::ops::Add;
use std::time::{Duration, Instant};
use core::Value;
#[derive(Debug, Copy, Clone)]
@ -14,7 +15,7 @@ impl TimeHandle {
TimeHandle(now())
}
/// Get the elapsed time in microseconds since TimeHanduule was obtained.
/// Get the elapsed time in microseconds since TimeHandle was obtained.
pub fn elapsed_us(self) -> Value {
let duration = now() - self.0;
duration.as_secs() * 1000000 + (duration.subsec_nanos() / 1000) as Value

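A sketch of using the relocated TimeHandle directly, as Timer::start()/stop() do internally; only now() and elapsed_us(), shown above, are assumed, along with the crate's Value alias:

fn measure_us<F: FnOnce()>(work: F) -> Value {
    let start = TimeHandle::now();
    work();
    // Microseconds elapsed since the handle was obtained.
    start.elapsed_us()
}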
220 src/core/component.rs Executable file

@ -0,0 +1,220 @@
use std::sync::Arc;
use std::ops;
use std::collections::HashMap;
/// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method.
#[derive(Debug, Clone, Copy)]
pub enum Sampling {
/// Do not sample, use all data.
Full,
/// Floating point sampling rate
/// - 1.0+ records everything
/// - 0.5 records one of two values
/// - 0.0 records nothing
Random(f64)
}
impl Default for Sampling {
fn default() -> Self {
Sampling::Full
}
}
/// A metrics buffering strategy.
/// All strategies other than `Unbuffered` are applied as a best-effort, meaning that the buffer
/// may be flushed at any moment before reaching the limit, for any or no reason in particular.
#[derive(Debug, Clone, Copy)]
pub enum Buffering {
/// No buffering is performed (default).
Unbuffered,
/// A buffer of maximum specified size is used.
BufferSize(usize),
/// Buffer as much as possible.
Unlimited,
}
impl Default for Buffering {
fn default() -> Self {
Buffering::Unbuffered
}
}
/// One struct to rule them all.
/// Possible attributes of metric outputs and scopes.
/// Private trait used by impls of specific With* traits.
/// Not all attributes are used by all structs!
/// This is a design choice to centralize code at the expense of slight waste of memory.
/// Fields have also not been made `pub` to make it easy to change this mechanism.
/// Field access must go through `is_` and `get_` methods declared in sub-traits.
#[derive(Debug, Clone, Default)]
pub struct Attributes {
namespace: Name,
sampling: Sampling,
buffering: Buffering,
}
/// The only trait that requires concrete impl by metric components.
/// Default impl of actual attributes use this to clone & mutate the original component.
/// This trait is _not_ exposed by the lib.
pub trait WithAttributes: Clone {
/// Return attributes for evaluation.
// TODO replace with fields-in-traits if ever stabilized (https://github.com/nikomatsakis/fields-in-traits-rfc)
fn get_attributes(&self) -> &Attributes;
/// Return attributes of component to be mutated after cloning.
// TODO replace with fields-in-traits if ever stabilized (https://github.com/nikomatsakis/fields-in-traits-rfc)
fn mut_attributes(&mut self) -> &mut Attributes;
/// Clone this component and its attributes before returning it.
/// This means one of the attributes will be cloned only to be replaced immediately.
/// But the benefits of a generic solution mean we can live with that for a while.
fn with_attributes<F: Fn(&mut Attributes)>(&self, edit: F) -> Self {
let mut cloned = self.clone();
(edit)(cloned.mut_attributes());
cloned
}
}
/// Name operations support.
pub trait AddPrefix {
/// Return the namespace of the component.
fn get_namespace(&self) -> &Name;
/// Join namespace and prepend in newly defined metrics.
fn add_prefix(&self, name: &str) -> Self;
/// Append the specified name to the local namespace and return the concatenated result.
fn qualified_name(&self, metric_name: Name) -> Name;
}
/// Label operations support.
pub trait Label {
/// Return the labels of the component.
fn get_label(&self) -> &Arc<HashMap<String, String>>;
/// Add the specified label to the component.
fn label(&self, name: &str) -> Self;
}
impl<T: WithAttributes> AddPrefix for T {
fn get_namespace(&self) -> &Name {
&self.get_attributes().namespace
}
/// Join namespace and prepend in newly defined metrics.
fn add_prefix(&self, name: &str) -> Self {
self.with_attributes(|new_attr| new_attr.namespace = new_attr.namespace.concat(name))
}
/// Append the specified name to the local namespace and return the concatenated result.
fn qualified_name(&self, name: Name) -> Name {
// FIXME (perf) store name in reverse to prepend with an actual push() to the vec
self.get_attributes().namespace.concat(name)
}
}
/// Apply statistical sampling to collected metrics data.
pub trait Sampled: WithAttributes {
/// Perform random sampling of values according to the specified rate.
fn sampled(&self, sampling: Sampling) -> Self {
self.with_attributes(|new_attr| new_attr.sampling = sampling)
}
/// Get the sampling strategy for this component, if any.
fn get_sampling(&self) -> Sampling {
self.get_attributes().sampling
}
}
/// Determine scope buffering strategy, if supported by output.
/// Changing this only affects scopes opened afterwards.
/// Buffering is done on best effort, meaning flush will occur if buffer capacity is exceeded.
pub trait Buffered: WithAttributes {
/// Return a clone with the specified buffering set.
fn buffered(&self, buffering: Buffering) -> Self {
self.with_attributes(|new_attr| new_attr.buffering = buffering)
}
/// Is this component using buffering?
fn is_buffered(&self) -> bool {
match self.get_buffering() {
Buffering::Unbuffered => false,
_ => true
}
}
/// Return the buffering.
fn get_buffering(&self) -> Buffering {
self.get_attributes().buffering
}
}
/// A namespace for metrics.
/// Does _not_ include the metric's "short" name itself.
/// Can be empty.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Default)]
pub struct Name {
inner: Vec<String>,
}
impl Name {
/// Concatenate with another namespace into a new one.
pub fn concat(&self, name: impl Into<Name>) -> Self {
let mut cloned = self.clone();
cloned.inner.extend_from_slice(&name.into().inner);
cloned
}
/// Returns true if the specified namespace is a subset or is equal to this namespace.
pub fn starts_with(&self, name: &Name) -> bool {
(self.inner.len() >= name.inner.len()) && (name.inner[..] == self.inner[..name.inner.len()])
}
/// Combine name parts into a string.
pub fn join(&self, separator: &str) -> String {
let mut buf = String::with_capacity(64);
let mut i = self.inner.iter();
if let Some(n) = i.next() {
buf.push_str(n.as_ref());
} else {
return "".into()
}
for n in i {
buf.push_str(separator);
buf.push_str(n.as_ref());
}
buf
}
}
impl ops::Deref for Name {
type Target = Vec<String>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl ops::DerefMut for Name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<S: Into<String>> From<S> for Name {
fn from(name: S) -> Name {
let name: String = name.into();
if name.is_empty() {
Name::default()
} else {
Name { inner: vec![name] }
}
}
}

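A small sketch of the Name operations relocated into this module, using only the methods defined above:

fn example_names() {
    let base: Name = "app".into();
    let full = base.concat("server").concat("requests");
    assert_eq!(full.join("."), "app.server.requests");
    assert!(full.starts_with(&base));
}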

@ -5,8 +5,8 @@ use std::error;
use std::fmt::{self, Display, Formatter};
use std::result;
use std::sync::mpsc;
use queue_in;
use queue_out;
use queue::queue_in;
use queue::queue_out;
use self::Error::*;

224 src/core/input.rs Executable file

@ -0,0 +1,224 @@
use core::clock::TimeHandle;
use core::{Value, Flush};
use core::component::Name;
use std::sync::Arc;
use std::fmt;
// TODO maybe define an 'AsValue' trait + impl for supported number types, then drop 'num' crate
pub use num::ToPrimitive;
/// A function trait that opens a new metric capture scope.
pub trait Input: Send + Sync + 'static + InputDyn {
/// The type of Scope returned by this input.
type SCOPE: InputScope + Send + Sync + 'static;
/// Open a new scope from this output.
fn input(&self) -> Self::SCOPE;
}
/// A function trait that opens a new metric capture scope.
pub trait InputDyn: Send + Sync + 'static {
/// Open a new scope from this output.
fn input_dyn(&self) -> Arc<InputScope + Send + Sync + 'static>;
}
/// Blanket impl of dyn input trait
impl<T: Input + Send + Sync + 'static> InputDyn for T {
fn input_dyn(&self) -> Arc<InputScope + Send + Sync + 'static> {
Arc::new(self.input())
}
}
/// InputScope
/// Define metrics, write values and flush them.
pub trait InputScope: Flush {
/// Define a generic metric of the specified type.
/// It is preferable to use counter() / marker() / timer() / gauge() methods.
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric;
/// Define a counter.
fn counter(&self, name: &str) -> Counter {
self.new_metric(name.into(), Kind::Counter).into()
}
/// Define a marker.
fn marker(&self, name: &str) -> Marker {
self.new_metric(name.into(), Kind::Marker).into()
}
/// Define a timer.
fn timer(&self, name: &str) -> Timer {
self.new_metric(name.into(), Kind::Timer).into()
}
/// Define a gauge.
fn gauge(&self, name: &str) -> Gauge {
self.new_metric(name.into(), Kind::Gauge).into()
}
}
/// A metric is actually a function that knows how to write a metric value to a metric output.
#[derive(Clone)]
pub struct InputMetric {
inner: Arc<Fn(Value) + Send + Sync>
}
impl fmt::Debug for InputMetric {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "InputMetric")
}
}
impl InputMetric {
/// Utility constructor
pub fn new<F: Fn(Value) + Send + Sync + 'static>(metric: F) -> InputMetric {
InputMetric { inner: Arc::new(metric) }
}
/// Collect a new value for this metric.
#[inline]
pub fn write(&self, value: Value) {
(self.inner)(value)
}
}
/// Used to differentiate between metric kinds in the backend.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Kind {
/// Handling one item at a time.
Marker,
/// Handling quantities or multiples.
Counter,
/// Reporting instant measurement of a resource at a point in time.
Gauge,
/// Measuring a time interval, internal to the app or provided by an external source.
Timer,
}
/// Used by the metrics! macro to obtain the Kind from the stringified type.
impl<'a> From<&'a str> for Kind {
fn from(s: &str) -> Kind {
match s {
"Marker" => Kind::Marker,
"Counter" => Kind::Counter,
"Gauge" => Kind::Gauge,
"Timer" => Kind::Timer,
_ => panic!("No Kind '{}' defined", s)
}
}
}
/// A monotonic counter metric.
/// Since value is only ever increased by one, no value parameter is provided,
/// preventing programming errors.
#[derive(Debug, Clone)]
pub struct Marker {
inner: InputMetric,
}
impl Marker {
/// Record a single event occurrence.
pub fn mark(&self) {
self.inner.write(1)
}
}
/// A counter that sends values to the metrics backend
#[derive(Debug, Clone)]
pub struct Counter {
inner: InputMetric,
}
impl Counter {
/// Record a value count.
pub fn count<V: ToPrimitive>(&self, count: V) {
self.inner.write(count.to_u64().unwrap())
}
}
/// A gauge that sends values to the metrics backend
#[derive(Debug, Clone)]
pub struct Gauge {
inner: InputMetric,
}
impl Gauge {
/// Record a value point for this gauge.
pub fn value<V: ToPrimitive>(&self, value: V) {
self.inner.write(value.to_u64().unwrap())
}
}
/// A timer that sends values to the metrics backend
/// Timers can record time intervals in multiple ways:
/// - with the time! macro, which wraps an expression or block with start() and stop() calls.
/// - with the time(Fn) method, which wraps a closure with start() and stop() calls.
/// - with start() and stop() methods wrapping around the operation to time
/// - with the interval_us() method, providing an externally determined microsecond interval
#[derive(Debug, Clone)]
pub struct Timer {
inner: InputMetric,
}
impl Timer {
/// Record a microsecond interval for this timer
/// Can be used in place of start()/stop() if an external time interval source is used
pub fn interval_us<V: ToPrimitive>(&self, interval_us: V) -> V {
self.inner.write(interval_us.to_u64().unwrap());
interval_us
}
/// Obtain an opaque handle to the current time.
/// The handle is passed back to the stop() method to record a time interval.
/// This is a convenience wrapper around TimeHandle::now().
/// Beware, handles obtained here are not bound to this specific timer instance
/// _for now_ but might be in the future for safety.
/// If you require safe multi-timer handles, get them through TimeType::now()
pub fn start(&self) -> TimeHandle {
TimeHandle::now()
}
/// Record the time elapsed since the start_time handle was obtained.
/// This call can be performed multiple times using the same handle,
/// reporting distinct time intervals each time.
/// Returns the microsecond interval value that was recorded.
pub fn stop(&self, start_time: TimeHandle) -> u64 {
let elapsed_us = start_time.elapsed_us();
self.interval_us(elapsed_us)
}
/// Record the time taken to execute the provided closure
pub fn time<F: FnOnce() -> R, R>(&self, operations: F) -> R {
let start_time = self.start();
let value: R = operations();
self.stop(start_time);
value
}
}
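// Editor's sketch (illustration only, not part of this commit): the ways a
// Timer interval can be recorded, as listed in the doc comment above.
#[allow(dead_code)]
fn timer_usage_sketch(timer: &Timer) {
    // explicit start()/stop() around the operation to time
    let start = timer.start();
    timer.stop(start);
    // wrapping a closure with time()
    let _result = timer.time(|| 2 + 2);
    // reporting an externally measured interval, in microseconds
    timer.interval_us(1_500u64);
}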
impl From<InputMetric> for Gauge {
fn from(metric: InputMetric) -> Gauge {
Gauge { inner: metric }
}
}
impl From<InputMetric> for Timer {
fn from(metric: InputMetric) -> Timer {
Timer { inner: metric }
}
}
impl From<InputMetric> for Counter {
fn from(metric: InputMetric) -> Counter {
Counter { inner: metric }
}
}
impl From<InputMetric> for Marker {
fn from(metric: InputMetric) -> Marker {
Marker { inner: metric }
}
}

View File

@ -2,8 +2,9 @@
//! Because of the possibly high volume of data, this is pre-set to use aggregation.
//! This is also kept in a separate module because it is not to be exposed outside of the crate.
use core::{Marker, AddPrefix, InputScope, Counter};
use proxy::Proxy;
use core::component::AddPrefix;
use core::input::{Marker, InputScope, Counter};
use core::proxy::Proxy;
metrics!{
/// Dipstick's own internal metrics.

58
src/core/mod.rs Executable file
View File

@ -0,0 +1,58 @@
pub mod error;
pub mod component;
pub mod input;
pub mod output;
pub mod out_lock;
pub mod clock;
pub mod void;
pub mod proxy;
pub mod pcg32;
pub mod scheduler;
pub mod metrics;
/// Base type for recorded metric values.
pub type Value = u64;
/// Both InputScope and OutputScope share the ability to flush the recorded data.
pub trait Flush {
/// Flush does nothing by default.
fn flush(&self) -> error::Result<()> {
Ok(())
}
}
#[cfg(test)]
pub mod test {
use super::*;
use super::input::*;
#[test]
fn test_to_void() {
let c = void::Void::metrics().input();
let m = c.new_metric("test".into(), input::Kind::Marker);
m.write(33);
}
}
#[cfg(feature = "bench")]
pub mod bench {
use super::clock::TimeHandle;
use super::input::InputScope;
use aggregate::bucket::Bucket;
use test;
#[bench]
fn get_instant(b: &mut test::Bencher) {
b.iter(|| test::black_box(TimeHandle::now()));
}
#[bench]
fn time_bench_direct_dispatch_event(b: &mut test::Bencher) {
let metrics = Bucket::new();
let marker = metrics.marker("aaa");
b.iter(|| test::black_box(marker.mark()));
}
}

69
src/core/out_lock.rs Executable file
View File

@ -0,0 +1,69 @@
use core::input::{InputScope, InputMetric, Input, Kind};
use core::output::{Output, OutputScope};
use core::component::{Attributes, WithAttributes, Name, AddPrefix};
use core::Flush;
use core::error;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::ops;
/// Provide thread-safe locking to OutputScope implementers.
#[derive(Clone)]
pub struct LockingScopeBox {
attributes: Attributes,
inner: Arc<Mutex<LockScope>>
}
impl WithAttributes for LockingScopeBox {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl InputScope for LockingScopeBox {
fn new_metric(&self, name: Name, kind: Kind) -> InputMetric {
let name = self.qualified_name(name);
let raw_metric = self.inner.lock().expect("RawScope Lock").new_metric(name, kind);
let mutex = self.inner.clone();
InputMetric::new(move |value| {
let _guard = mutex.lock().expect("OutputMetric Lock");
raw_metric.write(value)
} )
}
}
impl Flush for LockingScopeBox {
fn flush(&self) -> error::Result<()> {
self.inner.lock().expect("OutputScope Lock").flush()
}
}
/// Blanket impl that provides any Output with a thread-safe, locking Input flavor.
impl<T: Output + Send + Sync + 'static> Input for T {
type SCOPE = LockingScopeBox;
fn input(&self) -> Self::SCOPE {
LockingScopeBox {
attributes: Attributes::default(),
inner: Arc::new(Mutex::new(LockScope(self.output_dyn())))
}
}
}
/// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads.
/// Obviously, it should only still be used from a single thread or dragons may occur.
#[derive(Clone)]
struct LockScope(Rc<OutputScope + 'static> );
impl ops::Deref for LockScope {
type Target = OutputScope + 'static;
fn deref(&self) -> &Self::Target {
Rc::as_ref(&self.0)
}
}
unsafe impl Send for LockScope {}
unsafe impl Sync for LockScope {}
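// Editor's sketch (illustration only, not part of this commit): thanks to the
// blanket impl above, any single-threaded Output gains a thread-safe input();
// Void is used here only because it is the simplest available Output.
#[cfg(test)]
mod lock_sketch {
    use core::void::Void;
    use core::input::{Input, InputScope};

    #[test]
    fn output_as_locked_input() {
        let scope = Void::metrics().input(); // LockingScopeBox around a VoidOutput
        let marker = scope.marker("locked_marker");
        marker.mark();
    }
}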

62
src/core/output.rs Executable file
View File

@ -0,0 +1,62 @@
use core::{Flush, Value};
use core::input::Kind;
use core::component::Name;
use core::void::Void;
use std::rc::Rc;
/// Define metrics, write values and flush them.
pub trait OutputScope: Flush {
/// Define a raw metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> OutputMetric;
}
impl OutputMetric {
/// Utility constructor
pub fn new<F: Fn(Value) + 'static>(metric: F) -> OutputMetric {
OutputMetric { inner: Rc::new(metric) }
}
/// Some may prefer the `metric.write(value)` form to the `(metric)(value)` form.
/// This shouldn't matter as metrics should be of type Counter, Marker, etc.
#[inline]
pub fn write(&self, value: Value) {
(self.inner)(value)
}
}
/// A function trait that opens a new metric capture scope.
pub trait Output: Send + Sync + 'static + OutputDyn {
/// The type of Scope returned by this output.
type SCOPE: OutputScope;
/// Open a new scope from this output.
fn output(&self) -> Self::SCOPE;
}
/// A function trait that opens a new metric capture scope.
pub trait OutputDyn {
/// Open a new scope from this output.
fn output_dyn(&self) -> Rc<OutputScope + 'static>;
}
/// Blanket impl of dyn output trait
impl<T: Output + Send + Sync + 'static> OutputDyn for T {
fn output_dyn(&self) -> Rc<OutputScope + 'static> {
Rc::new(self.output())
}
}
/// Output metrics are not thread safe.
#[derive(Clone)]
pub struct OutputMetric {
inner: Rc<Fn(Value)>
}
/// Discard all metric values sent to it.
pub fn output_none() -> Void {
Void {}
}
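// Editor's sketch (illustration only, not part of this commit): a minimal
// custom Output whose scope prints each value to stdout, to show how Output,
// OutputScope and OutputMetric fit together. All names here are hypothetical.
#[allow(dead_code)]
mod print_sketch {
    use super::{Output, OutputScope, OutputMetric};
    use core::component::Name;
    use core::input::Kind;
    use core::Flush;

    struct PrintOutput;
    struct PrintScope;

    impl Output for PrintOutput {
        type SCOPE = PrintScope;
        fn output(&self) -> PrintScope { PrintScope }
    }

    impl OutputScope for PrintScope {
        fn new_metric(&self, name: Name, _kind: Kind) -> OutputMetric {
            let name = name.join(".");
            // the returned closure is called for every recorded value
            OutputMetric::new(move |value| println!("{} {}", name, value))
        }
    }

    // flush() keeps its default no-op implementation
    impl Flush for PrintScope {}
}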

View File

@ -1,6 +1,6 @@
//! PCG32 random number generation for fast sampling
//! Kept here for low dependency count.
// TODO use https://github.com/codahale/pcg instead?
use std::cell::RefCell;
use time;

View File

@ -1,7 +1,10 @@
//! Decouple metric definition from configuration with trait objects.
use core::{Name, AddPrefix, Kind, InputScope, InputMetric, NO_METRIC_OUTPUT, WithAttributes, Attributes, Flush};
use error;
use core::component::{Attributes, WithAttributes, Name, AddPrefix};
use core::Flush;
use core::input::{Kind, InputMetric, InputScope};
use core::void::VOID_INPUT;
use core::error;
use std::collections::{HashMap, BTreeMap};
use std::sync::{Arc, RwLock, Weak};
@ -118,7 +121,7 @@ impl InnerProxy {
}
let (up_target, up_nslen) = self.get_effective_target(namespace)
.unwrap_or_else(|| (NO_METRIC_OUTPUT.input_dyn(), 0));
.unwrap_or_else(|| (VOID_INPUT.input_dyn(), 0));
// update all affected metrics to next upper targeted namespace
for (name, metric) in self.metrics.range_mut(namespace..) {
@ -209,7 +212,7 @@ impl InputScope for Proxy {
let name2 = name.clone();
// not found, define new
let (target, target_namespace_length) = inner.get_effective_target(&name)
.unwrap_or_else(|| (NO_METRIC_OUTPUT.input_dyn(), 0));
.unwrap_or_else(|| (VOID_INPUT.input_dyn(), 0));
let metric_object = target.new_metric(name.clone(), kind);
let proxy = Arc::new(ProxyMetric {
name,
@ -239,10 +242,9 @@ impl WithAttributes for Proxy {
#[cfg(feature = "bench")]
mod bench {
use core::*;
use proxy::*;
use super::*;
use test;
use bucket::Bucket;
use aggregate::bucket::Bucket;
#[bench]
fn proxy_marker_to_aggregate(b: &mut test::Bencher) {

View File

@ -1,6 +1,6 @@
//! Task scheduling facilities.
use core::InputScope;
use core::input::InputScope;
use std::time::Duration;
use std::thread;

49
src/core/void.rs Executable file
View File

@ -0,0 +1,49 @@
use core::output::{Output, OutputScope, OutputMetric};
use core::component::Name;
use core::input::{Kind, InputDyn};
use core::Flush;
use std::sync::Arc;
lazy_static! {
/// The reference instance identifying an uninitialized metric config.
pub static ref VOID_INPUT: Arc<InputDyn + Send + Sync> = Arc::new(Void::metrics());
// /// The reference instance identifying an uninitialized metric scope.
// pub static ref NO_METRIC_SCOPE: Arc<InputScope + Send + Sync> = VOID_INPUT.input_dyn();
}
/// Discard metrics output.
#[derive(Clone)]
pub struct Void {}
/// Discard metrics output.
#[derive(Clone)]
pub struct VoidInput {}
/// Discard metrics output.
#[derive(Clone)]
pub struct VoidOutput {}
impl Void {
/// Void metrics builder.
pub fn metrics() -> Self {
Void {}
}
}
impl Output for Void {
type SCOPE = VoidOutput;
fn output(&self) -> VoidOutput {
VoidOutput {}
}
}
impl OutputScope for VoidOutput {
fn new_metric(&self, _name: Name, _kind: Kind) -> OutputMetric {
OutputMetric::new(|_value| {})
}
}
impl Flush for VoidOutput {
}

View File

@ -19,86 +19,47 @@ extern crate num;
#[cfg(feature="protobuf")]
extern crate protobuf;
// FIXME required only for random seed for sampling
// FIXME required only for pcg32 seed (for sampling)
extern crate time;
pub mod error;
pub use error::{Error, Result};
pub mod core;
pub use core::{Value, Kind, Marker, Timer, Counter, Gauge,
Flush, InputScope, Input, WithAttributes,
Name, AddPrefix, Sampled, Sampling, Buffering, Buffered,
OutputScope, Output, OutputMetric, UnsafeScope,
output_none, Void};
#[macro_use]
pub mod macros;
mod macros;
pub mod proxy;
pub use proxy::Proxy;
mod core;
pub use core::{Flush, Value};
pub use core::component::*;
pub use core::input::*;
pub use core::output::*;
pub use core::scheduler::*;
pub use core::out_lock::*;
pub use core::error::{Error, Result};
pub use core::clock::{TimeHandle, mock_clock_advance, mock_clock_reset};
pub use core::proxy::Proxy;
mod bucket;
pub use bucket::{Bucket, stats_summary, stats_all, stats_average};
mod output;
pub use output::text::*;
pub use output::graphite::*;
pub use output::statsd::*;
pub use output::map::*;
pub use output::logging::*;
mod text;
pub use text::{Text, TextScope};
mod aggregate;
pub use aggregate::bucket::*;
pub use aggregate::scores::*;
mod logging;
pub use logging::{Log, LogScope};
mod cache;
pub use cache::cache_in::CachedInput;
pub use cache::cache_out::CachedOutput;
mod pcg32;
mod multi;
pub use multi::multi_in::*;
pub use multi::multi_out::*;
mod scores;
pub use scores::ScoreType;
mod statsd;
pub use statsd::{Statsd, StatsdScope};
mod graphite;
pub use graphite::{Graphite, GraphiteScope};
#[cfg(feature="prometheus")]
mod prometheus;
#[cfg(feature="prometheus, proto")]
mod prometheus_proto;
#[cfg(feature="prometheus")]
pub use prometheus::{PrometheusScope, Prometheus};
mod map;
pub use map::StatsMap;
mod socket;
pub use socket::RetrySocket;
mod cache_in;
pub use cache_in::{InputScopeCache, InputCache, CachedInput};
mod cache_out;
pub use cache_out::{OutputScopeCache, OutputCache, CachedOutput};
mod multi_in;
pub use multi_in::{MultiInput, MultiInputScope};
mod multi_out;
pub use multi_out::{MultiOutput, MultiOutputScope};
mod queue_in;
pub use queue_in::{InputQueueScope, InputQueue, QueuedInput};
mod queue_out;
pub use queue_out::{OutputQueueScope, OutputQueue, QueuedOutput};
mod scheduler;
pub use scheduler::{set_schedule, CancelHandle, ScheduleFlush};
mod metrics;
pub use metrics::DIPSTICK_METRICS;
mod clock;
pub use clock::{TimeHandle, mock_clock_advance, mock_clock_reset};
mod queue;
pub use queue::queue_in::*;
pub use queue::queue_out::*;
// FIXME using * to prevent "use of deprecated" warnings. #[allow(dead_code)] doesn't work?
#[macro_use]
mod deprecated;
pub use deprecated::*;
//#[macro_use]
//mod deprecated;
//pub use deprecated::*;

View File

@ -120,8 +120,8 @@ macro_rules! __in_context {
#[cfg(test)]
mod test {
use core::*;
use proxy::Proxy;
use core::input::*;
use core::proxy::Proxy;
metrics!{TEST: Proxy = "test_prefix" => {
M1: Marker = "failed";

3
src/multi/mod.rs Executable file
View File

@ -0,0 +1,3 @@
pub mod multi_in;
pub mod multi_out;

View File

@ -1,7 +1,10 @@
//! Dispatch metrics to multiple sinks.
use core::{Input, InputScope, Name, AddPrefix, Kind, InputMetric, WithAttributes, Attributes, Flush, InputDyn};
use error;
use core::Flush;
use core::input::{Kind, Input, InputScope, InputMetric, InputDyn};
use core::component::{Attributes, WithAttributes, Name, AddPrefix};
use core::error;
use std::sync::Arc;
/// Wrap this output behind an asynchronous metrics dispatch queue.

View File

@ -1,7 +1,11 @@
//! Dispatch metrics to multiple sinks.
use core::{Output, OutputScope, Name, AddPrefix, Kind, OutputMetric, WithAttributes, Attributes, Flush, OutputDyn};
use error;
use core::Flush;
use core::component::{Attributes, WithAttributes, Name, AddPrefix};
use core::input::Kind;
use core::output::{Output, OutputMetric, OutputScope, OutputDyn};
use core::error;
use std::rc::Rc;
use std::sync::Arc;

View File

@ -1,8 +1,14 @@
//! Send metrics to a graphite server.
use core::*;
use error;
use metrics;
use core::component::{Buffered, Attributes, WithAttributes, Name, AddPrefix};
use core::{Flush, Value};
use core::input::Kind;
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
use queue::queue_out;
use cache::cache_out;
use output::socket::RetrySocket;
use std::net::ToSocketAddrs;
@ -11,7 +17,6 @@ use std::time::{SystemTime, UNIX_EPOCH};
use std::io::Write;
use std::fmt::Debug;
use socket::RetrySocket;
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
@ -155,9 +160,6 @@ impl WithAttributes for GraphiteScope {
impl Buffered for GraphiteScope {}
use queue_out;
use cache_out;
impl queue_out::QueuedOutput for Graphite {}
impl cache_out::CachedOutput for Graphite {}

View File

@ -1,10 +1,13 @@
use core::{Name, AddPrefix, Value, InputMetric, Kind, Input, InputScope, WithAttributes, Attributes,
Buffered, Flush};
use error;
use std::sync::{RwLock, Arc};
use text;
use std::io::Write;
use core::{Flush, Value};
use core::input::{Kind, Input, InputScope, InputMetric};
use core::component::{Attributes, WithAttributes, Buffered, Name, AddPrefix};
use core::error;
use cache::cache_in;
use queue::queue_in;
use output::text;
use std::sync::{RwLock, Arc};
use std::io::Write;
use log;
/// Buffered metrics log output.
@ -61,9 +64,6 @@ impl WithAttributes for LogScope {
impl Buffered for LogScope {}
use queue_in;
use cache_in;
impl queue_in::QueuedInput for Log {}
impl cache_in::CachedInput for Log {}
@ -123,7 +123,7 @@ impl Drop for LogScope {
#[cfg(test)]
mod test {
use core::*;
use core::input::*;
#[test]
fn test_to_log() {

View File

@ -1,4 +1,8 @@
use core::{Value, OutputMetric, Kind, Name, OutputScope, Flush};
use core::{Flush, Value};
use core::input::Kind;
use core::component::Name;
use core::output::{OutputMetric, OutputScope};
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::BTreeMap;

17
src/output/mod.rs Executable file
View File

@ -0,0 +1,17 @@
pub mod map;
pub mod text;
pub mod logging;
pub mod socket;
pub mod graphite;
pub mod statsd;
#[cfg(feature="prometheus")]
pub mod prometheus;
#[cfg(feature="prometheus, proto")]
pub mod prometheus_proto;

View File

@ -5,8 +5,11 @@
//! - Serve metrics with basic HTTP server
//! - Print metrics to a buffer provided by an HTTP framework.
use core::*;
use error;
use core::{Flush, Value};
use core::input::{Kind, Input, InputScope, InputMetric};
use core::component::{Attributes, WithAttributes, Buffered, Buffering, Name, AddPrefix};
use core::output::{Output, OutputMetric, OutputScope};
use core::error;
use std::net::ToSocketAddrs;
use std::sync::Arc;

2533
src/output/prometheus_proto.rs Executable file

File diff suppressed because it is too large

0
src/socket.rs → src/output/socket.rs Normal file → Executable file
View File

View File

@ -1,15 +1,17 @@
//! Send metrics to a statsd server.
use core::*;
use error;
use metrics;
use core::component::{Buffered, Attributes, Sampled, Sampling, WithAttributes, Name, AddPrefix};
use core::pcg32;
use core::{Flush, Value};
use core::input::Kind;
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
use cache::cache_out;
use queue::queue_out;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use pcg32;
use std::net::UdpSocket;
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
@ -43,9 +45,6 @@ impl Statsd {
impl Buffered for Statsd {}
impl Sampled for Statsd {}
use queue_out;
use cache_out;
impl queue_out::QueuedOutput for Statsd {}
impl cache_out::CachedOutput for Statsd {}

View File

@ -1,20 +1,28 @@
//! Standard stateless metric outputs.
// TODO parameterize templates
use core::{Name, AddPrefix, Value, Kind, OutputScope, WithAttributes, Attributes,
Buffered, OutputMetric, Output, Flush};
use error;
use core::{Flush, Value};
use core::input::Kind;
use core::component::{Attributes, WithAttributes, Buffered, Name, AddPrefix};
use core::output::{Output, OutputMetric, OutputScope};
use core::error;
use cache::cache_out;
use queue::queue_out;
use std::sync::{RwLock, Arc};
use std::io::{Write, self};
use std::rc::Rc;
use std::cell::RefCell;
/// Join metric name parts into a friendly '.' separated String
pub fn format_name(name: &Name, _kind: Kind) -> Vec<String> {
let mut z = name.join(".");
z.push_str(" ");
vec![z]
}
/// Output template-formatted value
pub fn print_name_value_line(output: &mut impl Write, template: &[String], value: Value) -> error::Result<()> {
write!(output, "{}", template[0])?;
write!(output, "{}", value)?;
@ -30,9 +38,6 @@ pub struct Text<W: Write + Send + Sync + 'static> {
print_fn: Arc<Fn(&mut Vec<u8>, &[String], Value) -> error::Result<()> + Send + Sync>,
}
use queue_out;
use cache_out;
impl<W: Write + Send + Sync + 'static> queue_out::QueuedOutput for Text<W> {}
impl<W: Write + Send + Sync + 'static> cache_out::CachedOutput for Text<W> {}
@ -177,7 +182,8 @@ impl<W: Write + Send + Sync + 'static> Drop for TextScope<W> {
#[cfg(test)]
mod test {
use core::*;
use super::*;
use core::input::Kind;
use std::io;
#[test]

2
src/queue/mod.rs Executable file
View File

@ -0,0 +1,2 @@
pub mod queue_in;
pub mod queue_out;

View File

@ -1,11 +1,13 @@
//! Queue metrics for writing on a separate thread.
//! Metrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
use core::{InputScope, Value, InputMetric, Name, Kind, AddPrefix, Input,
WithAttributes, Attributes, Flush, InputDyn};
use error;
use metrics;
use cache_in::CachedInput;
use core::component::{Attributes, Name, WithAttributes, AddPrefix};
use core::input::{Kind, Input, InputScope, InputDyn, InputMetric};
use core::{Value, Flush};
use core::metrics;
use cache::cache_in::CachedInput;
use core::error;
use std::sync::Arc;
use std::sync::mpsc;
@ -85,7 +87,9 @@ impl Input for InputQueue {
/// This is only `pub` because `error` module needs to know about it.
/// Async commands should be of no concern to applications.
pub enum InputQueueCmd {
/// Send metric write
Write(InputMetric, Value),
/// Send metric flush
Flush(Arc<InputScope + Send + Sync + 'static>),
}

View File

@ -1,17 +1,23 @@
//! Queue metrics for writing on a separate thread.
//! RawMetrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
use core::{Value, OutputMetric, Name, Kind, AddPrefix, Output, OutputDyn,
WithAttributes, Attributes, InputScope, Input, InputMetric, UnsafeScope, Flush};
use error;
use metrics;
//!
use core::component::{Attributes, Name, WithAttributes, AddPrefix};
use core::input::{Kind, Input, InputScope, InputMetric};
use core::output::{OutputDyn, OutputScope, OutputMetric, Output};
use core::{Value, Flush};
use core::metrics;
use cache::cache_in;
use core::error;
use std::rc::Rc;
use std::ops;
use std::fmt;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
use cache_in;
/// Wrap this raw output behind an asynchronous metrics dispatch queue.
pub trait QueuedOutput: Output + Sized {
/// Wrap this output with an asynchronous dispatch queue.
@ -86,7 +92,9 @@ impl Input for OutputQueue {
/// This is only `pub` because `error` module needs to know about it.
/// Async commands should be of no concern to applications.
pub enum OutputQueueCmd {
/// Send metric write
Write(Arc<OutputMetric>, Value),
/// Send metric flush
Flush(Arc<UnsafeScope>),
}
@ -131,3 +139,35 @@ impl Flush for OutputQueueScope {
}
}
/// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads.
/// Obviously, it should only still be used from a single thread or dragons may occur.
#[derive(Clone)]
pub struct UnsafeScope(Rc<OutputScope + 'static> );
unsafe impl Send for UnsafeScope {}
unsafe impl Sync for UnsafeScope {}
impl UnsafeScope {
/// Wrap a dynamic OutputScope to make it Send + Sync.
pub fn new(scope: Rc<OutputScope + 'static>) -> Self {
UnsafeScope(scope)
}
}
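// Editor's sketch (illustration only, not part of this commit): wrapping a
// dynamic scope so it can be moved to the queue's worker thread; Void stands
// in for any Output here.
#[allow(dead_code)]
fn unsafe_scope_sketch() {
    use core::void::Void;
    let wrapped = UnsafeScope::new(Void::metrics().output_dyn());
    // Deref gives back the wrapped OutputScope on the receiving side
    let _metric = wrapped.new_metric("sketch".into(), Kind::Marker);
}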
impl ops::Deref for UnsafeScope {
type Target = OutputScope + 'static;
fn deref(&self) -> &Self::Target {
Rc::as_ref(&self.0)
}
}
impl fmt::Debug for OutputMetric {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Box<Fn(Value)>")
}
}
unsafe impl Send for OutputMetric {}
unsafe impl Sync for OutputMetric {}