From 963dfc0fd27585b8fd2c5a9df70e61670c96c70b Mon Sep 17 00:00:00 2001 From: Francis Lalonde Date: Mon, 31 Dec 2018 17:03:34 -0500 Subject: [PATCH] Level Log Target --- Cargo.toml | 2 +- HANDBOOK.md | 24 ++- HANDBOOK.md.skt.md | 20 +-- README.md | 6 +- README.md.skt.md | 20 +-- examples/bench_bucket.rs | 3 +- examples/bench_bucket_proxy.rs | 2 +- examples/bench_queue.rs | 2 +- examples/bucket2graphite.rs | 2 +- examples/bucket2stdout.rs | 2 +- examples/bucket_cleanup.rs | 2 +- examples/bucket_summary.rs | 2 +- examples/clopwizard.rs | 2 +- examples/custom_publish.rs | 2 +- examples/prometheus.rs | 2 +- examples/raw_log.rs | 2 +- libtest.rmeta | 0 src/bucket/atomic.rs | 69 ++++----- src/core/input.rs | 2 +- src/core/label.rs | 49 ++++++ src/core/metrics.rs | 6 + src/core/scheduler.rs | 24 +++ src/lib.rs | 3 +- src/macros.rs | 20 ++- src/output/graphite.rs | 2 +- src/output/log.rs | 48 ++++-- src/output/prometheus.rs | 267 ++++++++++++++++++++------------- 27 files changed, 368 insertions(+), 217 deletions(-) create mode 100644 libtest.rmeta diff --git a/Cargo.toml b/Cargo.toml index 571a5c7..85de3f0 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ num = { version = "0.2", default-features = false } # FIXME required only for random seed for sampling time = "0.1" -prometheus = { version = "0.4" } +minreq = { version = "1.0.0" } # optional dep for standalone http pull metrics tiny_http = { version = "0.6", optional = true } diff --git a/HANDBOOK.md b/HANDBOOK.md index eb632a2..5b041a3 100755 --- a/HANDBOOK.md +++ b/HANDBOOK.md @@ -51,7 +51,7 @@ While timers internal precision are in nanoseconds, their accuracy depends on pl Timer's default output format is milliseconds but is scalable up or down. ```$rust,skt-run -let app_metrics = metric_scope(to_stdout()); +let app_metrics = Stream::to_stdout().input(); let timer = app_metrics.timer("my_timer"); time!(timer, {/* slow code here */} ); timer.time(|| {/* slow code here */} ); @@ -79,7 +79,7 @@ Names should exclude characters that can interfere with namespaces, separator an A good convention is to stick with lowercase alphanumeric identifiers of less than 12 characters. ```$rust,skt-run -let app_metrics = metric_scope(to_stdout()); +let app_metrics = Stream::to_stdout().input(); let db_metrics = app_metrics.add_prefix("database"); let _db_timer = db_metrics.timer("db_timer"); let _db_counter = db_metrics.counter("db_counter"); @@ -107,17 +107,12 @@ Notes about labels: Metric inputs are usually setup statically upon application startup. ```$rust,skt-run -#[macro_use] -extern crate dipstick; - -use dipstick::*; - metrics!("my_app" => { COUNTER_A: Counter = "counter_a"; }); fn main() { - route_aggregate_metrics(to_stdout()); + Proxy::set_default_target(Stream::to_stdout().input()); COUNTER_A.count(11); } ``` @@ -126,15 +121,16 @@ The static metric definition macro is just `lazy_static!` wrapper. ## Dynamic metrics -If necessary, metrics can also be defined "dynamically", with a possibly new name for every value. -This is more flexible but has a higher runtime cost, which may be alleviated with caching. +If necessary, metrics can also be defined "dynamically". +This is more flexible but has a higher runtime cost, which may be alleviated with the optional caching mechanism. ```$rust,skt-run let user_name = "john_day"; -let app_metrics = to_log().with_cache(512); -app_metrics.gauge(format!("gauge_for_user_{}", user_name)).value(44); +let app_metrics = Log::to_log().cached(512).input(); +app_metrics.gauge(&format!("gauge_for_user_{}", user_name)).value(44); ``` +Alternatively, you may use `Labels` to output context-dependent metrics. ## Metrics Output A metrics library's second job is to help a program emit metric values that can be used in further systems. @@ -184,8 +180,8 @@ If enabled, buffering is usually a best-effort affair, to safely limit the amoun Some outputs such as statsd also have the ability to sample metrics. If enabled, sampling is done using pcg32, a fast random algorithm with reasonable entropy. -```$rust,skt-fail -let _app_metrics = Statsd::send_to("server:8125")?.with_sampling_rate(0.01); +```$rust,skt-run,no_run +let _app_metrics = Statsd::send_to("server:8125")?.sampled(Sampling::Random(0.01)).input(); ``` diff --git a/HANDBOOK.md.skt.md b/HANDBOOK.md.skt.md index ece0fc5..8a232a5 100644 --- a/HANDBOOK.md.skt.md +++ b/HANDBOOK.md.skt.md @@ -6,30 +6,12 @@ Use `cargo test --features="skeptic"` to run the examples in the README using th #[macro_use] extern crate dipstick; use dipstick::*; -use std::time::Duration; - -fn main() {{ - {} -}} -``` - -```rust,skt-fail -extern crate dipstick; -use dipstick::*; -use std::result::Result; -use std::time::Duration; - -fn main() {{ - run().ok(); -}} - -fn run() -> Result<(), Error> {{ +fn main() -> std::result::Result<(), Box> {{ {} Ok(()) }} ``` - ```rust,skt-plain {} ``` \ No newline at end of file diff --git a/README.md b/README.md index 3c17427..5b7a966 100755 --- a/README.md +++ b/README.md @@ -46,10 +46,10 @@ Here's a basic aggregating & auto-publish counter metric: ```$rust,skt-run let bucket = AtomicBucket::new(); -bucket.set_target(Stream::stdout()); -bucket.flush_every(Duration::from_secs(3)); +bucket.set_drain(Stream::to_stdout()); +bucket.flush_every(std::time::Duration::from_secs(3)); let counter = bucket.counter("counter_a"); -counter.count(8) +counter.count(8); ``` Persistent apps wanting to declare static metrics will prefer using the `metrics!` macro: diff --git a/README.md.skt.md b/README.md.skt.md index ece0fc5..8a232a5 100644 --- a/README.md.skt.md +++ b/README.md.skt.md @@ -6,30 +6,12 @@ Use `cargo test --features="skeptic"` to run the examples in the README using th #[macro_use] extern crate dipstick; use dipstick::*; -use std::time::Duration; - -fn main() {{ - {} -}} -``` - -```rust,skt-fail -extern crate dipstick; -use dipstick::*; -use std::result::Result; -use std::time::Duration; - -fn main() {{ - run().ok(); -}} - -fn run() -> Result<(), Error> {{ +fn main() -> std::result::Result<(), Box> {{ {} Ok(()) }} ``` - ```rust,skt-plain {} ``` \ No newline at end of file diff --git a/examples/bench_bucket.rs b/examples/bench_bucket.rs index 41ea0b1..e932b6b 100755 --- a/examples/bench_bucket.rs +++ b/examples/bench_bucket.rs @@ -26,6 +26,7 @@ fn main() { }); } sleep(Duration::from_secs(5)); - bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap(); + bucket.set_stats(stats_all); + bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap(); } diff --git a/examples/bench_bucket_proxy.rs b/examples/bench_bucket_proxy.rs index 0aabc24..c9514b4 100755 --- a/examples/bench_bucket_proxy.rs +++ b/examples/bench_bucket_proxy.rs @@ -30,6 +30,6 @@ fn main() { }); } sleep(Duration::from_secs(5)); - bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap(); + bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap(); } diff --git a/examples/bench_queue.rs b/examples/bench_queue.rs index ab55955..b991589 100755 --- a/examples/bench_queue.rs +++ b/examples/bench_queue.rs @@ -27,6 +27,6 @@ fn main() { }); } sleep(Duration::from_secs(5)); - bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap(); + bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap(); } diff --git a/examples/bucket2graphite.rs b/examples/bucket2graphite.rs index d4be306..991b7db 100755 --- a/examples/bucket2graphite.rs +++ b/examples/bucket2graphite.rs @@ -10,7 +10,7 @@ fn main() { let bucket = AtomicBucket::new().add_prefix("test"); // Bucket::set_default_output(to_stdout()); - bucket.set_flush_to(Graphite::send_to("localhost:2003").expect("Socket") + bucket.set_drain(Graphite::send_to("localhost:2003").expect("Socket") .add_prefix("machine1").add_prefix("application")); bucket.flush_every(Duration::from_secs(3)); diff --git a/examples/bucket2stdout.rs b/examples/bucket2stdout.rs index a4deabd..a9c77b0 100755 --- a/examples/bucket2stdout.rs +++ b/examples/bucket2stdout.rs @@ -11,7 +11,7 @@ fn main() { let metrics = AtomicBucket::new().add_prefix("test"); // Bucket::set_default_output(to_stdout()); - metrics.set_flush_to(Stream::write_to(io::stdout())); + metrics.set_drain(Stream::write_to(io::stdout())); metrics.flush_every(Duration::from_secs(3)); diff --git a/examples/bucket_cleanup.rs b/examples/bucket_cleanup.rs index f17dc7a..2d4cbd0 100755 --- a/examples/bucket_cleanup.rs +++ b/examples/bucket_cleanup.rs @@ -11,7 +11,7 @@ use std::thread::sleep; fn main() { let bucket = AtomicBucket::new(); - AtomicBucket::set_default_flush_to(Stream::write_to(io::stdout())); + AtomicBucket::set_default_drain(Stream::write_to(io::stdout())); let persistent_marker = bucket.marker("persistent"); diff --git a/examples/bucket_summary.rs b/examples/bucket_summary.rs index cff1533..993dc00 100644 --- a/examples/bucket_summary.rs +++ b/examples/bucket_summary.rs @@ -10,7 +10,7 @@ use std::io; fn main() { let app_metrics = AtomicBucket::new(); - app_metrics.set_flush_to(Stream::write_to(io::stdout())); + app_metrics.set_drain(Stream::write_to(io::stdout())); app_metrics.flush_every(Duration::from_secs(3)); diff --git a/examples/clopwizard.rs b/examples/clopwizard.rs index f424d50..aea9d1e 100644 --- a/examples/clopwizard.rs +++ b/examples/clopwizard.rs @@ -33,7 +33,7 @@ fn main() { // send application metrics to aggregator Proxy::default().set_target(all_buckets); - AtomicBucket::set_default_flush_to(Stream::to_stdout()); + AtomicBucket::set_default_drain(Stream::to_stdout()); AtomicBucket::set_default_stats(stats_all); loop { diff --git a/examples/custom_publish.rs b/examples/custom_publish.rs index 66fcc51..ac83d9b 100755 --- a/examples/custom_publish.rs +++ b/examples/custom_publish.rs @@ -42,7 +42,7 @@ fn main() { } // send application metrics to aggregator - AtomicBucket::set_default_flush_to(Stream::to_stderr()); + AtomicBucket::set_default_drain(Stream::to_stderr()); AtomicBucket::set_default_stats(custom_statistics); let app_metrics = AtomicBucket::new(); diff --git a/examples/prometheus.rs b/examples/prometheus.rs index f09a414..84f8fb7 100644 --- a/examples/prometheus.rs +++ b/examples/prometheus.rs @@ -7,7 +7,7 @@ use std::time::Duration; fn main() { let metrics = - Prometheus::send_json_to("localhost:2003") + Prometheus::push_to("http:// prometheus:9091/metrics/job/prometheus_example") .expect("Prometheus Socket") .add_prefix("my_app") .input(); diff --git a/examples/raw_log.rs b/examples/raw_log.rs index 9f9c084..c593c2a 100755 --- a/examples/raw_log.rs +++ b/examples/raw_log.rs @@ -12,7 +12,7 @@ fn main() { pub fn raw_write() { // setup dual metric channels - let metrics_log = dipstick::Log::log_to().input(); + let metrics_log = dipstick::Log::to_log().input(); // define and send metrics using raw channel API let counter = metrics_log.new_metric( diff --git a/libtest.rmeta b/libtest.rmeta new file mode 100644 index 0000000..e69de29 diff --git a/src/bucket/atomic.rs b/src/bucket/atomic.rs index 65687b7..a43ae1c 100755 --- a/src/bucket/atomic.rs +++ b/src/bucket/atomic.rs @@ -26,14 +26,14 @@ fn initial_stats() -> &'static StatsFn { &stats_summary } -fn initial_output() -> Arc { +fn initial_drain() -> Arc { Arc::new(output_none()) } lazy_static! { static ref DEFAULT_AGGREGATE_STATS: RwLock> = RwLock::new(Arc::new(initial_stats())); - static ref DEFAULT_AGGREGATE_OUTPUT: RwLock> = RwLock::new(initial_output()); + static ref DEFAULT_AGGREGATE_OUTPUT: RwLock> = RwLock::new(initial_drain()); } /// Central aggregation structure. @@ -49,7 +49,7 @@ struct InnerAtomicBucket { period_start: TimeHandle, stats: Option Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static>>, - output: Option>, + drain: Option>, publish_metadata: bool, } @@ -67,17 +67,12 @@ lazy_static! { impl InnerAtomicBucket { pub fn flush(&mut self) -> error::Result<()> { - let stats_fn = match self.stats { - Some(ref stats_fn) => stats_fn.clone(), - None => DEFAULT_AGGREGATE_STATS.read().unwrap().clone(), - }; - - let pub_scope = match self.output { + let pub_scope = match self.drain { Some(ref out) => out.output_dyn(), None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().output_dyn(), }; - self.flush_to(pub_scope.borrow(), stats_fn.as_ref())?; + self.flush_to(pub_scope.borrow())?; // all metrics published! // purge: if bucket is the last owner of the metric, remove it @@ -95,7 +90,7 @@ impl InnerAtomicBucket { /// Take a snapshot of aggregated values and reset them. /// Compute stats on captured values using assigned or default stats function. /// Write stats to assigned or default output. - pub fn flush_to(&mut self, target: &OutputScope, stats: &StatsFn) -> error::Result<()> { + pub fn flush_to(&mut self, target: &OutputScope) -> error::Result<()> { let now = TimeHandle::now(); let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0; @@ -119,9 +114,15 @@ impl InnerAtomicBucket { if self.publish_metadata { snapshot.push((&PERIOD_LENGTH, InputKind::Timer, vec![Sum((duration_seconds * 1000.0) as isize)])); } + + let stats_fn = match self.stats { + Some(ref stats_fn) => stats_fn.clone(), + None => DEFAULT_AGGREGATE_STATS.read()?.clone(), + }; + for metric in snapshot { for score in metric.2 { - let filtered = stats(metric.1, metric.0.clone(), score); + let filtered = stats_fn(metric.1, metric.0.clone(), score); if let Some((kind, name, value)) = filtered { let metric: OutputMetric = target.new_metric(name, kind); // TODO provide some bucket context through labels? @@ -150,7 +151,7 @@ impl AtomicBucket { metrics: BTreeMap::new(), period_start: TimeHandle::now(), stats: None, - output: None, + drain: None, // TODO add API toggle for metadata publish publish_metadata: false, })) @@ -171,13 +172,13 @@ impl AtomicBucket { } /// Set the default bucket aggregated metrics flush output. - pub fn set_default_flush_to(default_config: impl Output + Send + Sync + 'static) { + pub fn set_default_drain(default_config: impl Output + Send + Sync + 'static) { *DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = Arc::new(default_config); } /// Revert the default bucket aggregated metrics flush output. - pub fn unset_default_flush_to() { - *DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = initial_output() + pub fn unset_default_drain() { + *DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = initial_drain() } /// Set this bucket's statistics generator. @@ -194,19 +195,19 @@ impl AtomicBucket { } /// Set this bucket's aggregated metrics flush output. - pub fn set_flush_to(&self, new_config: impl Output + Send + Sync + 'static) { - self.inner.write().expect("Aggregator").output = Some(Arc::new(new_config)) + pub fn set_drain(&self, new_drain: impl Output + Send + Sync + 'static) { + self.inner.write().expect("Aggregator").drain = Some(Arc::new(new_drain)) } /// Revert this bucket's flush target to the default output. - pub fn unset_flush_to(&self) { - self.inner.write().expect("Aggregator").output = None + pub fn unset_drain(&self) { + self.inner.write().expect("Aggregator").drain = None } /// Immediately flush the bucket's metrics to the specified scope and stats. - pub fn flush_now_to(&self, publish_scope: &OutputScope, stats_fn: &StatsFn) -> error::Result<()> { + pub fn flush_to(&self, publish_scope: &OutputScope) -> error::Result<()> { let mut inner = self.inner.write().expect("Aggregator"); - inner.flush_to(publish_scope, stats_fn) + inner.flush_to(publish_scope) } } @@ -286,7 +287,6 @@ impl AtomicScores { // fetch_add only returns the previous sum, so min & max trail behind by one operation // instead, pickup the slack by comparing again with the final sum upon `snapshot` // this is to avoid making an extra load() on every value - // and should be correct, as no max or min values should be skipped over let prev_sum = self.scores[SUM].fetch_add(value, Relaxed); swap_if(&self.scores[MAX], prev_sum, |new, current| new > current); swap_if(&self.scores[MIN], prev_sum, |new, current| new < current); @@ -449,10 +449,11 @@ mod test { use std::time::Duration; use std::collections::BTreeMap; - fn make_stats(stats_fn: &StatsFn) -> BTreeMap { + fn make_stats(stats_fn: &'static StatsFn) -> BTreeMap { mock_clock_reset(); let metrics = AtomicBucket::new().add_prefix("test"); + metrics.set_stats(stats_fn); let counter = metrics.counter("counter_a"); let counter_b = metrics.counter("counter_b"); @@ -468,7 +469,7 @@ mod test { counter.count(10); counter.count(20); - counter_b.count(-6); + counter_b.count(9); counter_b.count(18); counter_b.count(3); @@ -484,9 +485,9 @@ mod test { mock_clock_advance(Duration::from_secs(3)); - let stats = StatsMap::default(); - metrics.flush_now_to(&stats, stats_fn).unwrap(); - stats.into() + let map = StatsMap::default(); + metrics.flush_to(&map).unwrap(); + map.into() } #[test] @@ -501,11 +502,11 @@ mod test { assert_eq!(map["test.counter_a.rate"], 10); assert_eq!(map["test.counter_b.count"], 3); - assert_eq!(map["test.counter_b.sum"], 15); - assert_eq!(map["test.counter_b.mean"], 5); - assert_eq!(map["test.counter_b.min"], -6); + assert_eq!(map["test.counter_b.sum"], 30); + assert_eq!(map["test.counter_b.mean"], 10); + assert_eq!(map["test.counter_b.min"], 3); assert_eq!(map["test.counter_b.max"], 18); - assert_eq!(map["test.counter_b.rate"], 5); + assert_eq!(map["test.counter_b.rate"], 10); assert_eq!(map["test.timer_a.count"], 2); assert_eq!(map["test.timer_a.sum"], 30_000_000); @@ -531,7 +532,7 @@ mod test { let map = make_stats(&stats_summary); assert_eq!(map["test.counter_a"], 30); - assert_eq!(map["test.counter_b"], 15); + assert_eq!(map["test.counter_b"], 30); assert_eq!(map["test.level_a"], 23596); assert_eq!(map["test.timer_a"], 30_000_000); assert_eq!(map["test.gauge_a"], 15); @@ -543,7 +544,7 @@ mod test { let map = make_stats(&stats_average); assert_eq!(map["test.counter_a"], 15); - assert_eq!(map["test.counter_b"], 5); + assert_eq!(map["test.counter_b"], 10); assert_eq!(map["test.level_a"], 23596); assert_eq!(map["test.timer_a"], 15_000_000); assert_eq!(map["test.gauge_a"], 15); diff --git a/src/core/input.rs b/src/core/input.rs index 9e306c6..0abb16a 100755 --- a/src/core/input.rs +++ b/src/core/input.rs @@ -140,7 +140,7 @@ impl Marker { /// - Records written /// - Apples eaten /// For relative (possibly negative) values, the `Level` counter type can be used. -/// If aggregated, minimum and maximum scores will track the collected values, not their sum. +/// If ag0gregated, minimum and maximum scores will track the collected values, not their sum. #[derive(Debug, Clone)] pub struct Counter { inner: InputMetric, diff --git a/src/core/label.rs b/src/core/label.rs index 57c70d7..9641142 100644 --- a/src/core/label.rs +++ b/src/core/label.rs @@ -52,6 +52,12 @@ impl LabelScope { Some(pairs) => pairs.get(key).cloned() } } + + fn collect(&self, map: &mut HashMap) { + if let Some(pairs) = &self.pairs { + map.extend(pairs.as_ref().clone().into_iter()) + } + } } lazy_static!( @@ -89,6 +95,12 @@ impl ThreadLabel { *map.borrow_mut() = new; }); } + + fn collect(map: &mut HashMap) { + THREAD_LABELS.with(|mop| { + mop.borrow().collect(map) + }); + } } /// Handle metric labels for the whole application (globals). @@ -114,6 +126,10 @@ impl AppLabel { let b = { APP_LABELS.read().expect("Global Labels").unset(key) }; *APP_LABELS.write().expect("App Labels Lock") = b; } + + fn collect(map: &mut HashMap) { + APP_LABELS.read().expect("Global Labels").collect(map) + } } @@ -182,6 +198,39 @@ impl Labels { } } } + + /// Export current state of labels to a map. + /// Note: An iterator would still need to allocate to check for uniqueness of keys. + /// + pub fn into_map(mut self) -> HashMap { + let mut map = HashMap::new(); + match self.scopes.len() { + // no value labels, no saved context labels + // just lookup implicit context + 0 => { + AppLabel::collect(&mut map); + ThreadLabel::collect(&mut map); + } + + // some value labels, no saved context labels + // lookup value label, then lookup implicit context + 1 => { + AppLabel::collect(&mut map); + ThreadLabel::collect(&mut map); + self.scopes[0].collect(&mut map); + }, + + // value + saved context labels + // lookup explicit context in turn + _ => { + self.scopes.reverse(); + for src in self.scopes { + src.collect(&mut map) + } + } + } + map + } } diff --git a/src/core/metrics.rs b/src/core/metrics.rs index 43cb699..8d20b33 100755 --- a/src/core/metrics.rs +++ b/src/core/metrics.rs @@ -14,6 +14,12 @@ metrics!{ pub SEND_FAILED: Marker = "send_failed"; } + "prometheus" => { + pub PROMETHEUS_SEND_ERR: Marker = "send_failed"; + pub PROMETHEUS_OVERFLOW: Marker = "buf_overflow"; + pub PROMETHEUS_SENT_BYTES: Counter = "sent_bytes"; + } + "graphite" => { pub GRAPHITE_SEND_ERR: Marker = "send_failed"; pub GRAPHITE_OVERFLOW: Marker = "buf_overflow"; diff --git a/src/core/scheduler.rs b/src/core/scheduler.rs index 3cf4425..2eba5c4 100644 --- a/src/core/scheduler.rs +++ b/src/core/scheduler.rs @@ -63,3 +63,27 @@ impl ScheduleFlush for T { }) } } + +//use std::net::{SocketAddr, ToSocketAddrs}; +// +//use tiny_http::{Server, StatusCode, self}; +// +//pub fn http_serve(addresses: A) -> CancelHandle { +// let handle = CancelHandle::new(); +// let inner_handle = handle.clone(); +// let server = tiny_http::Server::http("0.0.0.0:0")?; +// +// thread::spawn(move || loop { +// match server.recv_timeout(Duration::from_secs(1)) { +// Ok(Some(req)) => { +// let response = tiny_http::Response::new_empty(StatusCode::from(200)); +// if let Err(err) = req.respond(response) { +// warn!("Metrics response error: {}", err) +// } +// } +// Ok(None) => if inner_handle.is_cancelled() { break; } +// Err(err) => warn!("Metrics request error: {}", err) +// }; +// }); +// handle +//} diff --git a/src/lib.rs b/src/lib.rs index 3e60956..8171518 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,8 +19,7 @@ extern crate num; // FIXME required only for pcg32 seed (for sampling) extern crate time; -//#[cfg(feature="prometheus")] -extern crate prometheus; +//extern crate tiny_http; #[macro_use] mod macros; diff --git a/src/macros.rs b/src/macros.rs index c67f681..b21f851 100755 --- a/src/macros.rs +++ b/src/macros.rs @@ -91,12 +91,19 @@ macro_rules! metrics { metrics!{ $($REST)* } }; - // BRANCH NODE - untyped expr - ($e:expr => { $($BRANCH:tt)+ } $($REST:tt)*) => { + // Identified Proxy Root + ($e:ident => { $($BRANCH:tt)+ } $($REST:tt)*) => { metrics!{ @internal $e; Proxy; $($BRANCH)* } metrics!{ $($REST)* } }; + // Anonymous Proxy Namespace + ($e:expr => { $($BRANCH:tt)+ } $($REST:tt)*) => { + lazy_static! { static ref PROXY_METRICS: Proxy = $e.into(); } + metrics!{ @internal PROXY_METRICS; Proxy; $($BRANCH)* } + metrics!{ $($REST)* } + }; + // LEAF NODE - public typed decl ($(#[$attr:meta])* pub $IDENT:ident: $TYPE:ty = $e:expr; $($REST:tt)*) => { metrics!{ @internal Proxy::default(); Proxy; $(#[$attr])* pub $IDENT: $TYPE = $e; } @@ -170,6 +177,15 @@ mod test { T1: Timer = "failed"; }} + metrics!("my_app" => { + COUNTER_A: Counter = "counter_a"; + }); + + #[test] + fn gurp() { + COUNTER_A.count(11); + } + #[test] fn call_new_macro_defined_metrics() { M1.mark(); diff --git a/src/output/graphite.rs b/src/output/graphite.rs index 18297d2..ca9bfee 100755 --- a/src/output/graphite.rs +++ b/src/output/graphite.rs @@ -139,9 +139,9 @@ impl GraphiteScope { let mut sock = self.socket.write().expect("Lock Graphite Socket"); match sock.write_all(buf.as_bytes()) { Ok(()) => { - buf.clear(); metrics::GRAPHITE_SENT_BYTES.count(buf.len()); trace!("Sent {} bytes to graphite", buf.len()); + buf.clear(); Ok(()) } Err(e) => { diff --git a/src/output/log.rs b/src/output/log.rs index 61cf990..d7067ea 100755 --- a/src/output/log.rs +++ b/src/output/log.rs @@ -16,6 +16,8 @@ use log; pub struct Log { attributes: Attributes, format: Arc, + level: log::Level, + target: Option, } impl Input for Log { @@ -25,7 +27,7 @@ impl Input for Log { LogScope { attributes: self.attributes.clone(), entries: Arc::new(RwLock::new(Vec::new())), - output: self.clone(), + log: self.clone(), } } } @@ -50,18 +52,37 @@ impl Formatting for Log { pub struct LogScope { attributes: Attributes, entries: Arc>>>, - output: Log, + log: Log, } impl Log { /// Write metric values to the standard log using `info!`. // TODO parameterize log level, logger - pub fn log_to() -> Log { + pub fn to_log() -> Log { Log { attributes: Attributes::default(), format: Arc::new(SimpleFormat::default()), + level: log::Level::Info, + target: None } } + + /// Sets the log `target` to use when logging metrics. + /// See the (log!)[https://docs.rs/log/0.4.6/log/macro.log.html] documentation. + pub fn level(&self, level: log::Level) -> Self { + let mut cloned = self.clone(); + cloned.level = level; + cloned + } + + /// Sets the log `target` to use when logging metrics. + /// See the (log!)[https://docs.rs/log/0.4.6/log/macro.log.html] documentation. + pub fn target(&self, target: &str) -> Self { + let mut cloned = self.clone(); + cloned.target = Some(target.to_string()); + cloned + } + } impl WithAttributes for LogScope { @@ -77,12 +98,11 @@ impl cache_in::CachedInput for Log {} impl InputScope for LogScope { fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric { let name = self.prefix_append(name); - - let template = self.output.format.template(&name, kind); - + let template = self.log.format.template(&name, kind); let entries = self.entries.clone(); if let Some(_buffering) = self.get_buffering() { + // buffered InputMetric::new(move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { @@ -95,10 +115,16 @@ impl InputScope for LogScope { }) } else { // unbuffered + let level = self.log.level; + let target = self.log.target.clone(); InputMetric::new(move |value, labels| { let mut buffer = Vec::with_capacity(32); match template.print(&mut buffer, value, |key| labels.lookup(key)) { - Ok(()) => log!(log::Level::Debug, "{:?}", &buffer), + Ok(()) => if let Some(target) = &target { + log!(target: target, level, "{:?}", &buffer) + } else { + log!(level, "{:?}", &buffer) + } Err(err) => debug!("Could not format buffered log metric: {}", err), } }) @@ -115,7 +141,11 @@ impl Flush for LogScope { for entry in entries.drain(..) { writeln!(&mut buf, "{:?}", &entry)?; } - log!(log::Level::Debug, "{:?}", &buf); + if let Some(target) = &self.log.target { + log!(target: target, self.log.level, "{:?}", &buf) + } else { + log!(self.log.level, "{:?}", &buf) + } } Ok(()) } @@ -135,7 +165,7 @@ mod test { #[test] fn test_to_log() { - let c = super::Log::log_to().input(); + let c = super::Log::to_log().input(); let m = c.new_metric("test".into(), InputKind::Marker); m.write(33, labels![]); } diff --git a/src/output/prometheus.rs b/src/output/prometheus.rs index 2362308..95fee1f 100755 --- a/src/output/prometheus.rs +++ b/src/output/prometheus.rs @@ -1,58 +1,25 @@ -//! Prometheus-related functionality. -//! Both push and pull are supported. -//! Both protobuf and text format are supported. +//! Send metrics to a Prometheus server. -use core::{Flush}; -use core::input::{InputKind}; -use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Prefixed}; +use core::attributes::{Buffered, Attributes, WithAttributes, Prefixed}; use core::name::MetricName; -use core::output::{Output, OutputMetric, OutputScope}; +use core::{Flush, MetricValue}; +use core::input::InputKind; +use core::metrics; +use core::output::{Output, OutputScope, OutputMetric}; use core::error; -use output::socket::RetrySocket; +use queue::queue_out; +use cache::cache_out; +use core::label::Labels; -use std::net::ToSocketAddrs; -use std::sync::{Arc, RwLock}; -use std::fmt::Debug; -use std::io::Write; +use std::rc::Rc; +use std::cell::{RefCell, RefMut}; -use prometheus::{Opts, Registry, IntGauge, IntCounter, Encoder, ProtobufEncoder, TextEncoder,}; - -metrics!{ -} - -#[derive(Clone, Debug)] -enum PrometheusEncoding { - JSON, - PROTOBUF, -} - -/// Prometheus push shared client -/// Holds a shared socket to a Prometheus host. +/// Prometheus output holds a socket to a Prometheus server. +/// The socket is shared between scopes opened from the output. #[derive(Clone, Debug)] pub struct Prometheus { attributes: Attributes, - socket: Arc>, - encoding: PrometheusEncoding, -} - -impl Prometheus { - /// Send metrics to a prometheus server at the address and port provided. - pub fn send_json_to(address: A) -> error::Result { - Ok(Prometheus { - attributes: Attributes::default(), - socket: Arc::new(RwLock::new(RetrySocket::new(address.clone())?)), - encoding: PrometheusEncoding::JSON, - }) - } - - /// Send metrics to a prometheus server at the address and port provided. - pub fn send_protobuf_to(address: A) -> error::Result { - Ok(Prometheus { - attributes: Attributes::default(), - socket: Arc::new(RwLock::new(RetrySocket::new(address.clone())?)), - encoding: PrometheusEncoding::PROTOBUF, - }) - } + push_url: String, } impl Output for Prometheus { @@ -61,85 +28,131 @@ impl Output for Prometheus { fn output(&self) -> Self::SCOPE { PrometheusScope { attributes: self.attributes.clone(), - registry: Registry::new(), - socket: self.socket.clone(), - encoding: self.encoding.clone(), + buffer: Rc::new(RefCell::new(String::new())), + push_url: self.push_url.clone(), } } } +impl Prometheus { + /// Send metrics to a Prometheus "push gateway" at the URL provided. + /// URL path must include group identifier labels `job` + /// as shown in https://github.com/prometheus/pushgateway#command-line + /// For example `http://pushgateway.example.org:9091/metrics/job/some_job` + pub fn push_to(url: &str) -> error::Result { + debug!("Pushing to Prometheus {:?}", url); + + Ok(Prometheus { + attributes: Attributes::default(), + push_url: url.to_string(), + }) + } +} + impl WithAttributes for Prometheus { fn get_attributes(&self) -> &Attributes { &self.attributes } fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } } -/// Prometheus push client scope -#[derive(Clone)] +impl Buffered for Prometheus {} + +/// Prometheus Input +#[derive(Debug, Clone)] pub struct PrometheusScope { attributes: Attributes, - registry: Registry, - socket: Arc>, - encoding: PrometheusEncoding, + buffer: Rc>, + push_url: String, } impl OutputScope for PrometheusScope { /// Define a metric of the specified type. - // TODO enable labels fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric { - let name = self.prefix_prepend(name).join("."); - match kind { - InputKind::Counter | InputKind::Timer => { - let counter = self.register_counter(name); - OutputMetric::new(move |value, _labels| counter.inc_by(value as i64)) - }, - InputKind::Marker => { - let counter = self.register_counter(name); - OutputMetric::new(move |_value, _labels| counter.inc()) - }, - InputKind::Gauge | InputKind::Level => self.new_gauge(name) - } + let prefix = self.prefix_prepend(name).join("_"); + + let scale = match kind { + // timers are in µs, but we give Prometheus milliseconds + InputKind::Timer => 1000, + _ => 1, + }; + + let cloned = self.clone(); + let metric = PrometheusMetric { prefix, scale }; + + OutputMetric::new(move |value, labels| { + cloned.print(&metric, value, labels); + }) } } -impl PrometheusScope { - - fn register_counter(&self, name: String) -> IntCounter { - let opts = Opts::new(name, "".into()); - let counter = IntCounter::with_opts(opts).expect("Prometheus IntCounter"); - self.registry.register(Box::new(counter.clone())).expect("Prometheus IntCounter"); - counter - } - - fn new_gauge(&self, name: String) -> OutputMetric { - let opts = Opts::new(name, "".into()); - let gauge = IntGauge::with_opts(opts).expect("Prometheus IntGauge"); - self.registry.register(Box::new(gauge.clone())).expect("Prometheus IntGauge"); - OutputMetric::new(move |value, _labels| - gauge.add(value as i64) - ) - } - -} - impl Flush for PrometheusScope { fn flush(&self) -> error::Result<()> { - let metric_families = self.registry.gather(); - let mut buffer = vec![]; + let buf = self.buffer.borrow_mut(); + self.flush_inner(buf) + } +} - match self.encoding { - PrometheusEncoding::JSON => { - let encoder = TextEncoder::new(); - encoder.encode(&metric_families, &mut buffer)? - }, - PrometheusEncoding::PROTOBUF => { - let encoder = ProtobufEncoder::new(); - encoder.encode(&metric_families, &mut buffer)? - }, +impl PrometheusScope { + fn print(&self, metric: &PrometheusMetric, value: MetricValue, labels: Labels) { + let scaled_value = value / metric.scale; + let value_str = scaled_value.to_string(); + + let mut strbuf = String::new(); + // prometheus format be like `http_requests_total{method="post",code="200"} 1027 1395066363000` + strbuf.push_str(&metric.prefix); + + let labels_map = labels.into_map(); + if !labels_map.is_empty() { + strbuf.push('{'); + let mut i = labels_map.into_iter(); + let mut next = i.next(); + while let Some((k, v)) = next { + strbuf.push_str(&k); + strbuf.push_str("=\""); + strbuf.push_str(&v); + next = i.next(); + if next.is_some() { + strbuf.push_str("\","); + } else { + strbuf.push('"'); + } + } + strbuf.push_str("} "); + } else { + strbuf.push(' '); } + strbuf.push_str(&value_str); - let mut socket = self.socket.write().expect("Lock Prometheus Socket"); - Ok(socket.write_all(&mut buffer)?) + let buffer = self.buffer.borrow_mut(); + if strbuf.len() + buffer.len() > BUFFER_FLUSH_THRESHOLD { + metrics::PROMETHEUS_OVERFLOW.mark(); + warn!("Prometheus Buffer Size Exceeded: {}", BUFFER_FLUSH_THRESHOLD); + let _ = self.flush_inner(buffer); + } else { + if self.get_buffering().is_none() { + if let Err(e) = self.flush_inner(buffer) { + debug!("Could not send to Prometheus {}", e) + } + } + } + } + + fn flush_inner(&self, mut buf: RefMut) -> error::Result<()> { + if buf.is_empty() { return Ok(()) } + + match minreq::get(self.push_url.as_ref()).with_body(buf.as_ref()).send() { + Ok(_res) => { + metrics::PROMETHEUS_SENT_BYTES.count(buf.len()); + trace!("Sent {} bytes to Prometheus", buf.len()); + buf.clear(); + Ok(()) + } + Err(e) => { + metrics::PROMETHEUS_SEND_ERR.mark(); + debug!("Failed to send buffer to Prometheus: {}", e); + Err(e.into()) + } + } } } @@ -147,3 +160,55 @@ impl WithAttributes for PrometheusScope { fn get_attributes(&self) -> &Attributes { &self.attributes } fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes } } + +impl Buffered for PrometheusScope {} + +impl queue_out::QueuedOutput for Prometheus {} +impl cache_out::CachedOutput for Prometheus {} + +/// Its hard to see how a single scope could get more metrics than this. +// TODO make configurable? +const BUFFER_FLUSH_THRESHOLD: usize = 65_536; + +/// Key of a Prometheus metric. +#[derive(Debug, Clone)] +pub struct PrometheusMetric { + prefix: String, + scale: isize, +} + +/// Any remaining buffered data is flushed on Drop. +impl Drop for PrometheusScope { + fn drop(&mut self) { + if let Err(err) = self.flush() { + warn!("Could not flush Prometheus metrics upon Drop: {}", err) + } + } +} + +#[cfg(feature = "bench")] +mod bench { + + use core::attributes::*; + use core::input::*; + use super::*; + use test; + + #[bench] + pub fn immediate_prometheus(b: &mut test::Bencher) { + let sd = Prometheus::push_to("localhost:2003").unwrap().input(); + let timer = sd.new_metric("timer".into(), InputKind::Timer); + + b.iter(|| test::black_box(timer.write(2000, labels![]))); + } + + #[bench] + pub fn buffering_prometheus(b: &mut test::Bencher) { + let sd = Prometheus::push_to("localhost:2003").unwrap() + .buffered(Buffering::BufferSize(65465)).input(); + let timer = sd.new_metric("timer".into(), InputKind::Timer); + + b.iter(|| test::black_box(timer.write(2000, labels![]))); + } + +}