From 84a5e4c4d311e1e0ed2abb1bd8e8fd85a5a51a9e Mon Sep 17 00:00:00 2001 From: Francis Lalonde Date: Fri, 21 Jun 2019 19:42:57 -0400 Subject: [PATCH] Fixed prometheus buffer issue --- examples/prometheus.rs | 5 ++- src/output/prometheus.rs | 86 +++++++++++++++++++++++++++------------- 2 files changed, 62 insertions(+), 29 deletions(-) diff --git a/examples/prometheus.rs b/examples/prometheus.rs index 49a2e4a..d41056e 100644 --- a/examples/prometheus.rs +++ b/examples/prometheus.rs @@ -6,11 +6,14 @@ use dipstick::*; use std::time::Duration; fn main() { - let metrics = Prometheus::push_to("http:// prometheus:9091/metrics/job/prometheus_example") + let metrics = Prometheus::push_to("http://localhost:9091/metrics/job/prometheus_example") .expect("Prometheus Socket") .named("my_app") .metrics(); + AppLabel::set("abc", "456"); + ThreadLabel::set("xyz", "123"); + loop { metrics.counter("counter_a").count(123); metrics.timer("timer_a").interval_us(2000000); diff --git a/src/output/prometheus.rs b/src/output/prometheus.rs index 6d5e9c6..8ec82bb 100755 --- a/src/output/prometheus.rs +++ b/src/output/prometheus.rs @@ -127,7 +127,7 @@ impl PrometheusScope { } strbuf.push_str(&value_str); - let buffer = self.buffer.borrow_mut(); + let mut buffer = self.buffer.borrow_mut(); if strbuf.len() + buffer.len() > BUFFER_FLUSH_THRESHOLD { metrics::PROMETHEUS_OVERFLOW.mark(); warn!( @@ -135,9 +135,19 @@ impl PrometheusScope { BUFFER_FLUSH_THRESHOLD ); let _ = self.flush_inner(buffer); - } else if !self.is_buffered() { + buffer = self.buffer.borrow_mut(); + } + + if !buffer.is_empty() { + // separate from previous entry + buffer.push('\n') + } + + buffer.push_str(&strbuf); + + if !self.is_buffered() { if let Err(e) = self.flush_inner(buffer) { - debug!("Could not send to Prometheus {}", e) + debug!("Could not send to statsd {}", e) } } } @@ -147,6 +157,9 @@ impl PrometheusScope { return Ok(()); } +// println!("{}", buf.as_str()); +// buf.clear(); +// Ok(()) match minreq::get(self.push_url.as_str()) .with_body(buf.as_str()) .send() @@ -200,30 +213,47 @@ impl Drop for PrometheusScope { } } -#[cfg(feature = "bench")] -mod bench { +//#[cfg(test)] +//mod test { +// use super::*; +// use crate::core::input::InputKind; +// use std::io; +// use crate::core::input::Input; +// use crate::core::input::InputScope; +// +// #[test] +// fn sink_print() { +// let sd = Prometheus::push_to("localhost:2003").unwrap().metrics(); +// let timer = sd.new_metric("timer".into(), InputKind::Counter); +// sd.write(33, labels![]); +// } +//} - use super::*; - use crate::core::attributes::*; - use crate::core::input::*; - #[bench] - pub fn immediate_prometheus(b: &mut test::Bencher) { - let sd = Prometheus::push_to("localhost:2003").unwrap().metrics(); - let timer = sd.new_metric("timer".into(), InputKind::Timer); - - b.iter(|| test::black_box(timer.write(2000, labels![]))); - } - - #[bench] - pub fn buffering_prometheus(b: &mut test::Bencher) { - let sd = Prometheus::push_to("localhost:2003") - .unwrap() - .buffered(Buffering::BufferSize(65465)) - .metrics(); - let timer = sd.new_metric("timer".into(), InputKind::Timer); - - b.iter(|| test::black_box(timer.write(2000, labels![]))); - } - -} +//#[cfg(feature = "bench")] +//mod bench { +// +// use super::*; +// use crate::core::attributes::*; +// use crate::core::input::*; +// +// #[bench] +// pub fn immediate_prometheus(b: &mut test::Bencher) { +// let sd = Prometheus::push_to("localhost:2003").unwrap().metrics(); +// let timer = sd.new_metric("timer".into(), InputKind::Timer); +// +// b.iter(|| test::black_box(timer.write(2000, labels![]))); +// } +// +// #[bench] +// pub fn buffering_prometheus(b: &mut test::Bencher) { +// let sd = Prometheus::push_to("localhost:2003") +// .unwrap() +// .buffered(Buffering::BufferSize(65465)) +// .metrics(); +// let timer = sd.new_metric("timer".into(), InputKind::Timer); +// +// b.iter(|| test::black_box(timer.write(2000, labels![]))); +// } +// +//}