mirror of https://github.com/fralalonde/dipstick
Give each flush listener its own id
This commit is contained in:
parent
58ef0797d2
commit
4182504f76
|
@ -2,7 +2,7 @@
|
|||
|
||||
use bucket::ScoreType::*;
|
||||
use bucket::{stats_summary, ScoreType};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use core::clock::TimeHandle;
|
||||
use core::error;
|
||||
use core::input::{InputKind, InputMetric, InputScope};
|
||||
|
@ -260,7 +260,9 @@ impl InputScope for AtomicBucket {
|
|||
.entry(self.prefix_append(name.clone()))
|
||||
.or_insert_with(|| Arc::new(AtomicScores::new(kind)))
|
||||
.clone();
|
||||
InputMetric::new(MetricId::forge("bucket", name), move |value, _labels| scores.update(value))
|
||||
InputMetric::new(MetricId::forge("bucket", name), move |value, _labels| {
|
||||
scores.update(value)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
use std::collections::HashMap;
|
||||
use std::default::Default;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
use core::name::{MetricName, NameParts};
|
||||
use core::scheduler::{SCHEDULER, Cancel};
|
||||
use core::scheduler::{Cancel, SCHEDULER};
|
||||
use std::fmt;
|
||||
use std::time::{Duration, Instant};
|
||||
use {CancelHandle, Flush, InputMetric, InputScope, MetricValue};
|
||||
|
@ -58,7 +60,7 @@ impl Default for Buffering {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialOrd, PartialEq)]
|
||||
pub struct MetricId (String);
|
||||
pub struct MetricId(String);
|
||||
|
||||
impl MetricId {
|
||||
pub fn forge(out_type: &str, name: MetricName) -> Self {
|
||||
|
@ -68,7 +70,11 @@ impl MetricId {
|
|||
}
|
||||
|
||||
pub type Shared<T> = Arc<RwLock<T>>;
|
||||
pub type Listener = Arc<Fn(Instant) -> () + Send + Sync + 'static>;
|
||||
|
||||
pub struct Listener {
|
||||
listener_id: usize,
|
||||
listener_fn: Arc<Fn(Instant) -> () + Send + Sync + 'static>,
|
||||
}
|
||||
|
||||
/// Attributes common to metric components.
|
||||
/// Not all attributes used by all components.
|
||||
|
@ -119,7 +125,7 @@ where
|
|||
fn notify_flush_listeners(&self) {
|
||||
let now = Instant::now();
|
||||
for listener in read_lock!(self.get_attributes().flush_listeners).values() {
|
||||
(listener)(now)
|
||||
(listener.listener_fn)(now)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -131,7 +137,9 @@ pub struct ObserveWhen<'a, T, F> {
|
|||
operation: Arc<F>,
|
||||
}
|
||||
|
||||
pub struct OnFlushCancel (Arc<Fn()>);
|
||||
const ID_GENERATOR: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
pub struct OnFlushCancel(Arc<Fn()>);
|
||||
|
||||
impl Cancel for OnFlushCancel {
|
||||
fn cancel(&self) {
|
||||
|
@ -149,12 +157,25 @@ where
|
|||
let gauge = self.metric;
|
||||
let metric_id = gauge.metric_id().clone();
|
||||
let op = self.operation;
|
||||
write_lock!(self.target.get_attributes().flush_listeners)
|
||||
.insert(metric_id.clone(),
|
||||
Arc::new(move |now| gauge.write(op(now), Labels::default())));
|
||||
let listener_id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
write_lock!(self.target.get_attributes().flush_listeners).insert(
|
||||
metric_id.clone(),
|
||||
Listener {
|
||||
listener_id,
|
||||
listener_fn: Arc::new(move |now| gauge.write(op(now), Labels::default())),
|
||||
},
|
||||
);
|
||||
|
||||
let flush_listeners = self.target.get_attributes().flush_listeners.clone();
|
||||
OnFlushCancel(Arc::new(move || {
|
||||
write_lock!(flush_listeners).remove(&metric_id);
|
||||
let mut listeners = write_lock!(flush_listeners);
|
||||
let installed_listener_id = listeners.get(&metric_id).map(|v| v.listener_id);
|
||||
if let Some(id) = installed_listener_id {
|
||||
if id == listener_id {
|
||||
listeners.remove(&metric_id);
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
use core::attributes::MetricId;
|
||||
use core::clock::TimeHandle;
|
||||
use core::label::Labels;
|
||||
use core::name::MetricName;
|
||||
use core::{Flush, MetricValue};
|
||||
use core::attributes::MetricId;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
@ -88,7 +88,10 @@ impl fmt::Debug for InputMetric {
|
|||
|
||||
impl InputMetric {
|
||||
/// Utility constructor
|
||||
pub fn new<F: Fn(MetricValue, Labels) + Send + Sync + 'static>(identifier: MetricId, metric: F) -> InputMetric {
|
||||
pub fn new<F: Fn(MetricValue, Labels) + Send + Sync + 'static>(
|
||||
identifier: MetricId,
|
||||
metric: F,
|
||||
) -> InputMetric {
|
||||
InputMetric {
|
||||
identifier,
|
||||
inner: Arc::new(metric),
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
//! This makes all outputs also immediately usable as inputs.
|
||||
//! The alternatives are queuing or thread local.
|
||||
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use core::error;
|
||||
use core::input::{Input, InputKind, InputMetric, InputScope};
|
||||
use core::name::MetricName;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Decouple metric definition from configuration with trait objects.
|
||||
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use core::error;
|
||||
use core::input::{InputKind, InputMetric, InputScope};
|
||||
use core::name::{MetricName, NameParts};
|
||||
|
@ -254,7 +254,9 @@ impl InputScope for Proxy {
|
|||
proxy
|
||||
}
|
||||
});
|
||||
InputMetric::new(MetricId::forge("proxy", name), move |value, labels| proxy.target.borrow().0.write(value, labels))
|
||||
InputMetric::new(MetricId::forge("proxy", name), move |value, labels| {
|
||||
proxy.target.borrow().0.write(value, labels)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ pub use core::label::{AppLabel, Labels, ThreadLabel};
|
|||
pub use core::locking::LockingOutput;
|
||||
pub use core::name::{MetricName, NameParts};
|
||||
pub use core::output::{Output, OutputDyn, OutputMetric, OutputScope};
|
||||
pub use core::scheduler::{CancelHandle, ScheduleFlush, Cancel};
|
||||
pub use core::scheduler::{Cancel, CancelHandle, ScheduleFlush};
|
||||
pub use core::void::Void;
|
||||
pub use core::{Flush, MetricValue};
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Dispatch metrics to multiple sinks.
|
||||
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use core::error;
|
||||
use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use core::name::MetricName;
|
||||
|
@ -90,11 +90,14 @@ impl InputScope for MultiInputScope {
|
|||
.iter()
|
||||
.map(move |scope| scope.new_metric(name.clone(), kind))
|
||||
.collect();
|
||||
InputMetric::new(MetricId::forge("multi", name.clone()), move |value, labels| {
|
||||
for metric in &metrics {
|
||||
metric.write(value, labels.clone())
|
||||
}
|
||||
})
|
||||
InputMetric::new(
|
||||
MetricId::forge("multi", name.clone()),
|
||||
move |value, labels| {
|
||||
for metric in &metrics {
|
||||
metric.write(value, labels.clone())
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use cache::cache_in;
|
||||
use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::attributes::{Attributes, Buffered, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use core::error;
|
||||
use core::input::{Input, InputKind, InputMetric, InputScope};
|
||||
use core::name::MetricName;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use core::input::InputKind;
|
||||
use core::input::{Input, InputMetric, InputScope};
|
||||
use core::name::MetricName;
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
//! If queue size is exceeded, calling code reverts to blocking.
|
||||
|
||||
use cache::cache_in::CachedInput;
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use core::error;
|
||||
use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use core::label::Labels;
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
//! If queue size is exceeded, calling code reverts to blocking.
|
||||
|
||||
use cache::cache_in;
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use core::error;
|
||||
use core::input::{Input, InputKind, InputMetric, InputScope};
|
||||
use core::label::Labels;
|
||||
|
|
Loading…
Reference in New Issue