Fixed prometheus buffer issue

This commit is contained in:
Francis Lalonde 2019-06-21 19:42:57 -04:00
parent 6d6e26d8b2
commit 84a5e4c4d3
2 changed files with 62 additions and 29 deletions

View File

@ -6,11 +6,14 @@ use dipstick::*;
use std::time::Duration;
fn main() {
let metrics = Prometheus::push_to("http:// prometheus:9091/metrics/job/prometheus_example")
let metrics = Prometheus::push_to("http://localhost:9091/metrics/job/prometheus_example")
.expect("Prometheus Socket")
.named("my_app")
.metrics();
AppLabel::set("abc", "456");
ThreadLabel::set("xyz", "123");
loop {
metrics.counter("counter_a").count(123);
metrics.timer("timer_a").interval_us(2000000);

View File

@ -127,7 +127,7 @@ impl PrometheusScope {
}
strbuf.push_str(&value_str);
let buffer = self.buffer.borrow_mut();
let mut buffer = self.buffer.borrow_mut();
if strbuf.len() + buffer.len() > BUFFER_FLUSH_THRESHOLD {
metrics::PROMETHEUS_OVERFLOW.mark();
warn!(
@ -135,9 +135,19 @@ impl PrometheusScope {
BUFFER_FLUSH_THRESHOLD
);
let _ = self.flush_inner(buffer);
} else if !self.is_buffered() {
buffer = self.buffer.borrow_mut();
}
if !buffer.is_empty() {
// separate from previous entry
buffer.push('\n')
}
buffer.push_str(&strbuf);
if !self.is_buffered() {
if let Err(e) = self.flush_inner(buffer) {
debug!("Could not send to Prometheus {}", e)
debug!("Could not send to statsd {}", e)
}
}
}
@ -147,6 +157,9 @@ impl PrometheusScope {
return Ok(());
}
// println!("{}", buf.as_str());
// buf.clear();
// Ok(())
match minreq::get(self.push_url.as_str())
.with_body(buf.as_str())
.send()
@ -200,30 +213,47 @@ impl Drop for PrometheusScope {
}
}
#[cfg(feature = "bench")]
mod bench {
//#[cfg(test)]
//mod test {
// use super::*;
// use crate::core::input::InputKind;
// use std::io;
// use crate::core::input::Input;
// use crate::core::input::InputScope;
//
// #[test]
// fn sink_print() {
// let sd = Prometheus::push_to("localhost:2003").unwrap().metrics();
// let timer = sd.new_metric("timer".into(), InputKind::Counter);
// sd.write(33, labels![]);
// }
//}
use super::*;
use crate::core::attributes::*;
use crate::core::input::*;
#[bench]
pub fn immediate_prometheus(b: &mut test::Bencher) {
let sd = Prometheus::push_to("localhost:2003").unwrap().metrics();
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
#[bench]
pub fn buffering_prometheus(b: &mut test::Bencher) {
let sd = Prometheus::push_to("localhost:2003")
.unwrap()
.buffered(Buffering::BufferSize(65465))
.metrics();
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
}
//#[cfg(feature = "bench")]
//mod bench {
//
// use super::*;
// use crate::core::attributes::*;
// use crate::core::input::*;
//
// #[bench]
// pub fn immediate_prometheus(b: &mut test::Bencher) {
// let sd = Prometheus::push_to("localhost:2003").unwrap().metrics();
// let timer = sd.new_metric("timer".into(), InputKind::Timer);
//
// b.iter(|| test::black_box(timer.write(2000, labels![])));
// }
//
// #[bench]
// pub fn buffering_prometheus(b: &mut test::Bencher) {
// let sd = Prometheus::push_to("localhost:2003")
// .unwrap()
// .buffered(Buffering::BufferSize(65465))
// .metrics();
// let timer = sd.new_metric("timer".into(), InputKind::Timer);
//
// b.iter(|| test::black_box(timer.write(2000, labels![])));
// }
//
//}