Fix potential observer leak with unique metric IDs

This commit is contained in:
Francis Lalonde 2019-05-10 15:16:21 -04:00
parent ac19241a58
commit d50e38308d
13 changed files with 47 additions and 24 deletions

View File

@ -1,5 +1,12 @@
# Latest changes + history
## version 0.7.5
- Fix leak on observers when registering same metric twice.
- Add `metric_id()` on `InputMetric`
## version 0.7.4
- Reexport `ObserveWhen` to make it public
## version 0.7.3
- Fixed / shushed a bunch of `clippy` warnings
- Made `clippy` part of `make` checks

View File

@ -1,6 +1,6 @@
[package]
name = "dipstick"
version = "0.7.4"
version = "0.7.5"
authors = ["Francis Lalonde <fralalonde@gmail.com>"]
description = """A fast, all-purpose metrics library decoupling instrumentation from reporting backends.

View File

@ -80,6 +80,11 @@ in the `[dependencies]` section:
dipstick = "0.7.4"
```
## External features
Configuring dipstick from a text file is possible using
the [spirit-dipstick](https://crates.io/crates/spirit-dipstick) crate.
## Building
When building the crate prior to PR or release, just run plain old `make`.
This will in turn run `cargo` a few times to run tests, benchmarks, lints, etc.

View File

@ -257,10 +257,10 @@ impl InputScope for AtomicBucket {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let scores = write_lock!(self.inner)
.metrics
.entry(self.prefix_append(name))
.entry(self.prefix_append(name.clone()))
.or_insert_with(|| Arc::new(AtomicScores::new(kind)))
.clone();
InputMetric::new(move |value, _labels| scores.update(value))
InputMetric::new(name.join("/"), move |value, _labels| scores.update(value))
}
}

View File

@ -57,8 +57,9 @@ impl Default for Buffering {
}
}
type Shared<T> = Arc<RwLock<T>>;
type Listener = Arc<Fn(Instant) -> () + Send + Sync + 'static>;
pub type MetricId = String;
pub type Shared<T> = Arc<RwLock<T>>;
pub type Listener = Arc<Fn(Instant) -> () + Send + Sync + 'static>;
/// Attributes common to metric components.
/// Not all attributes used by all components.
@ -67,7 +68,7 @@ pub struct Attributes {
naming: NameParts,
sampling: Sampling,
buffering: Buffering,
flush_listeners: Shared<Vec<Listener>>,
flush_listeners: Shared<HashMap<MetricId, Listener>>,
tasks: Shared<Vec<CancelHandle>>,
}
@ -108,7 +109,7 @@ where
{
fn notify_flush_listeners(&self) {
let now = Instant::now();
for listener in read_lock!(self.get_attributes().flush_listeners).iter() {
for listener in read_lock!(self.get_attributes().flush_listeners).values() {
(listener)(now)
}
}
@ -130,8 +131,10 @@ where
pub fn on_flush(self) {
let gauge = self.metric;
let op = self.operation;
write_lock!(self.target.get_attributes().flush_listeners)
.push(Arc::new(move |now| gauge.write(op(now), Labels::default())));
let mut listeners = write_lock!(self.target.get_attributes().flush_listeners);
if !listeners.contains_key(gauge.metric_id()) {
listeners.insert(gauge.metric_id().clone(), Arc::new(move |now| gauge.write(op(now), Labels::default())));
}
}
/// Observe the metric's value periodically.

View File

@ -2,6 +2,7 @@ use core::clock::TimeHandle;
use core::label::Labels;
use core::name::MetricName;
use core::{Flush, MetricValue};
use core::attributes::MetricId;
use std::fmt;
use std::sync::Arc;
@ -75,6 +76,7 @@ pub trait InputScope: Flush {
/// A metric is actually a function that knows to write a metric value to a metric output.
#[derive(Clone)]
pub struct InputMetric {
identifier: MetricId,
inner: Arc<Fn(MetricValue, Labels) + Send + Sync>,
}
@ -86,8 +88,9 @@ impl fmt::Debug for InputMetric {
impl InputMetric {
/// Utility constructor
pub fn new<F: Fn(MetricValue, Labels) + Send + Sync + 'static>(metric: F) -> InputMetric {
pub fn new<F: Fn(MetricValue, Labels) + Send + Sync + 'static>(identifier: MetricId, metric: F) -> InputMetric {
InputMetric {
identifier,
inner: Arc::new(metric),
}
}
@ -97,6 +100,11 @@ impl InputMetric {
pub fn write(&self, value: MetricValue, labels: Labels) {
(self.inner)(value, labels)
}
/// Returns the unique identifier of this metric.
pub fn metric_id(&self) -> &MetricId {
&self.identifier
}
}
/// Used to differentiate between metric kinds in the backend.

View File

@ -37,9 +37,9 @@ impl InputScope for LockingOutput {
.inner
.lock()
.expect("LockingOutput")
.new_metric(name, kind);
.new_metric(name.clone(), kind);
let mutex = self.inner.clone();
InputMetric::new(move |value, labels| {
InputMetric::new(name.join("/"), move |value, labels| {
// lock when collecting values
let _guard = mutex.lock().expect("LockingOutput");
raw_metric.write(value, labels)

View File

@ -145,7 +145,7 @@ impl InnerProxy {
break;
}
if let Some(mut metric) = metric.upgrade() {
if let Some(metric) = metric.upgrade() {
// check if metric targeted by _lower_ namespace
if metric.target.borrow().1 > namespace.len() {
continue;
@ -254,7 +254,7 @@ impl InputScope for Proxy {
proxy
}
});
InputMetric::new(move |value, labels| proxy.target.borrow().0.write(value, labels))
InputMetric::new(name.join("/"), move |value, labels| proxy.target.borrow().0.write(value, labels))
}
}

View File

@ -90,7 +90,7 @@ impl InputScope for MultiInputScope {
.iter()
.map(move |scope| scope.new_metric(name.clone(), kind))
.collect();
InputMetric::new(move |value, labels| {
InputMetric::new(name.join("/"), move |value, labels| {
for metric in &metrics {
metric.write(value, labels.clone())
}

View File

@ -117,7 +117,7 @@ impl InputScope for LogScope {
if self.is_buffered() {
// buffered
InputMetric::new(move |value, labels| {
InputMetric::new(name.join("/"), move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => {
@ -131,7 +131,7 @@ impl InputScope for LogScope {
// unbuffered
let level = self.log.level;
let target = self.log.target.clone();
InputMetric::new(move |value, labels| {
InputMetric::new(name.join("/"), move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => {

View File

@ -58,9 +58,9 @@ impl InputScope for StatsMapScope {
fn new_metric(&self, name: MetricName, _kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let write_to = self.inner.clone();
let name: String = name.join(".");
InputMetric::new(move |value, _labels| {
let _previous = write_to.write().expect("Lock").insert(name.clone(), value);
let key: String = name.join(".");
InputMetric::new(name.join("/"), move |value, _labels| {
let _previous = write_to.write().expect("Lock").insert(key.clone(), value);
})
}
}

View File

@ -185,9 +185,9 @@ impl WithAttributes for InputQueueScope {
impl InputScope for InputQueueScope {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let target_metric = self.target.new_metric(name, kind);
let target_metric = self.target.new_metric(name.clone(), kind);
let sender = self.sender.clone();
InputMetric::new(move |value, mut labels| {
InputMetric::new(name.join("/"), move |value, mut labels| {
labels.save_context();
if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels))
{

View File

@ -174,9 +174,9 @@ impl WithAttributes for OutputQueueScope {
impl InputScope for OutputQueueScope {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let target_metric = Arc::new(self.target.new_metric(name, kind));
let target_metric = Arc::new(self.target.new_metric(name.clone(), kind));
let sender = self.sender.clone();
InputMetric::new(move |value, mut labels| {
InputMetric::new(name.join("/"), move |value, mut labels| {
labels.save_context();
if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels))
{