From d50e38308d828beb29ae5eca56c0b7bf744e9219 Mon Sep 17 00:00:00 2001 From: Francis Lalonde Date: Fri, 10 May 2019 15:16:21 -0400 Subject: [PATCH 1/2] Fix potential observer leak with unique metric IDs --- CHANGES.md | 7 +++++++ Cargo.toml | 2 +- README.md | 5 +++++ src/bucket/atomic.rs | 4 ++-- src/core/attributes.rs | 15 +++++++++------ src/core/input.rs | 10 +++++++++- src/core/locking.rs | 4 ++-- src/core/proxy.rs | 4 ++-- src/multi/multi_in.rs | 2 +- src/output/log.rs | 4 ++-- src/output/map.rs | 6 +++--- src/queue/queue_in.rs | 4 ++-- src/queue/queue_out.rs | 4 ++-- 13 files changed, 47 insertions(+), 24 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c36181d..540eaa4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Latest changes + history +## version 0.7.5 +- Fix leak on observers when registering same metric twice. +- Add `metric_id()` on `InputMetric` + +## version 0.7.4 +- Reexport `ObserveWhen` to make it public + ## version 0.7.3 - Fixed / shushed a bunch of `clippy` warnings - Made `clippy` part of `make` checks diff --git a/Cargo.toml b/Cargo.toml index d1c5623..f76a513 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dipstick" -version = "0.7.4" +version = "0.7.5" authors = ["Francis Lalonde "] description = """A fast, all-purpose metrics library decoupling instrumentation from reporting backends. diff --git a/README.md b/README.md index 1b4a1a2..8a6634e 100755 --- a/README.md +++ b/README.md @@ -80,6 +80,11 @@ in the `[dependencies]` section: dipstick = "0.7.4" ``` +## External features + +Configuring dipstick from a text file is possible using +the [spirit-dipstick](https://crates.io/crates/spirit-dipstick) crate. + ## Building When building the crate prior to PR or release, just run plain old `make`. This will in turn run `cargo` a few times to run tests, benchmarks, lints, etc. diff --git a/src/bucket/atomic.rs b/src/bucket/atomic.rs index 3779707..d5377c6 100755 --- a/src/bucket/atomic.rs +++ b/src/bucket/atomic.rs @@ -257,10 +257,10 @@ impl InputScope for AtomicBucket { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let scores = write_lock!(self.inner) .metrics - .entry(self.prefix_append(name)) + .entry(self.prefix_append(name.clone())) .or_insert_with(|| Arc::new(AtomicScores::new(kind))) .clone(); - InputMetric::new(move |value, _labels| scores.update(value)) + InputMetric::new(name.join("/"), move |value, _labels| scores.update(value)) } } diff --git a/src/core/attributes.rs b/src/core/attributes.rs index f9467ea..a261c22 100755 --- a/src/core/attributes.rs +++ b/src/core/attributes.rs @@ -57,8 +57,9 @@ impl Default for Buffering { } } -type Shared = Arc>; -type Listener = Arc () + Send + Sync + 'static>; +pub type MetricId = String; +pub type Shared = Arc>; +pub type Listener = Arc () + Send + Sync + 'static>; /// Attributes common to metric components. /// Not all attributes used by all components. @@ -67,7 +68,7 @@ pub struct Attributes { naming: NameParts, sampling: Sampling, buffering: Buffering, - flush_listeners: Shared>, + flush_listeners: Shared>, tasks: Shared>, } @@ -108,7 +109,7 @@ where { fn notify_flush_listeners(&self) { let now = Instant::now(); - for listener in read_lock!(self.get_attributes().flush_listeners).iter() { + for listener in read_lock!(self.get_attributes().flush_listeners).values() { (listener)(now) } } @@ -130,8 +131,10 @@ where pub fn on_flush(self) { let gauge = self.metric; let op = self.operation; - write_lock!(self.target.get_attributes().flush_listeners) - .push(Arc::new(move |now| gauge.write(op(now), Labels::default()))); + let mut listeners = write_lock!(self.target.get_attributes().flush_listeners); + if !listeners.contains_key(gauge.metric_id()) { + listeners.insert(gauge.metric_id().clone(), Arc::new(move |now| gauge.write(op(now), Labels::default()))); + } } /// Observe the metric's value periodically. diff --git a/src/core/input.rs b/src/core/input.rs index 734c1a9..05e09d8 100755 --- a/src/core/input.rs +++ b/src/core/input.rs @@ -2,6 +2,7 @@ use core::clock::TimeHandle; use core::label::Labels; use core::name::MetricName; use core::{Flush, MetricValue}; +use core::attributes::MetricId; use std::fmt; use std::sync::Arc; @@ -75,6 +76,7 @@ pub trait InputScope: Flush { /// A metric is actually a function that knows to write a metric value to a metric output. #[derive(Clone)] pub struct InputMetric { + identifier: MetricId, inner: Arc, } @@ -86,8 +88,9 @@ impl fmt::Debug for InputMetric { impl InputMetric { /// Utility constructor - pub fn new(metric: F) -> InputMetric { + pub fn new(identifier: MetricId, metric: F) -> InputMetric { InputMetric { + identifier, inner: Arc::new(metric), } } @@ -97,6 +100,11 @@ impl InputMetric { pub fn write(&self, value: MetricValue, labels: Labels) { (self.inner)(value, labels) } + + /// Returns the unique identifier of this metric. + pub fn metric_id(&self) -> &MetricId { + &self.identifier + } } /// Used to differentiate between metric kinds in the backend. diff --git a/src/core/locking.rs b/src/core/locking.rs index b5b1b9d..634bd80 100755 --- a/src/core/locking.rs +++ b/src/core/locking.rs @@ -37,9 +37,9 @@ impl InputScope for LockingOutput { .inner .lock() .expect("LockingOutput") - .new_metric(name, kind); + .new_metric(name.clone(), kind); let mutex = self.inner.clone(); - InputMetric::new(move |value, labels| { + InputMetric::new(name.join("/"), move |value, labels| { // lock when collecting values let _guard = mutex.lock().expect("LockingOutput"); raw_metric.write(value, labels) diff --git a/src/core/proxy.rs b/src/core/proxy.rs index 96523a0..9142ec4 100755 --- a/src/core/proxy.rs +++ b/src/core/proxy.rs @@ -145,7 +145,7 @@ impl InnerProxy { break; } - if let Some(mut metric) = metric.upgrade() { + if let Some(metric) = metric.upgrade() { // check if metric targeted by _lower_ namespace if metric.target.borrow().1 > namespace.len() { continue; @@ -254,7 +254,7 @@ impl InputScope for Proxy { proxy } }); - InputMetric::new(move |value, labels| proxy.target.borrow().0.write(value, labels)) + InputMetric::new(name.join("/"), move |value, labels| proxy.target.borrow().0.write(value, labels)) } } diff --git a/src/multi/multi_in.rs b/src/multi/multi_in.rs index 7a10c49..728c141 100755 --- a/src/multi/multi_in.rs +++ b/src/multi/multi_in.rs @@ -90,7 +90,7 @@ impl InputScope for MultiInputScope { .iter() .map(move |scope| scope.new_metric(name.clone(), kind)) .collect(); - InputMetric::new(move |value, labels| { + InputMetric::new(name.join("/"), move |value, labels| { for metric in &metrics { metric.write(value, labels.clone()) } diff --git a/src/output/log.rs b/src/output/log.rs index be30e9d..5b205a2 100755 --- a/src/output/log.rs +++ b/src/output/log.rs @@ -117,7 +117,7 @@ impl InputScope for LogScope { if self.is_buffered() { // buffered - InputMetric::new(move |value, labels| { + InputMetric::new(name.join("/"), move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { Ok(()) => { @@ -131,7 +131,7 @@ impl InputScope for LogScope { // unbuffered let level = self.log.level; let target = self.log.target.clone(); - InputMetric::new(move |value, labels| { + InputMetric::new(name.join("/"), move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { Ok(()) => { diff --git a/src/output/map.rs b/src/output/map.rs index e43b8b9..cf8be99 100755 --- a/src/output/map.rs +++ b/src/output/map.rs @@ -58,9 +58,9 @@ impl InputScope for StatsMapScope { fn new_metric(&self, name: MetricName, _kind: InputKind) -> InputMetric { let name = self.prefix_append(name); let write_to = self.inner.clone(); - let name: String = name.join("."); - InputMetric::new(move |value, _labels| { - let _previous = write_to.write().expect("Lock").insert(name.clone(), value); + let key: String = name.join("."); + InputMetric::new(name.join("/"), move |value, _labels| { + let _previous = write_to.write().expect("Lock").insert(key.clone(), value); }) } } diff --git a/src/queue/queue_in.rs b/src/queue/queue_in.rs index 79a6718..8b332fb 100755 --- a/src/queue/queue_in.rs +++ b/src/queue/queue_in.rs @@ -185,9 +185,9 @@ impl WithAttributes for InputQueueScope { impl InputScope for InputQueueScope { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let name = self.prefix_append(name); - let target_metric = self.target.new_metric(name, kind); + let target_metric = self.target.new_metric(name.clone(), kind); let sender = self.sender.clone(); - InputMetric::new(move |value, mut labels| { + InputMetric::new(name.join("/"), move |value, mut labels| { labels.save_context(); if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels)) { diff --git a/src/queue/queue_out.rs b/src/queue/queue_out.rs index 1628ec1..8a276db 100755 --- a/src/queue/queue_out.rs +++ b/src/queue/queue_out.rs @@ -174,9 +174,9 @@ impl WithAttributes for OutputQueueScope { impl InputScope for OutputQueueScope { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let name = self.prefix_append(name); - let target_metric = Arc::new(self.target.new_metric(name, kind)); + let target_metric = Arc::new(self.target.new_metric(name.clone(), kind)); let sender = self.sender.clone(); - InputMetric::new(move |value, mut labels| { + InputMetric::new(name.join("/"), move |value, mut labels| { labels.save_context(); if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels)) { From c88cc8f7f869731e280d695102c47ed7edc15ec7 Mon Sep 17 00:00:00 2001 From: Francis Lalonde Date: Tue, 14 May 2019 20:07:49 -0400 Subject: [PATCH 2/2] Observer on_flush PR fixes --- examples/observer.rs | 9 ++++++--- src/bucket/atomic.rs | 4 ++-- src/core/attributes.rs | 35 ++++++++++++++++++++++++++++------- src/core/locking.rs | 4 ++-- src/core/proxy.rs | 4 ++-- src/core/scheduler.rs | 18 +++++++++++++----- src/lib.rs | 2 +- src/multi/multi_in.rs | 4 ++-- src/multi/multi_out.rs | 2 +- src/output/log.rs | 6 +++--- src/output/map.rs | 4 ++-- src/queue/queue_in.rs | 4 ++-- src/queue/queue_out.rs | 4 ++-- 13 files changed, 66 insertions(+), 34 deletions(-) diff --git a/examples/observer.rs b/examples/observer.rs index 860494c..c77b9e1 100644 --- a/examples/observer.rs +++ b/examples/observer.rs @@ -28,18 +28,21 @@ fn main() { metrics.observe(uptime, |_| 6).on_flush(); // record number of threads in pool every second - metrics + let scheduled = metrics .observe(metrics.gauge("threads"), thread_count) .every(Duration::from_secs(1)); // "heartbeat" metric - metrics + let on_flush = metrics .observe(metrics.marker("heartbeat"), |_| 1) .on_flush(); - loop { + for _ in 0..1000 { std::thread::sleep(Duration::from_millis(40)); } + + on_flush.cancel(); + scheduled.cancel(); } /// Query number of running threads in this process using Linux's /proc filesystem. diff --git a/src/bucket/atomic.rs b/src/bucket/atomic.rs index d5377c6..11a9798 100755 --- a/src/bucket/atomic.rs +++ b/src/bucket/atomic.rs @@ -2,7 +2,7 @@ use bucket::ScoreType::*; use bucket::{stats_summary, ScoreType}; -use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId}; use core::clock::TimeHandle; use core::error; use core::input::{InputKind, InputMetric, InputScope}; @@ -260,7 +260,7 @@ impl InputScope for AtomicBucket { .entry(self.prefix_append(name.clone())) .or_insert_with(|| Arc::new(AtomicScores::new(kind))) .clone(); - InputMetric::new(name.join("/"), move |value, _labels| scores.update(value)) + InputMetric::new(MetricId::forge("bucket", name), move |value, _labels| scores.update(value)) } } diff --git a/src/core/attributes.rs b/src/core/attributes.rs index a261c22..c53d9ee 100755 --- a/src/core/attributes.rs +++ b/src/core/attributes.rs @@ -3,7 +3,7 @@ use std::default::Default; use std::sync::Arc; use core::name::{MetricName, NameParts}; -use core::scheduler::SCHEDULER; +use core::scheduler::{SCHEDULER, Cancel}; use std::fmt; use std::time::{Duration, Instant}; use {CancelHandle, Flush, InputMetric, InputScope, MetricValue}; @@ -57,7 +57,16 @@ impl Default for Buffering { } } -pub type MetricId = String; +#[derive(Clone, Debug, Hash, Eq, PartialOrd, PartialEq)] +pub struct MetricId (String); + +impl MetricId { + pub fn forge(out_type: &str, name: MetricName) -> Self { + let id: String = name.join("/"); + MetricId(format!("{}:{}", out_type, id)) + } +} + pub type Shared = Arc>; pub type Listener = Arc () + Send + Sync + 'static>; @@ -122,19 +131,31 @@ pub struct ObserveWhen<'a, T, F> { operation: Arc, } +pub struct OnFlushCancel (Arc); + +impl Cancel for OnFlushCancel { + fn cancel(&self) { + (self.0)() + } +} + impl<'a, T, F> ObserveWhen<'a, T, F> where F: Fn(Instant) -> MetricValue + Send + Sync + 'static, T: InputScope + WithAttributes + Send + Sync, { /// Observe the metric's value upon flushing the scope. - pub fn on_flush(self) { + pub fn on_flush(self) -> OnFlushCancel { let gauge = self.metric; + let metric_id = gauge.metric_id().clone(); let op = self.operation; - let mut listeners = write_lock!(self.target.get_attributes().flush_listeners); - if !listeners.contains_key(gauge.metric_id()) { - listeners.insert(gauge.metric_id().clone(), Arc::new(move |now| gauge.write(op(now), Labels::default()))); - } + write_lock!(self.target.get_attributes().flush_listeners) + .insert(metric_id.clone(), + Arc::new(move |now| gauge.write(op(now), Labels::default()))); + let flush_listeners = self.target.get_attributes().flush_listeners.clone(); + OnFlushCancel(Arc::new(move || { + write_lock!(flush_listeners).remove(&metric_id); + })) } /// Observe the metric's value periodically. diff --git a/src/core/locking.rs b/src/core/locking.rs index 634bd80..76e67cc 100755 --- a/src/core/locking.rs +++ b/src/core/locking.rs @@ -2,7 +2,7 @@ //! This makes all outputs also immediately usable as inputs. //! The alternatives are queuing or thread local. -use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId}; use core::error; use core::input::{Input, InputKind, InputMetric, InputScope}; use core::name::MetricName; @@ -39,7 +39,7 @@ impl InputScope for LockingOutput { .expect("LockingOutput") .new_metric(name.clone(), kind); let mutex = self.inner.clone(); - InputMetric::new(name.join("/"), move |value, labels| { + InputMetric::new(MetricId::forge("locking", name), move |value, labels| { // lock when collecting values let _guard = mutex.lock().expect("LockingOutput"); raw_metric.write(value, labels) diff --git a/src/core/proxy.rs b/src/core/proxy.rs index 9142ec4..030e226 100755 --- a/src/core/proxy.rs +++ b/src/core/proxy.rs @@ -1,6 +1,6 @@ //! Decouple metric definition from configuration with trait objects. -use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId}; use core::error; use core::input::{InputKind, InputMetric, InputScope}; use core::name::{MetricName, NameParts}; @@ -254,7 +254,7 @@ impl InputScope for Proxy { proxy } }); - InputMetric::new(name.join("/"), move |value, labels| proxy.target.borrow().0.write(value, labels)) + InputMetric::new(MetricId::forge("proxy", name), move |value, labels| proxy.target.borrow().0.write(value, labels)) } } diff --git a/src/core/scheduler.rs b/src/core/scheduler.rs index 73e103b..a3ba434 100644 --- a/src/core/scheduler.rs +++ b/src/core/scheduler.rs @@ -10,6 +10,12 @@ use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; +/// A deferred, repeatable, background action that can be cancelled. +pub trait Cancel { + /// Cancel the action. + fn cancel(&self); +} + /// A handle to cancel a scheduled task if required. #[derive(Debug, Clone)] pub struct CancelHandle(Arc); @@ -19,16 +25,18 @@ impl CancelHandle { CancelHandle(Arc::new(AtomicBool::new(false))) } + fn is_cancelled(&self) -> bool { + self.0.load(SeqCst) + } +} + +impl Cancel for CancelHandle { /// Signals the task to stop. - pub fn cancel(&self) { + fn cancel(&self) { if self.0.swap(true, SeqCst) { warn!("Scheduled task was already cancelled.") } } - - fn is_cancelled(&self) -> bool { - self.0.load(SeqCst) - } } /// Enable background periodical publication of metrics diff --git a/src/lib.rs b/src/lib.rs index 5b4e365..fe13110 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,7 +75,7 @@ pub use core::label::{AppLabel, Labels, ThreadLabel}; pub use core::locking::LockingOutput; pub use core::name::{MetricName, NameParts}; pub use core::output::{Output, OutputDyn, OutputMetric, OutputScope}; -pub use core::scheduler::{CancelHandle, ScheduleFlush}; +pub use core::scheduler::{CancelHandle, ScheduleFlush, Cancel}; pub use core::void::Void; pub use core::{Flush, MetricValue}; diff --git a/src/multi/multi_in.rs b/src/multi/multi_in.rs index 728c141..8d0f53d 100755 --- a/src/multi/multi_in.rs +++ b/src/multi/multi_in.rs @@ -1,6 +1,6 @@ //! Dispatch metrics to multiple sinks. -use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId}; use core::error; use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope}; use core::name::MetricName; @@ -90,7 +90,7 @@ impl InputScope for MultiInputScope { .iter() .map(move |scope| scope.new_metric(name.clone(), kind)) .collect(); - InputMetric::new(name.join("/"), move |value, labels| { + InputMetric::new(MetricId::forge("multi", name.clone()), move |value, labels| { for metric in &metrics { metric.write(value, labels.clone()) } diff --git a/src/multi/multi_out.rs b/src/multi/multi_out.rs index 688deb9..111b25d 100755 --- a/src/multi/multi_out.rs +++ b/src/multi/multi_out.rs @@ -86,7 +86,7 @@ impl MultiOutputScope { impl OutputScope for MultiOutputScope { fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { - let name = &self.prefix_append(name); + let name = self.prefix_append(name); let metrics: Vec = self .scopes .iter() diff --git a/src/output/log.rs b/src/output/log.rs index 5b205a2..0131fea 100755 --- a/src/output/log.rs +++ b/src/output/log.rs @@ -1,5 +1,5 @@ use cache::cache_in; -use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes, MetricId}; use core::error; use core::input::{Input, InputKind, InputMetric, InputScope}; use core::name::MetricName; @@ -117,7 +117,7 @@ impl InputScope for LogScope { if self.is_buffered() { // buffered - InputMetric::new(name.join("/"), move |value, labels| { + InputMetric::new(MetricId::forge("log", name), move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { Ok(()) => { @@ -131,7 +131,7 @@ impl InputScope for LogScope { // unbuffered let level = self.log.level; let target = self.log.target.clone(); - InputMetric::new(name.join("/"), move |value, labels| { + InputMetric::new(MetricId::forge("log", name), move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { Ok(()) => { diff --git a/src/output/map.rs b/src/output/map.rs index cf8be99..c60ae98 100755 --- a/src/output/map.rs +++ b/src/output/map.rs @@ -1,4 +1,4 @@ -use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId}; use core::input::InputKind; use core::input::{Input, InputMetric, InputScope}; use core::name::MetricName; @@ -59,7 +59,7 @@ impl InputScope for StatsMapScope { let name = self.prefix_append(name); let write_to = self.inner.clone(); let key: String = name.join("."); - InputMetric::new(name.join("/"), move |value, _labels| { + InputMetric::new(MetricId::forge("map", name), move |value, _labels| { let _previous = write_to.write().expect("Lock").insert(key.clone(), value); }) } diff --git a/src/queue/queue_in.rs b/src/queue/queue_in.rs index 8b332fb..73fdfb2 100755 --- a/src/queue/queue_in.rs +++ b/src/queue/queue_in.rs @@ -3,7 +3,7 @@ //! If queue size is exceeded, calling code reverts to blocking. use cache::cache_in::CachedInput; -use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId}; use core::error; use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope}; use core::label::Labels; @@ -187,7 +187,7 @@ impl InputScope for InputQueueScope { let name = self.prefix_append(name); let target_metric = self.target.new_metric(name.clone(), kind); let sender = self.sender.clone(); - InputMetric::new(name.join("/"), move |value, mut labels| { + InputMetric::new(MetricId::forge("queue", name), move |value, mut labels| { labels.save_context(); if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels)) { diff --git a/src/queue/queue_out.rs b/src/queue/queue_out.rs index 8a276db..d8b69c0 100755 --- a/src/queue/queue_out.rs +++ b/src/queue/queue_out.rs @@ -3,7 +3,7 @@ //! If queue size is exceeded, calling code reverts to blocking. use cache::cache_in; -use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes, MetricId}; use core::error; use core::input::{Input, InputKind, InputMetric, InputScope}; use core::label::Labels; @@ -176,7 +176,7 @@ impl InputScope for OutputQueueScope { let name = self.prefix_append(name); let target_metric = Arc::new(self.target.new_metric(name.clone(), kind)); let sender = self.sender.clone(); - InputMetric::new(name.join("/"), move |value, mut labels| { + InputMetric::new(MetricId::forge("queue", name), move |value, mut labels| { labels.save_context(); if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels)) {