Save scopes

This commit is contained in:
Francis Lalonde 2018-10-12 16:44:39 -04:00
parent ad4168e977
commit 5e9aebbb22
35 changed files with 497 additions and 231 deletions

View File

@ -43,6 +43,7 @@ bench = []
self_metrics = []
proto = [ "protoc-rust", "protobuf" ]
prometheus = []
tokio = []
[package.metadata.release]
#sign-commit = true

View File

@ -3,12 +3,9 @@
#[macro_use]
extern crate dipstick;
#[macro_use]
extern crate lazy_static;
use std::thread::sleep;
use std::time::Duration;
use dipstick::{Proxy, Text, Counter, Marker, InputScope, QueuedOutput, Input};
use dipstick::{Proxy, Stream, Counter, Marker, InputScope, QueuedOutput, Input};
use std::io;
use std::thread;
@ -19,7 +16,7 @@ metrics!{
fn main() {
Proxy::set_default_target(
Text::write_to(io::stdout()).queued(100).input());
Stream::write_to(io::stdout()).queued(100).input());
for _ in 0..4 {
thread::spawn(move || {
loop {

View File

@ -10,7 +10,7 @@ use dipstick::*;
fn main() {
// for this demo, print metric values to the console
let app_metrics = Text::write_to(io::stdout()).input();
let app_metrics = Stream::write_to(io::stdout()).input();
// metrics can be predefined by type and name
let counter = app_metrics.counter("counter_a");

View File

@ -26,6 +26,6 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_to(&Text::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.flush_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
}

View File

@ -30,6 +30,6 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_to(&Text::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.flush_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
}

View File

@ -27,6 +27,6 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_to(&Text::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.flush_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
}

View File

@ -11,7 +11,7 @@ fn main() {
let metrics = Bucket::new().add_naming("test");
// Bucket::set_default_output(to_stdout());
metrics.set_target(Text::write_to(io::stdout()));
metrics.set_target(Stream::write_to(io::stdout()));
metrics.flush_every(Duration::from_secs(3));

View File

@ -11,7 +11,7 @@ use std::thread::sleep;
fn main() {
let bucket = Bucket::new();
Bucket::set_default_target(Text::write_to(io::stdout()));
Bucket::set_default_target(Stream::write_to(io::stdout()));
let persistent_marker = bucket.marker("persistent");

View File

@ -10,7 +10,7 @@ use std::io;
fn main() {
let app_metrics = Bucket::new();
app_metrics.set_target(Text::write_to(io::stdout()));
app_metrics.set_target(Stream::write_to(io::stdout()));
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -9,7 +9,7 @@ use std::io;
use dipstick::*;
fn main() {
let input = Text::write_to(io::stdout()).buffered(Buffering::Unlimited);
let input = Stream::write_to(io::stdout()).buffered(Buffering::Unlimited);
loop {
println!("\n------- open scope");

View File

@ -8,7 +8,7 @@ use std::io;
use dipstick::*;
fn main() {
let metrics = Text::write_to(io::stdout()).cached(5).input().add_naming("cache");
let metrics = Stream::write_to(io::stdout()).cached(5).input().add_naming("cache");
loop {
// report some ad-hoc metric values from our "application" loop

View File

@ -4,9 +4,6 @@
#[macro_use]
extern crate dipstick;
#[macro_use]
extern crate lazy_static;
use std::time::Duration;
use dipstick::*;
use std::thread::sleep;
@ -37,7 +34,7 @@ fn main() {
// send application metrics to aggregator
Proxy::default().set_target(all_buckets);
Bucket::set_default_target(Text::write_to(io::stdout()));
Bucket::set_default_target(Stream::write_to(io::stdout()));
Bucket::set_default_stats(stats_all);
loop {

View File

@ -42,7 +42,7 @@ fn main() {
}
// send application metrics to aggregator
Bucket::set_default_target(Text::stderr());
Bucket::set_default_target(Stream::stderr());
Bucket::set_default_stats(custom_statistics);
let app_metrics = Bucket::new();

View File

@ -2,8 +2,6 @@
#[macro_use]
extern crate dipstick;
#[macro_use]
pub extern crate lazy_static;
use dipstick::*;
@ -39,7 +37,7 @@ metrics!(LIB_METRICS => {
});
fn main() {
dipstick::Proxy::set_default_target(dipstick::Text::write_to(io::stdout()).input());
dipstick::Proxy::set_default_target(dipstick::Stream::write_to(io::stdout()).input());
loop {
ROOT_COUNTER.count(123);

View File

@ -2,7 +2,7 @@
extern crate dipstick;
use dipstick::{MultiInput, Graphite, Text, Input, InputScope, Naming};
use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Naming};
use std::time::Duration;
use std::io;
@ -10,13 +10,13 @@ fn main() {
// will output metrics to graphite and to stdout
let different_type_metrics = MultiInput::input()
.add_target(Graphite::send_to("localhost:2003").expect("Connecting"))
.add_target(Text::write_to(io::stdout()))
.add_target(Stream::write_to(io::stdout()))
.input();
// will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix.
let same_type_metrics = MultiInput::input()
.add_target(Text::write_to(io::stdout()).add_naming("yeah"))
.add_target(Text::write_to(io::stdout()).add_naming("ouch"))
.add_target(Stream::write_to(io::stdout()).add_naming("yeah"))
.add_target(Stream::write_to(io::stdout()).add_naming("ouch"))
.add_naming("cool")
.input();

View File

@ -10,18 +10,18 @@ fn main() {
// will output metrics to graphite and to stdout
let different_type_metrics = MultiOutput::output()
.add_target(Graphite::send_to("localhost:2003").expect("Connecting"))
.add_target(Text::write_to(io::stdout()))
.add_target(Stream::write_to(io::stdout()))
.input();
// will output metrics twice, once with "cool.yeah" prefix and once with "cool.ouch" prefix.
let same_type_metrics = MultiOutput::output()
.add_target(Text::write_to(io::stderr()).add_naming("out_1"))
.add_target(Text::write_to(io::stderr()).add_naming("out_2"))
.add_target(Stream::write_to(io::stderr()).add_naming("out_1"))
.add_target(Stream::write_to(io::stderr()).add_naming("out_2"))
.add_naming("out_both").input();
loop {
different_type_metrics.new_metric("counter_a".into(), Kind::Counter).write(123);
same_type_metrics.new_metric("timer_a".into(), Kind::Timer).write(6677);
different_type_metrics.new_metric("counter_a".into(), Kind::Counter).write(123, labels![]);
same_type_metrics.new_metric("timer_a".into(), Kind::Timer).write(6677, labels![]);
std::thread::sleep(Duration::from_millis(400));
}
}

View File

@ -5,7 +5,7 @@ extern crate dipstick;
use std::thread::sleep;
use std::time::Duration;
use std::io;
use dipstick::{Proxy, Text, InputScope, Input, Naming};
use dipstick::{Proxy, Stream, InputScope, Input, Naming};
fn main() {
@ -17,17 +17,17 @@ fn main() {
let count2 = sub.counter("counter_b");
loop {
root.set_target(Text::write_to(io::stdout()).input());
root.set_target(Stream::write_to(io::stdout()).input());
count1.count(1);
count2.count(2);
// route every metric from the root to stdout with prefix "root"
root.set_target(Text::write_to(io::stdout()).add_naming("root").input());
root.set_target(Stream::write_to(io::stdout()).add_naming("root").input());
count1.count(3);
count2.count(4);
// route metrics from "sub" to stdout with prefix "mutant"
sub.set_target(Text::write_to(io::stdout()).add_naming("mutant").input());
sub.set_target(Stream::write_to(io::stdout()).add_naming("mutant").input());
count1.count(5);
count2.count(6);
@ -42,7 +42,7 @@ fn main() {
count2.count(10);
// go back to initial single un-prefixed route
root.set_target(Text::write_to(io::stdout()).input());
root.set_target(Stream::write_to(io::stdout()).input());
count1.count(11);
count2.count(12);

View File

@ -1,9 +1,10 @@
//! Use the metrics backend directly to log a metric value.
//! Applications should use the metrics()-provided instruments instead.
#[macro_use]
extern crate dipstick;
use dipstick::{Input, InputScope};
use dipstick::{Input, InputScope, Labels};
fn main() {
raw_write()
@ -18,5 +19,5 @@ pub fn raw_write() {
"count_a".into(),
dipstick::Kind::Counter,
);
counter.write(1);
counter.write(1, labels![]);
}

View File

@ -0,0 +1,26 @@
//! A sample application asynchronously printing metrics to stdout.
#[macro_use]
extern crate dipstick;
use std::thread::sleep;
use std::time::Duration;
use dipstick::{Proxy, Stream, Counter, Marker, InputScope, QueuedOutput, Input, SimpleFormat, Formatting, AppLabel};
use std::io;
use std::thread;
metrics!{
COUNTER: Counter = "counter_a";
}
fn main() {
Proxy::set_default_target(
Stream::stderr().formatting(SimpleFormat::default()).input());
AppLabel::set("abc", "xyz");
loop {
// report some metric values from our "application" loop
COUNTER.count(11);
sleep(Duration::from_millis(500));
}
}

View File

@ -10,8 +10,9 @@
For speed and easier maintenance, metrics are usually defined statically:
```rust,skt-plain
#[macro_use] extern crate dipstick;
#[macro_use] extern crate lazy_static;
#[macro_use]
extern crate dipstick;
use dipstick::*;
metrics!("my_app" => {

View File

@ -119,7 +119,7 @@ impl InnerBucket {
let filtered = (stats_fn)(metric.1, metric.0.clone(), score);
if let Some((kind, name, value)) = filtered {
let metric: OutputMetric = publish_scope.new_metric(name, kind);
metric.write(value, vec![])
metric.write(value, labels![])
}
}
}
@ -298,14 +298,14 @@ mod bench {
fn aggregate_marker(b: &mut test::Bencher) {
let sink = Bucket::new();
let metric = sink.new_metric("event_a".into(), Kind::Marker);
b.iter(|| test::black_box(metric.write(1, vec![])));
b.iter(|| test::black_box(metric.write(1, labels![])));
}
#[bench]
fn aggregate_counter(b: &mut test::Bencher) {
let sink = Bucket::new();
let metric = sink.new_metric("count_a".into(), Kind::Counter);
b.iter(|| test::black_box(metric.write(1, vec![])));
b.iter(|| test::black_box(metric.write(1, labels![])));
}
}

View File

@ -1,7 +1,7 @@
use core::clock::TimeHandle;
use core::{Value, Flush};
use core::name::Name;
use ::Labels;
use ::{Labels};
use std::sync::Arc;
use std::fmt;
@ -63,7 +63,7 @@ pub trait InputScope: Flush {
/// A metric is actually a function that knows to write a metric value to a metric output.
#[derive(Clone)]
pub struct InputMetric {
inner: Arc<Fn(Value, Vec<Labels>) + Send + Sync>
inner: Arc<Fn(Value, Labels) + Send + Sync>
}
impl fmt::Debug for InputMetric {
@ -74,13 +74,13 @@ impl fmt::Debug for InputMetric {
impl InputMetric {
/// Utility constructor
pub fn new<F: Fn(Value, Vec<Labels>) + Send + Sync + 'static>(metric: F) -> InputMetric {
pub fn new<F: Fn(Value, Labels) + Send + Sync + 'static>(metric: F) -> InputMetric {
InputMetric { inner: Arc::new(metric) }
}
/// Collect a new value for this metric.
#[inline]
pub fn write(&self, value: Value, labels: Vec<Labels>) {
pub fn write(&self, value: Value, labels: Labels) {
(self.inner)(value, labels)
}
}
@ -122,7 +122,7 @@ pub struct Marker {
impl Marker {
/// Record a single event occurence.
pub fn mark(&self) {
self.inner.write(1, vec![])
self.inner.write(1, labels![])
}
}
@ -135,7 +135,7 @@ pub struct Counter {
impl Counter {
/// Record a value count.
pub fn count<V: ToPrimitive>(&self, count: V) {
self.inner.write(count.to_u64().unwrap(), vec![])
self.inner.write(count.to_u64().unwrap(), labels![])
}
}
@ -148,7 +148,7 @@ pub struct Gauge {
impl Gauge {
/// Record a value point for this gauge.
pub fn value<V: ToPrimitive>(&self, value: V) {
self.inner.write(value.to_u64().unwrap(), vec![])
self.inner.write(value.to_u64().unwrap(), labels![])
}
}
@ -167,7 +167,7 @@ impl Timer {
/// Record a microsecond interval for this timer
/// Can be used in place of start()/stop() if an external time interval source is used
pub fn interval_us<V: ToPrimitive>(&self, interval_us: V) -> V {
self.inner.write(interval_us.to_u64().unwrap(), vec![]);
self.inner.write(interval_us.to_u64().unwrap(), labels![]);
interval_us
}
@ -222,4 +222,3 @@ impl From<InputMetric> for Marker {
Marker { inner: metric }
}
}

View File

@ -1,17 +1,21 @@
use std::collections::{HashMap};
use std::sync::{Arc, RwLock};
use std::cell::{RefCell, Ref};
use std::cell::RefCell;
/// Label values are immutable but can move around a lot.
pub type LabelValue = Arc<String>;
type LabelValue = Arc<String>;
/// A reference table of key / value string pairs that may be used on output for additional metric context.
#[derive(Default, Debug, Clone)]
pub struct Labels {
///
/// For concurrency reasons, labels are immutable.
/// All write operations return a mutated clone of the original.
#[derive(Debug, Clone, Default)]
struct LabelScope {
pairs: Option<Arc<HashMap<String, LabelValue>>>
}
impl Labels {
impl LabelScope {
/// Sets the value on a new copy of the map, then returns that copy.
fn set(&self, key: String, value: LabelValue) -> Self {
let mut new_pairs = match self.pairs {
None => HashMap::new(),
@ -19,7 +23,7 @@ impl Labels {
};
new_pairs.insert(key, value);
Labels { pairs: Some(Arc::new(new_pairs)) }
LabelScope { pairs: Some(Arc::new(new_pairs)) }
}
fn unset(&self, key: &str) -> Self {
@ -29,9 +33,9 @@ impl Labels {
let mut new_pairs = old_pairs.as_ref().clone();
if new_pairs.remove(key).is_some() {
if new_pairs.is_empty() {
Labels { pairs: None }
LabelScope { pairs: None }
} else {
Labels { pairs: Some(Arc::new(new_pairs)) }
LabelScope { pairs: Some(Arc::new(new_pairs)) }
}
} else {
// key wasn't set, labels unchanged
@ -51,92 +55,205 @@ impl Labels {
}
lazy_static!(
static ref GLOBAL_LABELS: RwLock<Labels> = RwLock::new(Labels::default());
static ref APP_LABELS: RwLock<LabelScope> = RwLock::new(LabelScope::default());
);
thread_local! {
static THREAD_LABELS: RefCell<Labels> = RefCell::new(Labels::default());
static THREAD_LABELS: RefCell<LabelScope> = RefCell::new(LabelScope::default());
}
/// Scopes to which metric labels can be attached.
pub enum LabelScope {
/// Handle metric labels for the whole application (globals).
APP,
/// Handle metric labels for the current thread.
THREAD,
///// Scopes to which metric labels can be attached.
//#[derive(Debug, Clone, Copy)]
//pub enum LabelContext {
//
// #[cfg(feature="tokio")]
// /// Handle metric labels for the current tokio "task".
// /// Serves the same purpose as thread scope when using shared-thread async frameworks.
// TASK,
}
//
// /// Scope local to this single metric value.
// /// Labels are passed explicitly by the calling code.
// /// Value scope labels have the highest lookup priority and will override any value specified
// /// in lower scopes.
// VALUE,
//}
impl LabelScope {
/// Freeze the current label values for usage at later time.
pub fn export(&self) -> Labels {
match *self {
LabelScope::APP => GLOBAL_LABELS.read().expect("Global Labels").clone(),
LabelScope::THREAD => {
// FIXME is there a cleaner way to capture the clone out of the 'with' closure?
let mut labels: Option<Labels> = None;
THREAD_LABELS.with(|map| labels = Some(map.borrow().clone()));
labels.unwrap()
},
}
}
pub trait LabelContext {
/// Retrieve a value for the scope.
fn get(key: &str) -> Option<Arc<String>>;
/// Set a new value for the scope.
/// Replaces any previous value for the key.
pub fn set(&self, key: String, value: String) {
match *self {
LabelScope::APP => {
let b = GLOBAL_LABELS.read().expect("Global Labels");
*GLOBAL_LABELS.write().expect("Global Labels") = b.set(key, Arc::new(value));
},
LabelScope::THREAD => {
THREAD_LABELS.with(|map| {
let b: Ref<Labels> = map.borrow();
*map.borrow_mut() = b.set(key, Arc::new(value));
})
},
}
}
fn set(key: String, value: String);
/// Unset a value for the scope.
/// Has no effect if key was not set.
pub fn unset(&self, key: &str) {
match *self {
LabelScope::APP => {
let b = GLOBAL_LABELS.read().expect("Global Labels");
*GLOBAL_LABELS.write().expect("Global Labels") = b.unset(key);
},
LabelScope::THREAD => {
THREAD_LABELS.with(|map| {
let b: Ref<Labels> = map.borrow();
*map.borrow_mut() = b.unset(key);
})
},
}
fn unset(key: &str);
/// Freeze the current label values for usage at later time.
fn export() -> LabelScope;
}
/// Handle metric labels for the current thread.
/// App scope labels have the lowest lookup priority and serve as a fallback to other scopes.
pub struct ThreadLabel;
impl LabelContext for ThreadLabel {
fn get(key: &str) -> Option<Arc<String>> {
THREAD_LABELS.with(|map| map.borrow().get(key))
}
/// Retrieve a value for the scope.
pub fn get(&self, key: &str) -> Option<Arc<String>> {
match *self {
LabelScope::APP => {
let b = GLOBAL_LABELS.read().expect("Global Labels");
b.get(key)
},
LabelScope::THREAD => {
THREAD_LABELS.with(|map| {
let b: Ref<Labels> = map.borrow();
b.get(key)
})
},
fn set(key: String, value: String) {
let lab_val = Arc::new(value);
THREAD_LABELS.with(|map| {
let new = { map.borrow().set(key, lab_val) };
*map.borrow_mut() = new;
});
}
fn unset(key: &str) {
THREAD_LABELS.with(|map| {
let new = { map.borrow().unset(key) };
*map.borrow_mut() = new;
});
}
fn export() -> LabelScope {
THREAD_LABELS.with(|map| map.borrow().clone())
}
}
/// Handle metric labels for the whole application (globals).
/// App scope labels have the lowest lookup priority and serve as a fallback to other scopes.
pub struct AppLabel;
impl LabelContext for AppLabel {
fn get(key: &str) -> Option<Arc<String>> {
APP_LABELS.read().expect("Global Labels").get(key)
}
fn set(key: String, value: String) {
let b = { APP_LABELS.read().expect("Global Labels").set(key, Arc::new(value)) };
*APP_LABELS.write().expect("Global Labels") = b;
}
fn unset(key: &str) {
let b = { APP_LABELS.read().expect("Global Labels").unset(key) };
*APP_LABELS.write().expect("Global Labels") = b;
}
fn export() -> LabelScope {
APP_LABELS.read().expect("Global Labels").clone()
}
}
/// Base structure to carry metric labels from the application to the metric backend(s).
/// Can carry both one-off labels and exported context labels (if async metrics are enabled).
/// Used in applications through the labels!() macro.
#[derive(Debug, Clone)]
pub struct Labels {
scopes: Vec<LabelScope>,
}
impl From<HashMap<String, LabelValue>> for Labels {
fn from(map: HashMap<String, LabelValue>) -> Self {
Labels {
scopes: vec![LabelScope {
pairs: Some(Arc::new(map))
}]
}
}
}
impl Default for Labels {
/// Create empty labels.
/// Only Thread and App labels will be used for lookups.
#[inline]
fn default() -> Self {
Labels { scopes: vec![] }
}
}
impl Labels {
/// Used to save metric context before enqueuing value for async output.
pub fn save_context(&mut self) {
self.scopes.push(ThreadLabel::export());
self.scopes.push(AppLabel::export());
}
/// Generic label lookup function.
/// Searches provided labels, provided scopes or default scopes.
pub fn lookup(key: &str, labels: &Vec<Labels>) -> Option<LabelValue> {
LabelScope::THREAD.get(key).or_else(|| LabelScope::APP.get(key))
// TODO needs less magic, add checks?
pub fn lookup(&self, key: &str) -> Option<LabelValue> {
fn lookup_current_context(key: &str) -> Option<LabelValue> {
ThreadLabel::get(key).or_else(|| AppLabel::get(key))
}
match self.scopes.len() {
// no value labels, no saved context labels
0 => lookup_current_context(key),
// some value labels, no saved context labels
1 => self.scopes[0].get(key).or_else(|| lookup_current_context(key)),
// some context labels
_ => {
for src in &self.scopes {
if let Some(label_value) = src.get(key) {
return Some(label_value)
}
}
None
}
}
}
}
#[cfg(test)]
pub mod test {
use super::*;
#[test]
fn context_labels() {
AppLabel::set("abc".into(), "456".into());
ThreadLabel::set("abc".into(), "123".into());
assert_eq!(Arc::new("123".into()), labels!().lookup("abc").unwrap());
ThreadLabel::unset("abc");
assert_eq!(Arc::new("456".into()), labels!().lookup("abc").unwrap());
AppLabel::unset("abc");
assert_eq!(false, labels!().lookup("abc").is_some());
}
#[test]
fn labels_macro() {
let labels = labels!{
"abc" => "789",
"xyz" => "123"
};
assert_eq!(Arc::new("789".into()), labels.lookup("abc").unwrap());
assert_eq!(Arc::new("123".into()), labels.lookup("xyz").unwrap());
}
#[test]
fn value_labels() {
AppLabel::set("abc".into(), "456".into());
ThreadLabel::set("abc".into(), "123".into());
let mut labels = labels!{
"abc" => "789",
};
assert_eq!(Arc::new("789".into()), labels.lookup("abc").unwrap());
ThreadLabel::unset("abc");
assert_eq!(Arc::new("789".into()), labels.lookup("abc").unwrap());
AppLabel::unset("abc");
assert_eq!(Arc::new("789".into()), labels.lookup("abc").unwrap());
labels = labels![];
assert_eq!(false, labels.lookup("abc").is_some());
}
}

View File

@ -32,7 +32,7 @@ pub mod test {
fn test_to_void() {
let c = void::Void::metrics().input();
let m = c.new_metric("test".into(), input::Kind::Marker);
m.write(33, vec![]);
m.write(33, labels![]);
}
}

View File

@ -2,7 +2,7 @@ use core::{Flush, Value};
use core::input::Kind;
use core::name::Name;
use core::void::Void;
use ::Labels;
use ::{Labels};
use std::rc::Rc;
@ -17,19 +17,19 @@ pub trait OutputScope: Flush {
/// Output metrics are not thread safe.
#[derive(Clone)]
pub struct OutputMetric {
inner: Rc<Fn(Value, Vec<Labels>)>
inner: Rc<Fn(Value, Labels)>
}
impl OutputMetric {
/// Utility constructor
pub fn new<F: Fn(Value, Vec<Labels>) + 'static>(metric: F) -> OutputMetric {
pub fn new<F: Fn(Value, Labels) + 'static>(metric: F) -> OutputMetric {
OutputMetric { inner: Rc::new(metric) }
}
/// Some may prefer the `metric.write(value)` form to the `(metric)(value)` form.
/// This shouldn't matter as metrics should be of type Counter, Marker, etc.
#[inline]
pub fn write(&self, value: Value, labels: Vec<Labels>) {
pub fn write(&self, value: Value, labels: Labels) {
(self.inner)(value, labels)
}
}

View File

@ -24,6 +24,7 @@ extern crate time;
#[macro_use]
mod macros;
pub use macros::*;
mod core;
pub use core::{Flush, Value};
@ -35,7 +36,7 @@ pub use core::scheduler::{ScheduleFlush, CancelHandle};
pub use core::out_lock::{LockingScopeBox};
pub use core::error::{Error, Result};
pub use core::clock::{TimeHandle};
pub use core::label::{LabelScope, LabelValue, Labels};
pub use core::label::{Labels, AppLabel, ThreadLabel};
#[cfg(test)]
pub use core::clock::{mock_clock_advance, mock_clock_reset};
@ -43,8 +44,8 @@ pub use core::clock::{mock_clock_advance, mock_clock_reset};
pub use core::proxy::Proxy;
mod output;
pub use output::format::{Format, LineFormat, Print, Template};
pub use output::text::{Text, TextScope};
pub use output::format::{LineFormat, SimpleFormat, LineToken, LineTemplate, Formatting};
pub use output::stream::{Stream, TextScope};
pub use output::graphite::{Graphite, GraphiteScope, GraphiteMetric};
pub use output::statsd::{Statsd, StatsdScope, StatsdMetric};
pub use output::map::{StatsMap};

View File

@ -1,5 +1,7 @@
//! Publicly exposed metric macros are defined here.
pub use lazy_static::*;
// TODO add #[timer("name")] custom derive
/// A convenience macro to wrap a block or an expression with a start / stop timer.
@ -15,109 +17,147 @@ macro_rules! time {
}};
}
/// Create **Labels** from a list of key-value pairs
/// Adapted from the hashmap!() macro in the *maplit* crate.
///
/// ## Example
///
/// ```
/// #[macro_use] extern crate dipstick;
///
/// use dipstick::*;
///
/// # fn main() {
///
/// let labels = labels!{
///
/// "a" => "1",
/// "b" => "2",
/// };
/// assert_eq!(labels.lookup("a"), Some(::std::sync::Arc::new("1".into())));
/// assert_eq!(labels.lookup("b"), Some(::std::sync::Arc::new("2".into())));
/// assert_eq!(labels.lookup("c"), None);
/// # }
/// ```
#[macro_export]
macro_rules! labels {
(@single $($x:tt)*) => (());
(@count $($rest:expr),*) => (<[()]>::len(&[$(labels!(@single $rest)),*]));
($($key:expr => $value:expr,)+) => { labels!($($key => $value),+) };
($($key:expr => $value:expr),*) => {
{
let _cap = labels!(@count $($key),*);
let mut _map: ::std::collections::HashMap<String, ::std::sync::Arc<String>> = ::std::collections::HashMap::with_capacity(_cap);
$(
let _ = _map.insert($key.into(), ::std::sync::Arc::new($value.into()));
)*
::Labels::from(_map)
}
};
() => {
::Labels::default()
}
}
/// Metrics can be used from anywhere (public), does not need to declare metrics in this block.
#[macro_export]
macro_rules! metrics {
// BRANCH NODE - public type decl
($(#[$attr:meta])* pub $IDENT:ident: $TYPE:ty = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
lazy_static! { $(#[$attr])* pub static ref $IDENT: $TYPE = $e.into(); }
__in_context!{ $IDENT; $TYPE; $($BRANCH)* }
metrics!{ @internal $IDENT; $TYPE; $($BRANCH)* }
metrics!{ $($REST)* }
};
// BRANCH NODE - private typed decl
($(#[$attr:meta])* $IDENT:ident: $TYPE:ty = $e:expr => { $($BRANCH:tt)* } $($REST:tt)*) => {
lazy_static! { $(#[$attr])* static ref $IDENT: $TYPE = $e.into(); }
__in_context!{ $IDENT; $TYPE; $($BRANCH)* }
metrics!{ @internal $IDENT; $TYPE; $($BRANCH)* }
metrics!{ $($REST)* }
};
// BRANCH NODE - public untyped decl
($(#[$attr:meta])* pub $IDENT:ident = $e:expr => { $($BRANCH:tt)* } $($REST:tt)*) => {
lazy_static! { $(#[$attr])* pub static ref $IDENT: Proxy = $e.into(); }
__in_context!{ $IDENT; Proxy; $($BRANCH)* }
metrics!{ @internal $IDENT; Proxy; $($BRANCH)* }
metrics!{ $($REST)* }
};
// BRANCH NODE - private untyped decl
($(#[$attr:meta])* $IDENT:ident = $e:expr => { $($BRANCH:tt)* } $($REST:tt)*) => {
lazy_static! { $(#[$attr])* static ref $IDENT: Proxy = $e.into(); }
__in_context!{ $IDENT; Proxy; $($BRANCH)* }
metrics!{ @internal $IDENT; Proxy; $($BRANCH)* }
metrics!{ $($REST)* }
};
// BRANCH NODE - untyped expr
($e:expr => { $($BRANCH:tt)+ } $($REST:tt)*) => {
__in_context!{ $e; Proxy; $($BRANCH)* }
metrics!{ @internal $e; Proxy; $($BRANCH)* }
metrics!{ $($REST)* }
};
// LEAF NODE - public typed decl
($(#[$attr:meta])* pub $IDENT:ident: $TYPE:ty = $e:expr; $($REST:tt)*) => {
__in_context!{ Proxy::default(); Proxy; $(#[$attr])* pub $IDENT: $TYPE = $e; }
metrics!{ @internal Proxy::default(); Proxy; $(#[$attr])* pub $IDENT: $TYPE = $e; }
metrics!{ $($REST)* }
};
// LEAF NODE - private typed decl
($(#[$attr:meta])* $IDENT:ident: $TYPE:ty = $e:expr; $($REST:tt)*) => {
__in_context!{ Proxy::default(); Proxy; $(#[$attr])* $IDENT: $TYPE = $e; }
metrics!{ @internal Proxy::default(); Proxy; $(#[$attr])* $IDENT: $TYPE = $e; }
metrics!{ $($REST)* }
};
// END NODE
() => ()
}
() => ();
/// Internal macro required to abstract over pub/non-pub versions of the macro
#[macro_export]
#[doc(hidden)]
macro_rules! __in_context {
// METRIC NODE - public
($WITH:expr; $TY:ty; $(#[$attr:meta])* pub $IDENT:ident: $MTY:ty = $METRIC_NAME:expr; $($REST:tt)*) => {
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $IDENT:ident: $MTY:ty = $METRIC_NAME:expr; $($REST:tt)*) => {
lazy_static! { $(#[$attr])* pub static ref $IDENT: $MTY =
$WITH.new_metric($METRIC_NAME.into(), stringify!($MTY).into()).into();
}
__in_context!{ $WITH; $TY; $($REST)* }
metrics!{ @internal $WITH; $TY; $($REST)* }
};
// METRIC NODE - private
($WITH:expr; $TY:ty; $(#[$attr:meta])* $IDENT:ident: $MTY:ty = $METRIC_NAME:expr; $($REST:tt)*) => {
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $IDENT:ident: $MTY:ty = $METRIC_NAME:expr; $($REST:tt)*) => {
lazy_static! { $(#[$attr])* static ref $IDENT: $MTY =
$WITH.new_metric($METRIC_NAME.into(), stringify!($MTY).into()).into();
}
__in_context!{ $WITH; $TY; $($REST)* }
metrics!{ @internal $WITH; $TY; $($REST)* }
};
// SUB BRANCH NODE - public identifier
($WITH:expr; $TY:ty; $(#[$attr:meta])* pub $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
lazy_static! { $(#[$attr])* pub static ref $IDENT = $WITH.add_naming($e); }
__in_context!($IDENT; $TY; $($BRANCH)*);
__in_context!($WITH; $TY; $($REST)*);
metrics!( @internal $IDENT; $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE - private identifier
($WITH:expr; $TY:ty; $(#[$attr:meta])* $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $IDENT:ident = $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
lazy_static! { $(#[$attr])* static ref $IDENT = $WITH.add_naming($e); }
__in_context!($IDENT; $TY; $($BRANCH)*);
__in_context!($WITH; $TY; $($REST)*);
metrics!( @internal $IDENT; $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE (not yet)
($WITH:expr; $TY:ty; $(#[$attr:meta])* pub $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
__in_context!($WITH.add_naming($e); $TY; $($BRANCH)*);
__in_context!($WITH; $TY; $($REST)*);
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* pub $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
metrics!( @internal $WITH.add_naming($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
// SUB BRANCH NODE (not yet)
($WITH:expr; $TY:ty; $(#[$attr:meta])* $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
__in_context!($WITH.add_naming($e); $TY; $($BRANCH)*);
__in_context!($WITH; $TY; $($REST)*);
(@internal $WITH:expr; $TY:ty; $(#[$attr:meta])* $e:expr => { $($BRANCH:tt)*} $($REST:tt)*) => {
metrics!( @internal $WITH.add_naming($e); $TY; $($BRANCH)*);
metrics!( @internal $WITH; $TY; $($REST)*);
};
($WITH:expr; $TYPE:ty;) => ()
(@internal $WITH:expr; $TYPE:ty;) => ()
}
#[cfg(test)]
mod test {
use core::input::*;

View File

@ -1,39 +1,45 @@
use core::name::Name;
use core::input::Kind;
use core::Value;
use self::Print::*;
use ::LabelValue;
use self::LineToken::*;
use std::io;
use std::sync::Arc;
/// Print commands are steps in the execution of output templates.
pub enum Print {
pub enum LineToken {
/// Print a string.
Literal(String),
/// Lookup and print label value for key, if it exists.
Label(PrintLabel),
LabelExists(String, Vec<LabelToken>),
/// Print metric value as text.
ValueAsText,
/// Print metric value, divided by the given scale, as text.
ScaledValueAsText(Value),
/// Print the newline character.
/// Print the newline character.labels.lookup(key)
NewLine,
}
/// Print commands are steps in the execution of output templates.
pub enum PrintLabel {
/// Lookup and print label value for key, if it exists.
Value(String),
pub enum LabelToken {
/// Print a string.
Literal(String),
/// Print the label key.
LabelKey,
/// Print the label value.
LabelValue,
}
/// An sequence of print commands, embodying an output strategy for a single metric.
pub struct Template {
commands: Vec<Print>
pub struct LineTemplate {
commands: Vec<LineToken>
}
impl Template {
impl LineTemplate {
/// Template execution applies commands in turn, writing to the output.
pub fn print<L: Fn(&str) -> Option<LabelValue>>(&self, output: &mut io::Write, value: Value, lookup: L) -> Result<(), io::Error> {
pub fn print<L>(&self, output: &mut io::Write, value: Value, lookup: L) -> Result<(), io::Error>
where L: Fn(&str) -> Option<Arc<String>>
{
for cmd in &self.commands {
match cmd {
Literal(src) => output.write_all(src.as_ref())?,
@ -43,36 +49,51 @@ impl Template {
output.write_all(format!("{}", scaled).as_ref())?
},
NewLine => writeln!(output)?,
Label(PrintLabel::Value(label_key)) => {
LabelExists(label_key, print_label) => {
if let Some(label_value) = lookup(label_key.as_ref()) {
output.write_all(label_value.as_bytes())?
for label_cmd in print_label {
match label_cmd {
LabelToken::LabelValue =>
output.write_all(label_value.as_bytes())?,
LabelToken::LabelKey =>
output.write_all(label_key.as_bytes())?,
LabelToken::Literal(src) =>
output.write_all(src.as_ref())?,
}
}
}
}
},
};
}
Ok(())
}
}
/// Format output config support.
pub trait Formatting {
/// Specify formatting of output.
fn formatting(&self, format: impl LineFormat + 'static) -> Self;
}
/// Forges metric-specific printers
pub trait Format: Send + Sync {
pub trait LineFormat: Send + Sync {
/// Prepare a template for output of metric values.
fn template(&self, name: &Name, kind: Kind) -> Template;
fn template(&self, name: &Name, kind: Kind) -> LineTemplate;
}
/// A simple metric output format of "MetricName {Value}"
#[derive(Default)]
pub struct LineFormat {
pub struct SimpleFormat {
// TODO make separator configurable
// separator: String,
}
impl Format for LineFormat {
fn template(&self, name: &Name, _kind: Kind) -> Template {
impl LineFormat for SimpleFormat {
fn template(&self, name: &Name, _kind: Kind) -> LineTemplate {
let mut header = name.join(".");
header.push(' ');
Template {
LineTemplate {
commands: vec![
Literal(header),
ValueAsText,
@ -84,3 +105,56 @@ impl Format for LineFormat {
}
#[cfg(test)]
pub mod test {
use super::*;
use ::Labels;
pub struct TestFormat;
impl LineFormat for TestFormat {
fn template(&self, name: &Name, kind: Kind) -> LineTemplate {
let mut header: String = format!("{:?}", kind);
header.push('/');
header.push_str(&name.join("."));
header.push(' ');
LineTemplate {
commands: vec![
Literal(header),
ValueAsText,
Literal(" ".into()),
ScaledValueAsText(1000),
Literal(" ".into()),
LabelExists("test_key".into(), vec![
LabelToken::LabelKey,
LabelToken::Literal("=".into()),
LabelToken::LabelValue]),
NewLine,
]
}
}
}
#[test]
fn print_label_exists() {
let labels: Labels = labels!("test_key" => "456");
let format = TestFormat {};
let mut name = Name::from("abc");
name = name.prepend("xyz");
let template = format.template(&name, Kind::Counter);
let mut out = vec![];
template.print(&mut out, 123000, |key| labels.lookup(key)).unwrap();
assert_eq!("Counter/xyz.abc 123000 123 test_key=456\n", String::from_utf8(out).unwrap());
}
#[test]
fn print_label_not_exists() {
let format = TestFormat {};
let mut name = Name::from("abc");
name = name.prepend("xyz");
let template = format.template(&name, Kind::Counter);
let mut out = vec![];
template.print(&mut out, 123000, |_key| None).unwrap();
assert_eq!("Counter/xyz.abc 123000 123 \n", String::from_utf8(out).unwrap());
}
}

View File

@ -197,7 +197,7 @@ mod bench {
let sd = Graphite::send_to("localhost:2003").unwrap().input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
b.iter(|| test::black_box(timer.write(2000, vec![])));
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
#[bench]
@ -206,7 +206,7 @@ mod bench {
.buffered(Buffering::BufferSize(65465)).input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
b.iter(|| test::black_box(timer.write(2000, vec![])));
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
}

View File

@ -5,8 +5,7 @@ use core::name::Name;
use core::error;
use cache::cache_in;
use queue::queue_in;
use ::{Format, LineFormat};
use ::LabelScope;
use output::format::{LineFormat, SimpleFormat, Formatting};
use std::sync::{RwLock, Arc};
use std::io::Write;
@ -16,7 +15,7 @@ use log;
#[derive(Clone)]
pub struct Log {
attributes: Attributes,
format: Arc<Format>,
format: Arc<LineFormat>,
}
impl Input for Log {
@ -38,6 +37,14 @@ impl WithAttributes for Log {
impl Buffered for Log {}
impl Formatting for Log {
fn formatting(&self, format: impl LineFormat + 'static) -> Self {
let mut cloned = self.clone();
cloned.format = Arc::new(format);
cloned
}
}
/// A scope for metrics log output.
#[derive(Clone)]
pub struct LogScope {
@ -52,7 +59,7 @@ impl Log {
pub fn log_to() -> Log {
Log {
attributes: Attributes::default(),
format: Arc::new(LineFormat::default()),
format: Arc::new(SimpleFormat::default()),
}
}
}
@ -78,7 +85,7 @@ impl InputScope for LogScope {
if let Some(_buffering) = self.get_buffering() {
InputMetric::new(move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| LabelScope::lookup(key, &labels)) {
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => {
let mut entries = entries.write().expect("TextOutput");
entries.push(buffer)
@ -90,7 +97,7 @@ impl InputScope for LogScope {
// unbuffered
InputMetric::new(move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| LabelScope::lookup(key, &labels)) {
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => log!(log::Level::Debug, "{:?}", &buffer),
Err(err) => debug!("Could not format buffered log metric: {}", err),
}
@ -130,7 +137,7 @@ mod test {
fn test_to_log() {
let c = super::Log::log_to().input();
let m = c.new_metric("test".into(), Kind::Marker);
m.write(33, vec![]);
m.write(33, labels![]);
}
}

View File

@ -2,7 +2,7 @@ pub mod format;
pub mod map;
pub mod text;
pub mod stream;
pub mod log;

View File

@ -260,7 +260,7 @@ mod bench {
let sd = Statsd::send_to("localhost:2003").unwrap().input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
b.iter(|| test::black_box(timer.write(2000, vec![])));
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
#[bench]
@ -269,7 +269,7 @@ mod bench {
.buffered(Buffering::BufferSize(65465)).input();
let timer = sd.new_metric("timer".into(), Kind::Timer);
b.iter(|| test::black_box(timer.write(2000, vec![])));
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
}

View File

@ -8,11 +8,10 @@ use core::attributes::{Attributes, WithAttributes, Buffered, Naming};
use core::name::Name;
use core::output::{Output, OutputMetric, OutputScope};
use core::error;
use ::LabelScope;
use ::{Format, LineFormat};
use cache::cache_out;
use queue::queue_out;
use output::format::{LineFormat, SimpleFormat, Formatting};
use std::sync::{RwLock, Arc};
use std::io::{Write, self};
@ -20,45 +19,53 @@ use std::rc::Rc;
use std::cell::RefCell;
/// Buffered metrics text output.
pub struct Text<W: Write + Send + Sync + 'static> {
pub struct Stream<W: Write + Send + Sync + 'static> {
attributes: Attributes,
format: Arc<Format + Send + Sync>,
format: Arc<LineFormat + Send + Sync>,
inner: Arc<RwLock<W>>,
}
impl<W: Write + Send + Sync + 'static> queue_out::QueuedOutput for Text<W> {}
impl<W: Write + Send + Sync + 'static> cache_out::CachedOutput for Text<W> {}
impl<W: Write + Send + Sync + 'static> queue_out::QueuedOutput for Stream<W> {}
impl<W: Write + Send + Sync + 'static> cache_out::CachedOutput for Stream<W> {}
impl<W: Write + Send + Sync + 'static> Text<W> {
impl<W: Write + Send + Sync + 'static> Formatting for Stream<W> {
fn formatting(&self, format: impl LineFormat + 'static) -> Self {
let mut cloned = self.clone();
cloned.format = Arc::new(format);
cloned
}
}
impl<W: Write + Send + Sync + 'static> Stream<W> {
/// Write metric values to provided Write target.
pub fn write_to(write: W) -> Text<W> {
Text {
pub fn write_to(write: W) -> Stream<W> {
Stream {
attributes: Attributes::default(),
format: Arc::new(LineFormat::default()),
format: Arc::new(SimpleFormat::default()),
inner: Arc::new(RwLock::new(write)),
}
}
}
impl Text<io::Stderr> {
impl Stream<io::Stderr> {
/// Write metric values to stdout.
pub fn stderr() -> Text<io::Stderr> {
Text::write_to(io::stderr())
pub fn stderr() -> Stream<io::Stderr> {
Stream::write_to(io::stderr())
}
}
impl Text<io::Stdout> {
impl Stream<io::Stdout> {
/// Write metric values to stdout.
pub fn stdout() -> Text<io::Stdout> {
Text::write_to(io::stdout())
pub fn stdout() -> Stream<io::Stdout> {
Stream::write_to(io::stdout())
}
}
// FIXME manual Clone impl required because auto-derive is borked (https://github.com/rust-lang/rust/issues/26925)
impl<W: Write + Send + Sync + 'static> Clone for Text<W> {
impl<W: Write + Send + Sync + 'static> Clone for Stream<W> {
fn clone(&self) -> Self {
Text {
Stream {
attributes: self.attributes.clone(),
format: self.format.clone(),
inner: self.inner.clone(),
@ -66,14 +73,14 @@ impl<W: Write + Send + Sync + 'static> Clone for Text<W> {
}
}
impl<W: Write + Send + Sync + 'static> WithAttributes for Text<W> {
impl<W: Write + Send + Sync + 'static> WithAttributes for Stream<W> {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl<W: Write + Send + Sync + 'static> Buffered for Text<W> {}
impl<W: Write + Send + Sync + 'static> Buffered for Stream<W> {}
impl<W: Write + Send + Sync + 'static> Output for Text<W> {
impl<W: Write + Send + Sync + 'static> Output for Stream<W> {
type SCOPE = TextScope<W>;
fn output(&self) -> Self::SCOPE {
@ -89,7 +96,7 @@ impl<W: Write + Send + Sync + 'static> Output for Text<W> {
pub struct TextScope<W: Write + Send + Sync + 'static> {
attributes: Attributes,
entries: Rc<RefCell<Vec<Vec<u8>>>>,
output: Text<W>,
output: Stream<W>,
}
@ -120,7 +127,7 @@ impl<W: Write + Send + Sync + 'static> OutputScope for TextScope<W> {
if let Some(_buffering) = self.get_buffering() {
OutputMetric::new(move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| LabelScope::lookup(key, &labels)) {
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => {
let mut entries = entries.borrow_mut();
entries.push(buffer)
@ -133,7 +140,7 @@ impl<W: Write + Send + Sync + 'static> OutputScope for TextScope<W> {
let output = self.output.clone();
OutputMetric::new(move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| LabelScope::lookup(key, &labels)) {
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => {
let mut output = output.inner.write().expect("Metrics Text Output");
if let Err(e) = output.write_all(&buffer).and_then(|_| output.flush()) {
@ -178,8 +185,8 @@ mod test {
#[test]
fn sink_print() {
let c = super::Text::write_to(io::stdout()).output();
let c = super::Stream::write_to(io::stdout()).output();
let m = c.new_metric("test".into(), Kind::Marker);
m.write(33, vec![]);
m.write(33, labels![]);
}
}

View File

@ -9,7 +9,7 @@ use core::{Value, Flush};
use core::metrics;
use cache::cache_in::CachedInput;
use core::error;
use ::Labels;
use ::{ Labels};
use std::sync::Arc;
use std::sync::mpsc;
@ -90,7 +90,7 @@ impl Input for InputQueue {
/// Async commands should be of no concerns to applications.
pub enum InputQueueCmd {
/// Send metric write
Write(InputMetric, Value, Vec<Labels>),
Write(InputMetric, Value, Labels),
/// Send metric flush
Flush(Arc<InputScope + Send + Sync + 'static>),
}
@ -125,8 +125,8 @@ impl InputScope for InputQueueScope {
let name = self.naming_append(name);
let target_metric = self.target.new_metric(name, kind);
let sender = self.sender.clone();
InputMetric::new(move |value, labels| {
// TODO append snapshot (clone) of thread & app labels from this thread & time
InputMetric::new(move |value, mut labels| {
labels.save_context();
if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels)) {
metrics::SEND_FAILED.mark();
debug!("Failed to send async metrics: {}", e);

View File

@ -10,7 +10,7 @@ use core::{Value, Flush};
use core::metrics;
use cache::cache_in;
use core::error;
use ::Labels;
use ::{Labels};
use std::rc::Rc;
use std::ops;
@ -95,7 +95,7 @@ impl Input for OutputQueue {
/// Async commands should be of no concerns to applications.
pub enum OutputQueueCmd {
/// Send metric write
Write(Arc<OutputMetric>, Value, Vec<Labels>),
Write(Arc<OutputMetric>, Value, Labels),
/// Send metric flush
Flush(Arc<UnsafeScope>),
}
@ -119,8 +119,8 @@ impl InputScope for OutputQueueScope {
let name = self.naming_append(name);
let target_metric = Arc::new(self.target.new_metric(name, kind));
let sender = self.sender.clone();
InputMetric::new(move |value, labels| {
// TODO append snapshot (clone) of thread & app labels from this thread & time
InputMetric::new(move |value, mut labels| {
labels.save_context();
if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels)) {
metrics::SEND_FAILED.mark();
debug!("Failed to send async metrics: {}", e);