Retain last value to fill any blanks upon next flushes

This commit is contained in:
Francis Lalonde 2019-04-11 23:08:20 -04:00
parent 7bb718c1b4
commit 601674c4cd
4 changed files with 159 additions and 0 deletions

143
src/cache/gapless_in.rs vendored Executable file
View File

@ -0,0 +1,143 @@
//! Metric input scope caching.
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::error;
use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
use core::name::MetricName;
use core::Flush;
use std::sync::Arc;
#[cfg(not(feature = "parking_lot"))]
use std::sync::RwLock;
#[cfg(feature = "parking_lot")]
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicIsize};
use std::sync::atomic::Ordering::Relaxed;
pub trait Gapless: Input + Send + Sync + 'static + Sized {
fn gapless(self) -> GaplessInput {
GaplessInput::wrap(self)
}
}
/// Output wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct GaplessInput {
attributes: Attributes,
target: Arc<InputDyn + Send + Sync + 'static>,
}
impl GaplessInput {
/// Wrap scopes with an asynchronous metric write & flush dispatcher.
fn wrap<OUT: Input + Send + Sync + 'static>(target: OUT) -> GaplessInput {
GaplessInput {
attributes: Attributes::default(),
target: Arc::new(target),
}
}
}
impl WithAttributes for GaplessInput {
fn get_attributes(&self) -> &Attributes {
&self.attributes
}
fn mut_attributes(&mut self) -> &mut Attributes {
&mut self.attributes
}
}
impl Input for GaplessInput {
type SCOPE = GaplessInputScope;
fn metrics(&self) -> Self::SCOPE {
let target = self.target.input_dyn();
GaplessInputScope {
attributes: self.attributes.clone(),
target,
cache: Arc::new(RwLock::new(HashMap::new())),
}
}
}
struct LastValueMetric {
metric: InputMetric,
touched: AtomicBool,
last_value: AtomicIsize,
}
/// Input wrapper caching frequently defined metrics
#[derive(Clone)]
pub struct GaplessInputScope {
attributes: Attributes,
target: Arc<InputScope + Send + Sync + 'static>,
cache: Arc<RwLock<HashMap<MetricName, Arc<LastValueMetric>>>>,
}
impl WithAttributes for GaplessInputScope {
fn get_attributes(&self) -> &Attributes {
&self.attributes
}
fn mut_attributes(&mut self) -> &mut Attributes {
&mut self.attributes
}
}
impl InputScope for GaplessInputScope {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let lookup = { write_lock!(self.cache).get(&name).cloned() };
let last_val_metric: Arc<LastValueMetric> = lookup.unwrap_or_else(|| {
let new_metric = Arc::new(LastValueMetric{
metric: self.target.new_metric(name.clone(), kind),
touched: AtomicBool::new(false),
last_value: AtomicIsize::new(0),
});
// FIXME (perf) having to take another write lock for a cache miss
write_lock!(self.cache).insert(name, new_metric.clone());
new_metric
});
InputMetric::new(move |value, labels| {
last_val_metric.last_value.store(value, Relaxed);
last_val_metric.touched.store(true, Relaxed);
last_val_metric.metric.write(value, labels)
})
}
}
impl Flush for GaplessInputScope {
fn flush(&self) -> error::Result<()> {
self.notify_flush_listeners();
let cache = read_lock!(self.cache);
for last_val in cache.values() {
if !last_val.touched.swap(false, Relaxed) {
last_val.metric.write(last_val.last_value.load(Relaxed), labels!())
}
}
self.target.flush()
}
}
//#[cfg(test)]
//pub mod test {
// use super::*;
// use std::sync::atomic::AtomicUsize;
// use output::map;
//
// #[test]
// fn fill_blanks() {
// let map = map::StatsMap::default();
// let metrics = map.gapless().metrics();
//
// let counter = metrics.counter("count");
// assert_eq!(None, map.as_map().get("count"));
// counter.count(1);
//
// assert_eq!(3, trig1a.load(SeqCst));
// }
//
//}

1
src/cache/mod.rs vendored
View File

@ -1,3 +1,4 @@
pub mod cache_in;
pub mod gapless_in;
pub mod cache_out;
pub mod lru_cache;

View File

@ -4,6 +4,7 @@ use core::error;
use core::input::{Input, InputKind, InputMetric, InputScope};
use core::name::MetricName;
use core::Flush;
use cache::gapless_in;
use output::format::{Formatting, LineFormat, SimpleFormat};
use queue::queue_in;
@ -109,6 +110,8 @@ impl Buffered for LogScope {}
impl queue_in::QueuedInput for Log {}
impl cache_in::CachedInput for Log {}
impl gapless_in::Gapless for Log {}
impl InputScope for LogScope {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);

View File

@ -3,6 +3,7 @@ use core::input::InputKind;
use core::input::{Input, InputMetric, InputScope};
use core::name::MetricName;
use core::{Flush, MetricValue};
use cache::gapless_in;
use std::collections::BTreeMap;
use std::error::Error;
@ -17,6 +18,14 @@ pub struct StatsMap {
attributes: Attributes,
}
impl StatsMap {
/// Create a new StatMap.
pub fn new() -> Self {
Self::default()
}
}
impl WithAttributes for StatsMap {
fn get_attributes(&self) -> &Attributes {
&self.attributes
@ -26,6 +35,8 @@ impl WithAttributes for StatsMap {
}
}
impl gapless_in::Gapless for StatsMap {}
impl Input for StatsMap {
type SCOPE = StatsMapScope;
@ -96,4 +107,5 @@ impl StatsMapScope {
pub fn into_map(self) -> BTreeMap<String, MetricValue> {
self.into()
}
}