diff --git a/CHANGES.md b/CHANGES.md index c36181d..540eaa4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Latest changes + history +## version 0.7.5 +- Fix leak on observers when registering same metric twice. +- Add `metric_id()` on `InputMetric` + +## version 0.7.4 +- Reexport `ObserveWhen` to make it public + ## version 0.7.3 - Fixed / shushed a bunch of `clippy` warnings - Made `clippy` part of `make` checks diff --git a/Cargo.toml b/Cargo.toml index d1c5623..f76a513 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dipstick" -version = "0.7.4" +version = "0.7.5" authors = ["Francis Lalonde "] description = """A fast, all-purpose metrics library decoupling instrumentation from reporting backends. diff --git a/README.md b/README.md index 1b4a1a2..8a6634e 100755 --- a/README.md +++ b/README.md @@ -80,6 +80,11 @@ in the `[dependencies]` section: dipstick = "0.7.4" ``` +## External features + +Configuring dipstick from a text file is possible using +the [spirit-dipstick](https://crates.io/crates/spirit-dipstick) crate. + ## Building When building the crate prior to PR or release, just run plain old `make`. This will in turn run `cargo` a few times to run tests, benchmarks, lints, etc. diff --git a/src/bucket/atomic.rs b/src/bucket/atomic.rs index 3779707..d5377c6 100755 --- a/src/bucket/atomic.rs +++ b/src/bucket/atomic.rs @@ -257,10 +257,10 @@ impl InputScope for AtomicBucket { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let scores = write_lock!(self.inner) .metrics - .entry(self.prefix_append(name)) + .entry(self.prefix_append(name.clone())) .or_insert_with(|| Arc::new(AtomicScores::new(kind))) .clone(); - InputMetric::new(move |value, _labels| scores.update(value)) + InputMetric::new(name.join("/"), move |value, _labels| scores.update(value)) } } diff --git a/src/core/attributes.rs b/src/core/attributes.rs index f9467ea..a261c22 100755 --- a/src/core/attributes.rs +++ b/src/core/attributes.rs @@ -57,8 +57,9 @@ impl Default for Buffering { } } -type Shared = Arc>; -type Listener = Arc () + Send + Sync + 'static>; +pub type MetricId = String; +pub type Shared = Arc>; +pub type Listener = Arc () + Send + Sync + 'static>; /// Attributes common to metric components. /// Not all attributes used by all components. @@ -67,7 +68,7 @@ pub struct Attributes { naming: NameParts, sampling: Sampling, buffering: Buffering, - flush_listeners: Shared>, + flush_listeners: Shared>, tasks: Shared>, } @@ -108,7 +109,7 @@ where { fn notify_flush_listeners(&self) { let now = Instant::now(); - for listener in read_lock!(self.get_attributes().flush_listeners).iter() { + for listener in read_lock!(self.get_attributes().flush_listeners).values() { (listener)(now) } } @@ -130,8 +131,10 @@ where pub fn on_flush(self) { let gauge = self.metric; let op = self.operation; - write_lock!(self.target.get_attributes().flush_listeners) - .push(Arc::new(move |now| gauge.write(op(now), Labels::default()))); + let mut listeners = write_lock!(self.target.get_attributes().flush_listeners); + if !listeners.contains_key(gauge.metric_id()) { + listeners.insert(gauge.metric_id().clone(), Arc::new(move |now| gauge.write(op(now), Labels::default()))); + } } /// Observe the metric's value periodically. diff --git a/src/core/input.rs b/src/core/input.rs index 734c1a9..05e09d8 100755 --- a/src/core/input.rs +++ b/src/core/input.rs @@ -2,6 +2,7 @@ use core::clock::TimeHandle; use core::label::Labels; use core::name::MetricName; use core::{Flush, MetricValue}; +use core::attributes::MetricId; use std::fmt; use std::sync::Arc; @@ -75,6 +76,7 @@ pub trait InputScope: Flush { /// A metric is actually a function that knows to write a metric value to a metric output. #[derive(Clone)] pub struct InputMetric { + identifier: MetricId, inner: Arc, } @@ -86,8 +88,9 @@ impl fmt::Debug for InputMetric { impl InputMetric { /// Utility constructor - pub fn new(metric: F) -> InputMetric { + pub fn new(identifier: MetricId, metric: F) -> InputMetric { InputMetric { + identifier, inner: Arc::new(metric), } } @@ -97,6 +100,11 @@ impl InputMetric { pub fn write(&self, value: MetricValue, labels: Labels) { (self.inner)(value, labels) } + + /// Returns the unique identifier of this metric. + pub fn metric_id(&self) -> &MetricId { + &self.identifier + } } /// Used to differentiate between metric kinds in the backend. diff --git a/src/core/locking.rs b/src/core/locking.rs index b5b1b9d..634bd80 100755 --- a/src/core/locking.rs +++ b/src/core/locking.rs @@ -37,9 +37,9 @@ impl InputScope for LockingOutput { .inner .lock() .expect("LockingOutput") - .new_metric(name, kind); + .new_metric(name.clone(), kind); let mutex = self.inner.clone(); - InputMetric::new(move |value, labels| { + InputMetric::new(name.join("/"), move |value, labels| { // lock when collecting values let _guard = mutex.lock().expect("LockingOutput"); raw_metric.write(value, labels) diff --git a/src/core/proxy.rs b/src/core/proxy.rs index 96523a0..9142ec4 100755 --- a/src/core/proxy.rs +++ b/src/core/proxy.rs @@ -145,7 +145,7 @@ impl InnerProxy { break; } - if let Some(mut metric) = metric.upgrade() { + if let Some(metric) = metric.upgrade() { // check if metric targeted by _lower_ namespace if metric.target.borrow().1 > namespace.len() { continue; @@ -254,7 +254,7 @@ impl InputScope for Proxy { proxy } }); - InputMetric::new(move |value, labels| proxy.target.borrow().0.write(value, labels)) + InputMetric::new(name.join("/"), move |value, labels| proxy.target.borrow().0.write(value, labels)) } } diff --git a/src/multi/multi_in.rs b/src/multi/multi_in.rs index 7a10c49..728c141 100755 --- a/src/multi/multi_in.rs +++ b/src/multi/multi_in.rs @@ -90,7 +90,7 @@ impl InputScope for MultiInputScope { .iter() .map(move |scope| scope.new_metric(name.clone(), kind)) .collect(); - InputMetric::new(move |value, labels| { + InputMetric::new(name.join("/"), move |value, labels| { for metric in &metrics { metric.write(value, labels.clone()) } diff --git a/src/output/log.rs b/src/output/log.rs index be30e9d..5b205a2 100755 --- a/src/output/log.rs +++ b/src/output/log.rs @@ -117,7 +117,7 @@ impl InputScope for LogScope { if self.is_buffered() { // buffered - InputMetric::new(move |value, labels| { + InputMetric::new(name.join("/"), move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { Ok(()) => { @@ -131,7 +131,7 @@ impl InputScope for LogScope { // unbuffered let level = self.log.level; let target = self.log.target.clone(); - InputMetric::new(move |value, labels| { + InputMetric::new(name.join("/"), move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { Ok(()) => { diff --git a/src/output/map.rs b/src/output/map.rs index e43b8b9..cf8be99 100755 --- a/src/output/map.rs +++ b/src/output/map.rs @@ -58,9 +58,9 @@ impl InputScope for StatsMapScope { fn new_metric(&self, name: MetricName, _kind: InputKind) -> InputMetric { let name = self.prefix_append(name); let write_to = self.inner.clone(); - let name: String = name.join("."); - InputMetric::new(move |value, _labels| { - let _previous = write_to.write().expect("Lock").insert(name.clone(), value); + let key: String = name.join("."); + InputMetric::new(name.join("/"), move |value, _labels| { + let _previous = write_to.write().expect("Lock").insert(key.clone(), value); }) } } diff --git a/src/queue/queue_in.rs b/src/queue/queue_in.rs index 79a6718..8b332fb 100755 --- a/src/queue/queue_in.rs +++ b/src/queue/queue_in.rs @@ -185,9 +185,9 @@ impl WithAttributes for InputQueueScope { impl InputScope for InputQueueScope { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let name = self.prefix_append(name); - let target_metric = self.target.new_metric(name, kind); + let target_metric = self.target.new_metric(name.clone(), kind); let sender = self.sender.clone(); - InputMetric::new(move |value, mut labels| { + InputMetric::new(name.join("/"), move |value, mut labels| { labels.save_context(); if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels)) { diff --git a/src/queue/queue_out.rs b/src/queue/queue_out.rs index 1628ec1..8a276db 100755 --- a/src/queue/queue_out.rs +++ b/src/queue/queue_out.rs @@ -174,9 +174,9 @@ impl WithAttributes for OutputQueueScope { impl InputScope for OutputQueueScope { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let name = self.prefix_append(name); - let target_metric = Arc::new(self.target.new_metric(name, kind)); + let target_metric = Arc::new(self.target.new_metric(name.clone(), kind)); let sender = self.sender.clone(); - InputMetric::new(move |value, mut labels| { + InputMetric::new(name.join("/"), move |value, mut labels| { labels.save_context(); if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels)) {