Merge pull request #71 from grippy/grippy/prometheus

Prometheus Fixes
This commit is contained in:
Francis Lalonde 2019-06-22 07:51:05 -04:00 committed by GitHub
commit e800701072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 8 deletions

View File

@ -0,0 +1,60 @@
//! Use the proxy to pipeline metrics to multiple outputs
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
/// Create a pipeline that branches out...
/// The key here is to use AtomicBucket to read
/// from the proxy and aggregate and flush metrics
///
/// Proxy
/// -> AtomicBucket
/// -> MultiOutput
/// -> Prometheus
/// -> Statsd
/// -> stdout
metrics! {
pub PROXY: Proxy = "my_proxy" => {}
}
fn main() {
// Placeholder to collect output targets
// This will prefix all metrics with "my_stats"
// before flushing them.
let mut targets = MultiOutput::new().named("my_stats");
// Skip the metrics here... we just use this for the output
// Follow the same pattern for Statsd, Graphite, etc.
let prometheus = Prometheus::push_to("http://localhost:9091/metrics/job/dipstick_example")
.expect("Prometheus Socket");
targets = targets.add_target(prometheus);
// Add stdout
targets = targets.add_target(Stream::to_stdout());
// Create the bucket and drain targets
let bucket = AtomicBucket::new();
bucket.drain(targets);
// Crucial, set the flush interval, otherwise risk hammering targets
bucket.flush_every(Duration::from_secs(3));
// Now wire up the proxy target with the bucket and you're all set
let proxy = Proxy::default();
proxy.target(bucket.clone());
// Example using the macro! Proxy sugar
PROXY.target(bucket.named("global"));
loop {
// Using the default proxy
proxy.counter("beans").count(100);
proxy.timer("braincells").interval_us(420);
// global example
PROXY.counter("my_proxy_counter").count(123);
PROXY.timer("my_proxy_timer").interval_us(2000000);
std::thread::sleep(Duration::from_millis(100));
}
}

View File

@ -138,13 +138,13 @@ impl PrometheusScope {
buffer = self.buffer.borrow_mut();
}
buffer.push_str(&strbuf);
if !buffer.is_empty() {
// separate from previous entry
buffer.push('\n')
}
buffer.push_str(&strbuf);
if !self.is_buffered() {
if let Err(e) = self.flush_inner(buffer) {
debug!("Could not send to statsd {}", e)
@ -157,16 +157,17 @@ impl PrometheusScope {
return Ok(());
}
// println!("{}", buf.as_str());
// buf.clear();
// Ok(())
match minreq::get(self.push_url.as_str())
match minreq::post(self.push_url.as_str())
.with_body(buf.as_str())
.send()
{
Ok(_res) => {
metrics::PROMETHEUS_SENT_BYTES.count(buf.len());
trace!("Sent {} bytes to Prometheus", buf.len());
trace!(
"Sent {} bytes to Prometheus (resp status code: {})",
buf.len(),
_res.status_code
);
buf.clear();
Ok(())
}
@ -229,7 +230,6 @@ impl Drop for PrometheusScope {
// }
//}
//#[cfg(feature = "bench")]
//mod bench {
//