Observer on_flush PR fixes

Francis Lalonde 2019-05-14 20:07:49 -04:00
parent d50e38308d
commit c88cc8f7f8
13 changed files with 66 additions and 34 deletions

View File

@@ -28,18 +28,21 @@ fn main() {
metrics.observe(uptime, |_| 6).on_flush();
// record number of threads in pool every second
metrics
let scheduled = metrics
.observe(metrics.gauge("threads"), thread_count)
.every(Duration::from_secs(1));
// "heartbeat" metric
metrics
let on_flush = metrics
.observe(metrics.marker("heartbeat"), |_| 1)
.on_flush();
loop {
for _ in 0..1000 {
std::thread::sleep(Duration::from_millis(40));
}
on_flush.cancel();
scheduled.cancel();
}
/// Query number of running threads in this process using Linux's /proc filesystem.
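Both chained observers now hand back a cancellable handle: `every()` returns the scheduler's handle and `on_flush()` returns the new `OnFlushCancel` (see the attributes and scheduler hunks below), and the example cancels both once its bounded loop ends instead of looping forever. For reference, a self-contained sketch of the updated pattern, with the example's `/proc`-based `thread_count` replaced here by a constant closure:

```rust
use dipstick::*;
use std::time::Duration;

fn main() {
    let metrics = AtomicBucket::new();

    // record a stand-in thread count every second; `every()` returns a cancellable handle
    let scheduled = metrics
        .observe(metrics.gauge("threads"), |_| 4)
        .every(Duration::from_secs(1));

    // "heartbeat" marker observed on each flush; `on_flush()` now also returns a handle
    let on_flush = metrics
        .observe(metrics.marker("heartbeat"), |_| 1)
        .on_flush();

    // bounded run instead of the previous infinite `loop`
    for _ in 0..1000 {
        std::thread::sleep(Duration::from_millis(40));
    }

    // stop observing once done
    on_flush.cancel();
    scheduled.cancel();
}
```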

View File

@@ -2,7 +2,7 @@
use bucket::ScoreType::*;
use bucket::{stats_summary, ScoreType};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
use core::clock::TimeHandle;
use core::error;
use core::input::{InputKind, InputMetric, InputScope};
@@ -260,7 +260,7 @@ impl InputScope for AtomicBucket {
.entry(self.prefix_append(name.clone()))
.or_insert_with(|| Arc::new(AtomicScores::new(kind)))
.clone();
InputMetric::new(name.join("/"), move |value, _labels| scores.update(value))
InputMetric::new(MetricId::forge("bucket", name), move |value, _labels| scores.update(value))
}
}

View File

@@ -3,7 +3,7 @@ use std::default::Default;
use std::sync::Arc;
use core::name::{MetricName, NameParts};
use core::scheduler::SCHEDULER;
use core::scheduler::{SCHEDULER, Cancel};
use std::fmt;
use std::time::{Duration, Instant};
use {CancelHandle, Flush, InputMetric, InputScope, MetricValue};
@@ -57,7 +57,16 @@ impl Default for Buffering {
}
}
pub type MetricId = String;
#[derive(Clone, Debug, Hash, Eq, PartialOrd, PartialEq)]
pub struct MetricId (String);
impl MetricId {
pub fn forge(out_type: &str, name: MetricName) -> Self {
let id: String = name.join("/");
MetricId(format!("{}:{}", out_type, id))
}
}
pub type Shared<T> = Arc<RwLock<T>>;
pub type Listener = Arc<Fn(Instant) -> () + Send + Sync + 'static>;
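`MetricId` goes from a bare `String` alias to a newtype whose `forge` constructor prefixes the joined name with the output type, so listener keys such as `bucket:...` and `proxy:...` cannot collide across outputs. A unit-test style sketch of that property, not part of this diff; it assumes crate-internal access (the type is not re-exported at the crate root here) and that `From<&str>` builds a single-part `MetricName`, as the `gauge("...")` constructors suggest:

```rust
#[cfg(test)]
mod metric_id_sketch {
    // hypothetical test module placed next to the definition in core/attributes.rs
    use core::attributes::MetricId;
    use core::name::MetricName;

    #[test]
    fn forged_ids_are_namespaced_per_output() {
        let name: MetricName = "heartbeat".into();
        // same metric name registered through different output types -> distinct ids
        assert_ne!(
            MetricId::forge("bucket", name.clone()),
            MetricId::forge("proxy", name)
        );
    }
}
```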
@@ -122,19 +131,31 @@ pub struct ObserveWhen<'a, T, F> {
operation: Arc<F>,
}
pub struct OnFlushCancel (Arc<Fn()>);
impl Cancel for OnFlushCancel {
fn cancel(&self) {
(self.0)()
}
}
impl<'a, T, F> ObserveWhen<'a, T, F>
where
F: Fn(Instant) -> MetricValue + Send + Sync + 'static,
T: InputScope + WithAttributes + Send + Sync,
{
/// Observe the metric's value upon flushing the scope.
pub fn on_flush(self) {
pub fn on_flush(self) -> OnFlushCancel {
let gauge = self.metric;
let metric_id = gauge.metric_id().clone();
let op = self.operation;
let mut listeners = write_lock!(self.target.get_attributes().flush_listeners);
if !listeners.contains_key(gauge.metric_id()) {
listeners.insert(gauge.metric_id().clone(), Arc::new(move |now| gauge.write(op(now), Labels::default())));
}
write_lock!(self.target.get_attributes().flush_listeners)
.insert(metric_id.clone(),
Arc::new(move |now| gauge.write(op(now), Labels::default())));
let flush_listeners = self.target.get_attributes().flush_listeners.clone();
OnFlushCancel(Arc::new(move || {
write_lock!(flush_listeners).remove(&metric_id);
}))
}
/// Observe the metric's value periodically.
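`on_flush` previously registered the listener only if the metric had no entry yet and returned nothing; it now always inserts under the metric's `MetricId` (so re-observing the same metric replaces the earlier closure) and returns an `OnFlushCancel` whose `cancel()` removes exactly that entry. A minimal usage sketch of the new return value:

```rust
use dipstick::*;

fn main() {
    let metrics = AtomicBucket::new();

    // observed on every flush until the handle is cancelled
    let heartbeat = metrics
        .observe(metrics.marker("heartbeat"), |_| 1)
        .on_flush();

    let _ = metrics.flush(); // flush listener fires and writes the marker

    heartbeat.cancel();      // removes the listener keyed by this metric's id
    let _ = metrics.flush(); // the marker is no longer observed
}
```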

View File

@@ -2,7 +2,7 @@
//! This makes all outputs also immediately usable as inputs.
//! The alternatives are queuing or thread local.
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
use core::error;
use core::input::{Input, InputKind, InputMetric, InputScope};
use core::name::MetricName;
@@ -39,7 +39,7 @@ impl InputScope for LockingOutput {
.expect("LockingOutput")
.new_metric(name.clone(), kind);
let mutex = self.inner.clone();
InputMetric::new(name.join("/"), move |value, labels| {
InputMetric::new(MetricId::forge("locking", name), move |value, labels| {
// lock when collecting values
let _guard = mutex.lock().expect("LockingOutput");
raw_metric.write(value, labels)

View File

@@ -1,6 +1,6 @@
//! Decouple metric definition from configuration with trait objects.
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
use core::error;
use core::input::{InputKind, InputMetric, InputScope};
use core::name::{MetricName, NameParts};
@@ -254,7 +254,7 @@ impl InputScope for Proxy {
proxy
}
});
InputMetric::new(name.join("/"), move |value, labels| proxy.target.borrow().0.write(value, labels))
InputMetric::new(MetricId::forge("proxy", name), move |value, labels| proxy.target.borrow().0.write(value, labels))
}
}

View File

@@ -10,6 +10,12 @@ use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};
/// A deferred, repeatable, background action that can be cancelled.
pub trait Cancel {
/// Cancel the action.
fn cancel(&self);
}
/// A handle to cancel a scheduled task if required.
#[derive(Debug, Clone)]
pub struct CancelHandle(Arc<AtomicBool>);
@@ -19,16 +25,18 @@ impl CancelHandle {
CancelHandle(Arc::new(AtomicBool::new(false)))
}
fn is_cancelled(&self) -> bool {
self.0.load(SeqCst)
}
}
impl Cancel for CancelHandle {
/// Signals the task to stop.
pub fn cancel(&self) {
fn cancel(&self) {
if self.0.swap(true, SeqCst) {
warn!("Scheduled task was already cancelled.")
}
}
fn is_cancelled(&self) -> bool {
self.0.load(SeqCst)
}
}
/// Enable background periodical publication of metrics
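The new `Cancel` trait gives `CancelHandle` (returned for scheduled tasks) and `OnFlushCancel` a common interface, and `cancel()` now only logs a warning when the task was already cancelled. One thing the shared trait enables is handling the two handle types uniformly, as in this illustrative helper (not part of the crate):

```rust
use dipstick::*;
use std::time::Duration;

/// Illustrative helper: stop a mixed batch of handles through the `Cancel` trait.
fn cancel_all(handles: Vec<Box<dyn Cancel>>) {
    for handle in handles {
        handle.cancel();
    }
}

fn main() {
    let metrics = AtomicBucket::new();
    let scheduled = metrics
        .observe(metrics.gauge("queue_depth"), |_| 0)
        .every(Duration::from_secs(1));
    let on_flush = metrics
        .observe(metrics.marker("alive"), |_| 1)
        .on_flush();

    // CancelHandle and OnFlushCancel are distinct types, but both implement Cancel.
    cancel_all(vec![Box::new(scheduled), Box::new(on_flush)]);
}
```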

View File

@@ -75,7 +75,7 @@ pub use core::label::{AppLabel, Labels, ThreadLabel};
pub use core::locking::LockingOutput;
pub use core::name::{MetricName, NameParts};
pub use core::output::{Output, OutputDyn, OutputMetric, OutputScope};
pub use core::scheduler::{CancelHandle, ScheduleFlush};
pub use core::scheduler::{CancelHandle, ScheduleFlush, Cancel};
pub use core::void::Void;
pub use core::{Flush, MetricValue};
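With `cancel()` defined on a trait rather than as an inherent method, downstream code that doesn't use the glob import needs the re-exported trait in scope to call it on the returned handles:

```rust
use dipstick::Cancel; // required for `.cancel()` to resolve on `every()`/`on_flush()` handles
```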

View File

@@ -1,6 +1,6 @@
//! Dispatch metrics to multiple sinks.
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
use core::error;
use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
use core::name::MetricName;
@@ -90,7 +90,7 @@ impl InputScope for MultiInputScope {
.iter()
.map(move |scope| scope.new_metric(name.clone(), kind))
.collect();
InputMetric::new(name.join("/"), move |value, labels| {
InputMetric::new(MetricId::forge("multi", name.clone()), move |value, labels| {
for metric in &metrics {
metric.write(value, labels.clone())
}

View File

@@ -86,7 +86,7 @@ impl MultiOutputScope {
impl OutputScope for MultiOutputScope {
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = &self.prefix_append(name);
let name = self.prefix_append(name);
let metrics: Vec<OutputMetric> = self
.scopes
.iter()

View File

@@ -1,5 +1,5 @@
use cache::cache_in;
use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes};
use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes, MetricId};
use core::error;
use core::input::{Input, InputKind, InputMetric, InputScope};
use core::name::MetricName;
@@ -117,7 +117,7 @@ impl InputScope for LogScope {
if self.is_buffered() {
// buffered
InputMetric::new(name.join("/"), move |value, labels| {
InputMetric::new(MetricId::forge("log", name), move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => {
@@ -131,7 +131,7 @@ impl InputScope for LogScope {
// unbuffered
let level = self.log.level;
let target = self.log.target.clone();
InputMetric::new(name.join("/"), move |value, labels| {
InputMetric::new(MetricId::forge("log", name), move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => {

View File

@@ -1,4 +1,4 @@
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
use core::input::InputKind;
use core::input::{Input, InputMetric, InputScope};
use core::name::MetricName;
@@ -59,7 +59,7 @@ impl InputScope for StatsMapScope {
let name = self.prefix_append(name);
let write_to = self.inner.clone();
let key: String = name.join(".");
InputMetric::new(name.join("/"), move |value, _labels| {
InputMetric::new(MetricId::forge("map", name), move |value, _labels| {
let _previous = write_to.write().expect("Lock").insert(key.clone(), value);
})
}

View File

@@ -3,7 +3,7 @@
//! If queue size is exceeded, calling code reverts to blocking.
use cache::cache_in::CachedInput;
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
use core::error;
use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
use core::label::Labels;
@@ -187,7 +187,7 @@ impl InputScope for InputQueueScope {
let name = self.prefix_append(name);
let target_metric = self.target.new_metric(name.clone(), kind);
let sender = self.sender.clone();
InputMetric::new(name.join("/"), move |value, mut labels| {
InputMetric::new(MetricId::forge("queue", name), move |value, mut labels| {
labels.save_context();
if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels))
{

View File

@@ -3,7 +3,7 @@
//! If queue size is exceeded, calling code reverts to blocking.
use cache::cache_in;
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
use core::error;
use core::input::{Input, InputKind, InputMetric, InputScope};
use core::label::Labels;
@@ -176,7 +176,7 @@ impl InputScope for OutputQueueScope {
let name = self.prefix_append(name);
let target_metric = Arc::new(self.target.new_metric(name.clone(), kind));
let sender = self.sender.clone();
InputMetric::new(name.join("/"), move |value, mut labels| {
InputMetric::new(MetricId::forge("queue", name), move |value, mut labels| {
labels.save_context();
if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels))
{