mirror of https://github.com/fralalonde/dipstick
Merge pull request #66 from fralalonde/unique-metrics
Fix potential observer leak with unique metric IDs
This commit is contained in:
commit
58ef0797d2
|
@ -1,5 +1,12 @@
|
|||
# Latest changes + history
|
||||
|
||||
## version 0.7.5
|
||||
- Fix leak on observers when registering same metric twice.
|
||||
- Add `metric_id()` on `InputMetric`
|
||||
|
||||
## version 0.7.4
|
||||
- Reexport `ObserveWhen` to make it public
|
||||
|
||||
## version 0.7.3
|
||||
- Fixed / shushed a bunch of `clippy` warnings
|
||||
- Made `clippy` part of `make` checks
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "dipstick"
|
||||
version = "0.7.4"
|
||||
version = "0.7.5"
|
||||
authors = ["Francis Lalonde <fralalonde@gmail.com>"]
|
||||
|
||||
description = """A fast, all-purpose metrics library decoupling instrumentation from reporting backends.
|
||||
|
|
|
@ -80,6 +80,11 @@ in the `[dependencies]` section:
|
|||
dipstick = "0.7.4"
|
||||
```
|
||||
|
||||
## External features
|
||||
|
||||
Configuring dipstick from a text file is possible using
|
||||
the [spirit-dipstick](https://crates.io/crates/spirit-dipstick) crate.
|
||||
|
||||
## Building
|
||||
When building the crate prior to PR or release, just run plain old `make`.
|
||||
This will in turn run `cargo` a few times to run tests, benchmarks, lints, etc.
|
||||
|
|
|
@ -28,18 +28,21 @@ fn main() {
|
|||
metrics.observe(uptime, |_| 6).on_flush();
|
||||
|
||||
// record number of threads in pool every second
|
||||
metrics
|
||||
let scheduled = metrics
|
||||
.observe(metrics.gauge("threads"), thread_count)
|
||||
.every(Duration::from_secs(1));
|
||||
|
||||
// "heartbeat" metric
|
||||
metrics
|
||||
let on_flush = metrics
|
||||
.observe(metrics.marker("heartbeat"), |_| 1)
|
||||
.on_flush();
|
||||
|
||||
loop {
|
||||
for _ in 0..1000 {
|
||||
std::thread::sleep(Duration::from_millis(40));
|
||||
}
|
||||
|
||||
on_flush.cancel();
|
||||
scheduled.cancel();
|
||||
}
|
||||
|
||||
/// Query number of running threads in this process using Linux's /proc filesystem.
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
use bucket::ScoreType::*;
|
||||
use bucket::{stats_summary, ScoreType};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::clock::TimeHandle;
|
||||
use core::error;
|
||||
use core::input::{InputKind, InputMetric, InputScope};
|
||||
|
@ -257,10 +257,10 @@ impl InputScope for AtomicBucket {
|
|||
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
|
||||
let scores = write_lock!(self.inner)
|
||||
.metrics
|
||||
.entry(self.prefix_append(name))
|
||||
.entry(self.prefix_append(name.clone()))
|
||||
.or_insert_with(|| Arc::new(AtomicScores::new(kind)))
|
||||
.clone();
|
||||
InputMetric::new(move |value, _labels| scores.update(value))
|
||||
InputMetric::new(MetricId::forge("bucket", name), move |value, _labels| scores.update(value))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::default::Default;
|
|||
use std::sync::Arc;
|
||||
|
||||
use core::name::{MetricName, NameParts};
|
||||
use core::scheduler::SCHEDULER;
|
||||
use core::scheduler::{SCHEDULER, Cancel};
|
||||
use std::fmt;
|
||||
use std::time::{Duration, Instant};
|
||||
use {CancelHandle, Flush, InputMetric, InputScope, MetricValue};
|
||||
|
@ -57,8 +57,18 @@ impl Default for Buffering {
|
|||
}
|
||||
}
|
||||
|
||||
type Shared<T> = Arc<RwLock<T>>;
|
||||
type Listener = Arc<Fn(Instant) -> () + Send + Sync + 'static>;
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialOrd, PartialEq)]
|
||||
pub struct MetricId (String);
|
||||
|
||||
impl MetricId {
|
||||
pub fn forge(out_type: &str, name: MetricName) -> Self {
|
||||
let id: String = name.join("/");
|
||||
MetricId(format!("{}:{}", out_type, id))
|
||||
}
|
||||
}
|
||||
|
||||
pub type Shared<T> = Arc<RwLock<T>>;
|
||||
pub type Listener = Arc<Fn(Instant) -> () + Send + Sync + 'static>;
|
||||
|
||||
/// Attributes common to metric components.
|
||||
/// Not all attributes used by all components.
|
||||
|
@ -67,7 +77,7 @@ pub struct Attributes {
|
|||
naming: NameParts,
|
||||
sampling: Sampling,
|
||||
buffering: Buffering,
|
||||
flush_listeners: Shared<Vec<Listener>>,
|
||||
flush_listeners: Shared<HashMap<MetricId, Listener>>,
|
||||
tasks: Shared<Vec<CancelHandle>>,
|
||||
}
|
||||
|
||||
|
@ -108,7 +118,7 @@ where
|
|||
{
|
||||
fn notify_flush_listeners(&self) {
|
||||
let now = Instant::now();
|
||||
for listener in read_lock!(self.get_attributes().flush_listeners).iter() {
|
||||
for listener in read_lock!(self.get_attributes().flush_listeners).values() {
|
||||
(listener)(now)
|
||||
}
|
||||
}
|
||||
|
@ -121,17 +131,31 @@ pub struct ObserveWhen<'a, T, F> {
|
|||
operation: Arc<F>,
|
||||
}
|
||||
|
||||
pub struct OnFlushCancel (Arc<Fn()>);
|
||||
|
||||
impl Cancel for OnFlushCancel {
|
||||
fn cancel(&self) {
|
||||
(self.0)()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T, F> ObserveWhen<'a, T, F>
|
||||
where
|
||||
F: Fn(Instant) -> MetricValue + Send + Sync + 'static,
|
||||
T: InputScope + WithAttributes + Send + Sync,
|
||||
{
|
||||
/// Observe the metric's value upon flushing the scope.
|
||||
pub fn on_flush(self) {
|
||||
pub fn on_flush(self) -> OnFlushCancel {
|
||||
let gauge = self.metric;
|
||||
let metric_id = gauge.metric_id().clone();
|
||||
let op = self.operation;
|
||||
write_lock!(self.target.get_attributes().flush_listeners)
|
||||
.push(Arc::new(move |now| gauge.write(op(now), Labels::default())));
|
||||
.insert(metric_id.clone(),
|
||||
Arc::new(move |now| gauge.write(op(now), Labels::default())));
|
||||
let flush_listeners = self.target.get_attributes().flush_listeners.clone();
|
||||
OnFlushCancel(Arc::new(move || {
|
||||
write_lock!(flush_listeners).remove(&metric_id);
|
||||
}))
|
||||
}
|
||||
|
||||
/// Observe the metric's value periodically.
|
||||
|
|
|
@ -2,6 +2,7 @@ use core::clock::TimeHandle;
|
|||
use core::label::Labels;
|
||||
use core::name::MetricName;
|
||||
use core::{Flush, MetricValue};
|
||||
use core::attributes::MetricId;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
@ -75,6 +76,7 @@ pub trait InputScope: Flush {
|
|||
/// A metric is actually a function that knows to write a metric value to a metric output.
|
||||
#[derive(Clone)]
|
||||
pub struct InputMetric {
|
||||
identifier: MetricId,
|
||||
inner: Arc<Fn(MetricValue, Labels) + Send + Sync>,
|
||||
}
|
||||
|
||||
|
@ -86,8 +88,9 @@ impl fmt::Debug for InputMetric {
|
|||
|
||||
impl InputMetric {
|
||||
/// Utility constructor
|
||||
pub fn new<F: Fn(MetricValue, Labels) + Send + Sync + 'static>(metric: F) -> InputMetric {
|
||||
pub fn new<F: Fn(MetricValue, Labels) + Send + Sync + 'static>(identifier: MetricId, metric: F) -> InputMetric {
|
||||
InputMetric {
|
||||
identifier,
|
||||
inner: Arc::new(metric),
|
||||
}
|
||||
}
|
||||
|
@ -97,6 +100,11 @@ impl InputMetric {
|
|||
pub fn write(&self, value: MetricValue, labels: Labels) {
|
||||
(self.inner)(value, labels)
|
||||
}
|
||||
|
||||
/// Returns the unique identifier of this metric.
|
||||
pub fn metric_id(&self) -> &MetricId {
|
||||
&self.identifier
|
||||
}
|
||||
}
|
||||
|
||||
/// Used to differentiate between metric kinds in the backend.
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
//! This makes all outputs also immediately usable as inputs.
|
||||
//! The alternatives are queuing or thread local.
|
||||
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::error;
|
||||
use core::input::{Input, InputKind, InputMetric, InputScope};
|
||||
use core::name::MetricName;
|
||||
|
@ -37,9 +37,9 @@ impl InputScope for LockingOutput {
|
|||
.inner
|
||||
.lock()
|
||||
.expect("LockingOutput")
|
||||
.new_metric(name, kind);
|
||||
.new_metric(name.clone(), kind);
|
||||
let mutex = self.inner.clone();
|
||||
InputMetric::new(move |value, labels| {
|
||||
InputMetric::new(MetricId::forge("locking", name), move |value, labels| {
|
||||
// lock when collecting values
|
||||
let _guard = mutex.lock().expect("LockingOutput");
|
||||
raw_metric.write(value, labels)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Decouple metric definition from configuration with trait objects.
|
||||
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::error;
|
||||
use core::input::{InputKind, InputMetric, InputScope};
|
||||
use core::name::{MetricName, NameParts};
|
||||
|
@ -145,7 +145,7 @@ impl InnerProxy {
|
|||
break;
|
||||
}
|
||||
|
||||
if let Some(mut metric) = metric.upgrade() {
|
||||
if let Some(metric) = metric.upgrade() {
|
||||
// check if metric targeted by _lower_ namespace
|
||||
if metric.target.borrow().1 > namespace.len() {
|
||||
continue;
|
||||
|
@ -254,7 +254,7 @@ impl InputScope for Proxy {
|
|||
proxy
|
||||
}
|
||||
});
|
||||
InputMetric::new(move |value, labels| proxy.target.borrow().0.write(value, labels))
|
||||
InputMetric::new(MetricId::forge("proxy", name), move |value, labels| proxy.target.borrow().0.write(value, labels))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,12 @@ use std::sync::{Arc, Condvar, Mutex};
|
|||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// A deferred, repeatable, background action that can be cancelled.
|
||||
pub trait Cancel {
|
||||
/// Cancel the action.
|
||||
fn cancel(&self);
|
||||
}
|
||||
|
||||
/// A handle to cancel a scheduled task if required.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CancelHandle(Arc<AtomicBool>);
|
||||
|
@ -19,16 +25,18 @@ impl CancelHandle {
|
|||
CancelHandle(Arc::new(AtomicBool::new(false)))
|
||||
}
|
||||
|
||||
fn is_cancelled(&self) -> bool {
|
||||
self.0.load(SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
impl Cancel for CancelHandle {
|
||||
/// Signals the task to stop.
|
||||
pub fn cancel(&self) {
|
||||
fn cancel(&self) {
|
||||
if self.0.swap(true, SeqCst) {
|
||||
warn!("Scheduled task was already cancelled.")
|
||||
}
|
||||
}
|
||||
|
||||
fn is_cancelled(&self) -> bool {
|
||||
self.0.load(SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
/// Enable background periodical publication of metrics
|
||||
|
|
|
@ -75,7 +75,7 @@ pub use core::label::{AppLabel, Labels, ThreadLabel};
|
|||
pub use core::locking::LockingOutput;
|
||||
pub use core::name::{MetricName, NameParts};
|
||||
pub use core::output::{Output, OutputDyn, OutputMetric, OutputScope};
|
||||
pub use core::scheduler::{CancelHandle, ScheduleFlush};
|
||||
pub use core::scheduler::{CancelHandle, ScheduleFlush, Cancel};
|
||||
pub use core::void::Void;
|
||||
pub use core::{Flush, MetricValue};
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Dispatch metrics to multiple sinks.
|
||||
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::error;
|
||||
use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use core::name::MetricName;
|
||||
|
@ -90,7 +90,7 @@ impl InputScope for MultiInputScope {
|
|||
.iter()
|
||||
.map(move |scope| scope.new_metric(name.clone(), kind))
|
||||
.collect();
|
||||
InputMetric::new(move |value, labels| {
|
||||
InputMetric::new(MetricId::forge("multi", name.clone()), move |value, labels| {
|
||||
for metric in &metrics {
|
||||
metric.write(value, labels.clone())
|
||||
}
|
||||
|
|
|
@ -86,7 +86,7 @@ impl MultiOutputScope {
|
|||
|
||||
impl OutputScope for MultiOutputScope {
|
||||
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
|
||||
let name = &self.prefix_append(name);
|
||||
let name = self.prefix_append(name);
|
||||
let metrics: Vec<OutputMetric> = self
|
||||
.scopes
|
||||
.iter()
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use cache::cache_in;
|
||||
use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes};
|
||||
use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::error;
|
||||
use core::input::{Input, InputKind, InputMetric, InputScope};
|
||||
use core::name::MetricName;
|
||||
|
@ -117,7 +117,7 @@ impl InputScope for LogScope {
|
|||
|
||||
if self.is_buffered() {
|
||||
// buffered
|
||||
InputMetric::new(move |value, labels| {
|
||||
InputMetric::new(MetricId::forge("log", name), move |value, labels| {
|
||||
let mut buffer = Vec::with_capacity(32);
|
||||
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
|
||||
Ok(()) => {
|
||||
|
@ -131,7 +131,7 @@ impl InputScope for LogScope {
|
|||
// unbuffered
|
||||
let level = self.log.level;
|
||||
let target = self.log.target.clone();
|
||||
InputMetric::new(move |value, labels| {
|
||||
InputMetric::new(MetricId::forge("log", name), move |value, labels| {
|
||||
let mut buffer = Vec::with_capacity(32);
|
||||
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
|
||||
Ok(()) => {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::input::InputKind;
|
||||
use core::input::{Input, InputMetric, InputScope};
|
||||
use core::name::MetricName;
|
||||
|
@ -58,9 +58,9 @@ impl InputScope for StatsMapScope {
|
|||
fn new_metric(&self, name: MetricName, _kind: InputKind) -> InputMetric {
|
||||
let name = self.prefix_append(name);
|
||||
let write_to = self.inner.clone();
|
||||
let name: String = name.join(".");
|
||||
InputMetric::new(move |value, _labels| {
|
||||
let _previous = write_to.write().expect("Lock").insert(name.clone(), value);
|
||||
let key: String = name.join(".");
|
||||
InputMetric::new(MetricId::forge("map", name), move |value, _labels| {
|
||||
let _previous = write_to.write().expect("Lock").insert(key.clone(), value);
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
//! If queue size is exceeded, calling code reverts to blocking.
|
||||
|
||||
use cache::cache_in::CachedInput;
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::error;
|
||||
use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use core::label::Labels;
|
||||
|
@ -185,9 +185,9 @@ impl WithAttributes for InputQueueScope {
|
|||
impl InputScope for InputQueueScope {
|
||||
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
|
||||
let name = self.prefix_append(name);
|
||||
let target_metric = self.target.new_metric(name, kind);
|
||||
let target_metric = self.target.new_metric(name.clone(), kind);
|
||||
let sender = self.sender.clone();
|
||||
InputMetric::new(move |value, mut labels| {
|
||||
InputMetric::new(MetricId::forge("queue", name), move |value, mut labels| {
|
||||
labels.save_context();
|
||||
if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels))
|
||||
{
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
//! If queue size is exceeded, calling code reverts to blocking.
|
||||
|
||||
use cache::cache_in;
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
|
||||
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId};
|
||||
use core::error;
|
||||
use core::input::{Input, InputKind, InputMetric, InputScope};
|
||||
use core::label::Labels;
|
||||
|
@ -174,9 +174,9 @@ impl WithAttributes for OutputQueueScope {
|
|||
impl InputScope for OutputQueueScope {
|
||||
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
|
||||
let name = self.prefix_append(name);
|
||||
let target_metric = Arc::new(self.target.new_metric(name, kind));
|
||||
let target_metric = Arc::new(self.target.new_metric(name.clone(), kind));
|
||||
let sender = self.sender.clone();
|
||||
InputMetric::new(move |value, mut labels| {
|
||||
InputMetric::new(MetricId::forge("queue", name), move |value, mut labels| {
|
||||
labels.save_context();
|
||||
if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels))
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue