Level Log Target

Francis Lalonde 2018-12-31 17:03:34 -05:00
parent 456f096981
commit 963dfc0fd2
27 changed files with 368 additions and 217 deletions

View File

@ -27,7 +27,7 @@ num = { version = "0.2", default-features = false }
# FIXME required only for random seed for sampling
time = "0.1"
prometheus = { version = "0.4" }
minreq = { version = "1.0.0" }
# optional dep for standalone http pull metrics
tiny_http = { version = "0.6", optional = true }

View File

@ -51,7 +51,7 @@ While timers internal precision are in nanoseconds, their accuracy depends on pl
Timer's default output format is milliseconds but is scalable up or down.
```$rust,skt-run
let app_metrics = metric_scope(to_stdout());
let app_metrics = Stream::to_stdout().input();
let timer = app_metrics.timer("my_timer");
time!(timer, {/* slow code here */} );
timer.time(|| {/* slow code here */} );
@ -79,7 +79,7 @@ Names should exclude characters that can interfere with namespaces, separator an
A good convention is to stick with lowercase alphanumeric identifiers of less than 12 characters.
```$rust,skt-run
let app_metrics = metric_scope(to_stdout());
let app_metrics = Stream::to_stdout().input();
let db_metrics = app_metrics.add_prefix("database");
let _db_timer = db_metrics.timer("db_timer");
let _db_counter = db_metrics.counter("db_counter");
@ -107,17 +107,12 @@ Notes about labels:
Metric inputs are usually set up statically upon application startup.
```$rust,skt-run
#[macro_use]
extern crate dipstick;
use dipstick::*;
metrics!("my_app" => {
COUNTER_A: Counter = "counter_a";
});
fn main() {
route_aggregate_metrics(to_stdout());
Proxy::set_default_target(Stream::to_stdout().input());
COUNTER_A.count(11);
}
```
@ -126,15 +121,16 @@ The static metric definition macro is just a `lazy_static!` wrapper.
## Dynamic metrics
If necessary, metrics can also be defined "dynamically", with a possibly new name for every value.
This is more flexible but has a higher runtime cost, which may be alleviated with caching.
If necessary, metrics can also be defined "dynamically".
This is more flexible but has a higher runtime cost, which may be alleviated with the optional caching mechanism.
```$rust,skt-run
let user_name = "john_day";
let app_metrics = to_log().with_cache(512);
app_metrics.gauge(format!("gauge_for_user_{}", user_name)).value(44);
let app_metrics = Log::to_log().cached(512).input();
app_metrics.gauge(&format!("gauge_for_user_{}", user_name)).value(44);
```
Alternatively, you may use `Labels` to output context-dependent metrics.
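A minimal sketch of context labels, assuming the `AppLabel`/`ThreadLabel` setters from this crate's label module:
```rust
// application-wide and thread-local context labels (setter names assumed)
AppLabel::set("node", "n42");
ThreadLabel::set("worker", "w1");

let app_metrics = Stream::to_stdout().input();
// labels are resolved when the metric value is written
app_metrics.gauge("connections").value(44);
```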
## Metrics Output
A metrics library's second job is to help a program emit metric values that can be consumed by downstream systems.
@ -184,8 +180,8 @@ If enabled, buffering is usually a best-effort affair, to safely limit the amoun
Some outputs such as statsd also have the ability to sample metrics.
If enabled, sampling is done using pcg32, a fast random algorithm with reasonable entropy.
```$rust,skt-fail
let _app_metrics = Statsd::send_to("server:8125")?.with_sampling_rate(0.01);
```$rust,skt-run,no_run
let _app_metrics = Statsd::send_to("server:8125")?.sampled(Sampling::Random(0.01)).input();
```

View File

@ -6,30 +6,12 @@ Use `cargo test --features="skeptic"` to run the examples in the README using th
#[macro_use]
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
fn main() {{
{}
}}
```
```rust,skt-fail
extern crate dipstick;
use dipstick::*;
use std::result::Result;
use std::time::Duration;
fn main() {{
run().ok();
}}
fn run() -> Result<(), Error> {{
fn main() -> std::result::Result<(), Box<std::error::Error>> {{
{}
Ok(())
}}
```
```rust,skt-plain
{}
```

View File

@ -46,10 +46,10 @@ Here's a basic aggregating & auto-publish counter metric:
```$rust,skt-run
let bucket = AtomicBucket::new();
bucket.set_target(Stream::stdout());
bucket.flush_every(Duration::from_secs(3));
bucket.set_drain(Stream::to_stdout());
bucket.flush_every(std::time::Duration::from_secs(3));
let counter = bucket.counter("counter_a");
counter.count(8)
counter.count(8);
```
Persistent apps wanting to declare static metrics will prefer using the `metrics!` macro:
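A minimal sketch of that declaration, mirroring the `metrics!` example from earlier in this README (names assumed):
```rust
metrics!("my_app" => {
    COUNTER_A: Counter = "counter_a";
});

fn main() {
    Proxy::set_default_target(Stream::to_stdout().input());
    COUNTER_A.count(11);
}
```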

View File

@ -6,30 +6,12 @@ Use `cargo test --features="skeptic"` to run the examples in the README using th
#[macro_use]
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
fn main() {{
{}
}}
```
```rust,skt-fail
extern crate dipstick;
use dipstick::*;
use std::result::Result;
use std::time::Duration;
fn main() {{
run().ok();
}}
fn run() -> Result<(), Error> {{
fn main() -> std::result::Result<(), Box<std::error::Error>> {{
{}
Ok(())
}}
```
```rust,skt-plain
{}
```

View File

@ -26,6 +26,7 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.set_stats(stats_all);
bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap();
}
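The split shown here replaces the old single call; a minimal sketch of the new two-step flow, assuming the `stats_all` function and imports used by this example:
```rust
use std::io;

let bucket = AtomicBucket::new();
// statistics selection is now bucket state rather than a flush argument
bucket.set_stats(stats_all);
// flushing takes only the output scope
bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap();
```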

View File

@ -30,6 +30,6 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap();
}

View File

@ -27,6 +27,6 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap();
}

View File

@ -10,7 +10,7 @@ fn main() {
let bucket = AtomicBucket::new().add_prefix("test");
// Bucket::set_default_output(to_stdout());
bucket.set_flush_to(Graphite::send_to("localhost:2003").expect("Socket")
bucket.set_drain(Graphite::send_to("localhost:2003").expect("Socket")
.add_prefix("machine1").add_prefix("application"));
bucket.flush_every(Duration::from_secs(3));

View File

@ -11,7 +11,7 @@ fn main() {
let metrics = AtomicBucket::new().add_prefix("test");
// Bucket::set_default_output(to_stdout());
metrics.set_flush_to(Stream::write_to(io::stdout()));
metrics.set_drain(Stream::write_to(io::stdout()));
metrics.flush_every(Duration::from_secs(3));

View File

@ -11,7 +11,7 @@ use std::thread::sleep;
fn main() {
let bucket = AtomicBucket::new();
AtomicBucket::set_default_flush_to(Stream::write_to(io::stdout()));
AtomicBucket::set_default_drain(Stream::write_to(io::stdout()));
let persistent_marker = bucket.marker("persistent");

View File

@ -10,7 +10,7 @@ use std::io;
fn main() {
let app_metrics = AtomicBucket::new();
app_metrics.set_flush_to(Stream::write_to(io::stdout()));
app_metrics.set_drain(Stream::write_to(io::stdout()));
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -33,7 +33,7 @@ fn main() {
// send application metrics to aggregator
Proxy::default().set_target(all_buckets);
AtomicBucket::set_default_flush_to(Stream::to_stdout());
AtomicBucket::set_default_drain(Stream::to_stdout());
AtomicBucket::set_default_stats(stats_all);
loop {

View File

@ -42,7 +42,7 @@ fn main() {
}
// send application metrics to aggregator
AtomicBucket::set_default_flush_to(Stream::to_stderr());
AtomicBucket::set_default_drain(Stream::to_stderr());
AtomicBucket::set_default_stats(custom_statistics);
let app_metrics = AtomicBucket::new();

View File

@ -7,7 +7,7 @@ use std::time::Duration;
fn main() {
let metrics =
Prometheus::send_json_to("localhost:2003")
Prometheus::push_to("http:// prometheus:9091/metrics/job/prometheus_example")
.expect("Prometheus Socket")
.add_prefix("my_app")
.input();
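A slightly fuller sketch of the push flow introduced here; the gateway URL is a placeholder, and the explicit `flush` assumes the scope's `Flush` implementation:
```rust
let metrics = Prometheus::push_to("http://localhost:9091/metrics/job/example")
    .expect("Prometheus Push")
    .add_prefix("my_app")
    .input();

let counter = metrics.counter("requests");
counter.count(1);
// buffered scopes accumulate lines and push on flush (or on Drop)
metrics.flush().expect("flush");
```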

View File

@ -12,7 +12,7 @@ fn main() {
pub fn raw_write() {
// setup dual metric channels
let metrics_log = dipstick::Log::log_to().input();
let metrics_log = dipstick::Log::to_log().input();
// define and send metrics using raw channel API
let counter = metrics_log.new_metric(

libtest.rmeta (new file, binary; no text preview)
View File

View File

@ -26,14 +26,14 @@ fn initial_stats() -> &'static StatsFn {
&stats_summary
}
fn initial_output() -> Arc<OutputDyn + Send + Sync> {
fn initial_drain() -> Arc<OutputDyn + Send + Sync> {
Arc::new(output_none())
}
lazy_static! {
static ref DEFAULT_AGGREGATE_STATS: RwLock<Arc<StatsFn>> = RwLock::new(Arc::new(initial_stats()));
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OutputDyn + Send + Sync>> = RwLock::new(initial_output());
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OutputDyn + Send + Sync>> = RwLock::new(initial_drain());
}
/// Central aggregation structure.
@ -49,7 +49,7 @@ struct InnerAtomicBucket {
period_start: TimeHandle,
stats: Option<Arc<Fn(InputKind, MetricName, ScoreType)
-> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static>>,
output: Option<Arc<OutputDyn + Send + Sync + 'static>>,
drain: Option<Arc<OutputDyn + Send + Sync + 'static>>,
publish_metadata: bool,
}
@ -67,17 +67,12 @@ lazy_static! {
impl InnerAtomicBucket {
pub fn flush(&mut self) -> error::Result<()> {
let stats_fn = match self.stats {
Some(ref stats_fn) => stats_fn.clone(),
None => DEFAULT_AGGREGATE_STATS.read().unwrap().clone(),
};
let pub_scope = match self.output {
let pub_scope = match self.drain {
Some(ref out) => out.output_dyn(),
None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().output_dyn(),
};
self.flush_to(pub_scope.borrow(), stats_fn.as_ref())?;
self.flush_to(pub_scope.borrow())?;
// all metrics published!
// purge: if bucket is the last owner of the metric, remove it
@ -95,7 +90,7 @@ impl InnerAtomicBucket {
/// Take a snapshot of aggregated values and reset them.
/// Compute stats on captured values using assigned or default stats function.
/// Write stats to assigned or default output.
pub fn flush_to(&mut self, target: &OutputScope, stats: &StatsFn) -> error::Result<()> {
pub fn flush_to(&mut self, target: &OutputScope) -> error::Result<()> {
let now = TimeHandle::now();
let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0;
@ -119,9 +114,15 @@ impl InnerAtomicBucket {
if self.publish_metadata {
snapshot.push((&PERIOD_LENGTH, InputKind::Timer, vec![Sum((duration_seconds * 1000.0) as isize)]));
}
let stats_fn = match self.stats {
Some(ref stats_fn) => stats_fn.clone(),
None => DEFAULT_AGGREGATE_STATS.read()?.clone(),
};
for metric in snapshot {
for score in metric.2 {
let filtered = stats(metric.1, metric.0.clone(), score);
let filtered = stats_fn(metric.1, metric.0.clone(), score);
if let Some((kind, name, value)) = filtered {
let metric: OutputMetric = target.new_metric(name, kind);
// TODO provide some bucket context through labels?
@ -150,7 +151,7 @@ impl AtomicBucket {
metrics: BTreeMap::new(),
period_start: TimeHandle::now(),
stats: None,
output: None,
drain: None,
// TODO add API toggle for metadata publish
publish_metadata: false,
}))
@ -171,13 +172,13 @@ impl AtomicBucket {
}
/// Set the default bucket aggregated metrics flush output.
pub fn set_default_flush_to(default_config: impl Output + Send + Sync + 'static) {
pub fn set_default_drain(default_config: impl Output + Send + Sync + 'static) {
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = Arc::new(default_config);
}
/// Revert the default bucket aggregated metrics flush output.
pub fn unset_default_flush_to() {
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = initial_output()
pub fn unset_default_drain() {
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = initial_drain()
}
/// Set this bucket's statistics generator.
@ -194,19 +195,19 @@ impl AtomicBucket {
}
/// Set this bucket's aggregated metrics flush output.
pub fn set_flush_to(&self, new_config: impl Output + Send + Sync + 'static) {
self.inner.write().expect("Aggregator").output = Some(Arc::new(new_config))
pub fn set_drain(&self, new_drain: impl Output + Send + Sync + 'static) {
self.inner.write().expect("Aggregator").drain = Some(Arc::new(new_drain))
}
/// Revert this bucket's flush target to the default output.
pub fn unset_flush_to(&self) {
self.inner.write().expect("Aggregator").output = None
pub fn unset_drain(&self) {
self.inner.write().expect("Aggregator").drain = None
}
/// Immediately flush the bucket's metrics to the specified scope.
pub fn flush_now_to(&self, publish_scope: &OutputScope, stats_fn: &StatsFn) -> error::Result<()> {
pub fn flush_to(&self, publish_scope: &OutputScope) -> error::Result<()> {
let mut inner = self.inner.write().expect("Aggregator");
inner.flush_to(publish_scope, stats_fn)
inner.flush_to(publish_scope)
}
}
@ -286,7 +287,6 @@ impl AtomicScores {
// fetch_add only returns the previous sum, so min & max trail behind by one operation
// instead, pickup the slack by comparing again with the final sum upon `snapshot`
// this is to avoid making an extra load() on every value
// and should be correct, as no max or min values should be skipped over
let prev_sum = self.scores[SUM].fetch_add(value, Relaxed);
swap_if(&self.scores[MAX], prev_sum, |new, current| new > current);
swap_if(&self.scores[MIN], prev_sum, |new, current| new < current);
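For reference, a sketch of what `swap_if` presumably does; the helper exists elsewhere in this file, so this shape is an assumption:
```rust
use std::sync::atomic::{AtomicIsize, Ordering::Relaxed};

// Retry until `slot` holds the winning value, or another thread
// installed an even better one in the meantime.
fn swap_if<F: Fn(isize, isize) -> bool>(slot: &AtomicIsize, new: isize, win: F) {
    let mut current = slot.load(Relaxed);
    while win(new, current) {
        match slot.compare_exchange_weak(current, new, Relaxed, Relaxed) {
            Ok(_) => return,
            Err(actual) => current = actual,
        }
    }
}
```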
@ -449,10 +449,11 @@ mod test {
use std::time::Duration;
use std::collections::BTreeMap;
fn make_stats(stats_fn: &StatsFn) -> BTreeMap<String, MetricValue> {
fn make_stats(stats_fn: &'static StatsFn) -> BTreeMap<String, MetricValue> {
mock_clock_reset();
let metrics = AtomicBucket::new().add_prefix("test");
metrics.set_stats(stats_fn);
let counter = metrics.counter("counter_a");
let counter_b = metrics.counter("counter_b");
@ -468,7 +469,7 @@ mod test {
counter.count(10);
counter.count(20);
counter_b.count(-6);
counter_b.count(9);
counter_b.count(18);
counter_b.count(3);
@ -484,9 +485,9 @@ mod test {
mock_clock_advance(Duration::from_secs(3));
let stats = StatsMap::default();
metrics.flush_now_to(&stats, stats_fn).unwrap();
stats.into()
let map = StatsMap::default();
metrics.flush_to(&map).unwrap();
map.into()
}
#[test]
@ -501,11 +502,11 @@ mod test {
assert_eq!(map["test.counter_a.rate"], 10);
assert_eq!(map["test.counter_b.count"], 3);
assert_eq!(map["test.counter_b.sum"], 15);
assert_eq!(map["test.counter_b.mean"], 5);
assert_eq!(map["test.counter_b.min"], -6);
assert_eq!(map["test.counter_b.sum"], 30);
assert_eq!(map["test.counter_b.mean"], 10);
assert_eq!(map["test.counter_b.min"], 3);
assert_eq!(map["test.counter_b.max"], 18);
assert_eq!(map["test.counter_b.rate"], 5);
assert_eq!(map["test.counter_b.rate"], 10);
assert_eq!(map["test.timer_a.count"], 2);
assert_eq!(map["test.timer_a.sum"], 30_000_000);
@ -531,7 +532,7 @@ mod test {
let map = make_stats(&stats_summary);
assert_eq!(map["test.counter_a"], 30);
assert_eq!(map["test.counter_b"], 15);
assert_eq!(map["test.counter_b"], 30);
assert_eq!(map["test.level_a"], 23596);
assert_eq!(map["test.timer_a"], 30_000_000);
assert_eq!(map["test.gauge_a"], 15);
@ -543,7 +544,7 @@ mod test {
let map = make_stats(&stats_average);
assert_eq!(map["test.counter_a"], 15);
assert_eq!(map["test.counter_b"], 5);
assert_eq!(map["test.counter_b"], 10);
assert_eq!(map["test.level_a"], 23596);
assert_eq!(map["test.timer_a"], 15_000_000);
assert_eq!(map["test.gauge_a"], 15);

View File

@ -140,7 +140,7 @@ impl Marker {
/// - Records written
/// - Apples eaten
/// For relative (possibly negative) values, the `Level` counter type can be used.
/// If aggregated, minimum and maximum scores will track the collected values, not their sum.
/// If aggregated, minimum and maximum scores will track the collected values, not their sum.
#[derive(Debug, Clone)]
pub struct Counter {
inner: InputMetric,

View File

@ -52,6 +52,12 @@ impl LabelScope {
Some(pairs) => pairs.get(key).cloned()
}
}
fn collect(&self, map: &mut HashMap<String, LabelValue>) {
if let Some(pairs) = &self.pairs {
map.extend(pairs.as_ref().clone().into_iter())
}
}
}
lazy_static!(
@ -89,6 +95,12 @@ impl ThreadLabel {
*map.borrow_mut() = new;
});
}
fn collect(map: &mut HashMap<String, LabelValue>) {
THREAD_LABELS.with(|mop| {
mop.borrow().collect(map)
});
}
}
/// Handle metric labels for the whole application (globals).
@ -114,6 +126,10 @@ impl AppLabel {
let b = { APP_LABELS.read().expect("Global Labels").unset(key) };
*APP_LABELS.write().expect("App Labels Lock") = b;
}
fn collect(map: &mut HashMap<String, LabelValue>) {
APP_LABELS.read().expect("Global Labels").collect(map)
}
}
@ -182,6 +198,39 @@ impl Labels {
}
}
}
/// Export current state of labels to a map.
/// Note: An iterator would still need to allocate to check for uniqueness of keys.
///
pub fn into_map(mut self) -> HashMap<String, LabelValue> {
let mut map = HashMap::new();
match self.scopes.len() {
// no value labels, no saved context labels
// just lookup implicit context
0 => {
AppLabel::collect(&mut map);
ThreadLabel::collect(&mut map);
}
// some value labels, no saved context labels
// lookup value label, then lookup implicit context
1 => {
AppLabel::collect(&mut map);
ThreadLabel::collect(&mut map);
self.scopes[0].collect(&mut map);
},
// value + saved context labels
// lookup explicit context in turn
_ => {
self.scopes.reverse();
for src in self.scopes {
src.collect(&mut map)
}
}
}
map
}
}
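A hedged sketch of how an output might consume `into_map`; the `format_labels` helper is hypothetical:
```rust
use std::collections::HashMap;

// Hypothetical helper: render merged labels in Prometheus-style syntax.
fn format_labels(labels: Labels) -> String {
    // value labels override saved and implicit (app/thread) context labels
    let map: HashMap<String, LabelValue> = labels.into_map();
    map.iter()
        .map(|(key, value)| format!("{}=\"{}\"", key, value))
        .collect::<Vec<_>>()
        .join(",")
}
```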

View File

@ -14,6 +14,12 @@ metrics!{
pub SEND_FAILED: Marker = "send_failed";
}
"prometheus" => {
pub PROMETHEUS_SEND_ERR: Marker = "send_failed";
pub PROMETHEUS_OVERFLOW: Marker = "buf_overflow";
pub PROMETHEUS_SENT_BYTES: Counter = "sent_bytes";
}
"graphite" => {
pub GRAPHITE_SEND_ERR: Marker = "send_failed";
pub GRAPHITE_OVERFLOW: Marker = "buf_overflow";

View File

@ -63,3 +63,27 @@ impl<T: InputScope + Send + Sync + Clone + 'static> ScheduleFlush for T {
})
}
}
//use std::net::{SocketAddr, ToSocketAddrs};
//
//use tiny_http::{Server, StatusCode, self};
//
//pub fn http_serve<A: ToSocketAddrs, F: Fn()>(addresses: A) -> CancelHandle {
// let handle = CancelHandle::new();
// let inner_handle = handle.clone();
// let server = tiny_http::Server::http("0.0.0.0:0")?;
//
// thread::spawn(move || loop {
// match server.recv_timeout(Duration::from_secs(1)) {
// Ok(Some(req)) => {
// let response = tiny_http::Response::new_empty(StatusCode::from(200));
// if let Err(err) = req.respond(response) {
// warn!("Metrics response error: {}", err)
// }
// }
// Ok(None) => if inner_handle.is_cancelled() { break; }
// Err(err) => warn!("Metrics request error: {}", err)
// };
// });
// handle
//}

View File

@ -19,8 +19,7 @@ extern crate num;
// FIXME required only for pcg32 seed (for sampling)
extern crate time;
//#[cfg(feature="prometheus")]
extern crate prometheus;
//extern crate tiny_http;
#[macro_use]
mod macros;

View File

@ -91,12 +91,19 @@ macro_rules! metrics {
metrics!{ $($REST)* }
};
// BRANCH NODE - untyped expr
($e:expr => { $($BRANCH:tt)+ } $($REST:tt)*) => {
// Identified Proxy Root
($e:ident => { $($BRANCH:tt)+ } $($REST:tt)*) => {
metrics!{ @internal $e; Proxy; $($BRANCH)* }
metrics!{ $($REST)* }
};
// Anonymous Proxy Namespace
($e:expr => { $($BRANCH:tt)+ } $($REST:tt)*) => {
lazy_static! { static ref PROXY_METRICS: Proxy = $e.into(); }
metrics!{ @internal PROXY_METRICS; Proxy; $($BRANCH)* }
metrics!{ $($REST)* }
};
// LEAF NODE - public typed decl
($(#[$attr:meta])* pub $IDENT:ident: $TYPE:ty = $e:expr; $($REST:tt)*) => {
metrics!{ @internal Proxy::default(); Proxy; $(#[$attr])* pub $IDENT: $TYPE = $e; }
@ -170,6 +177,15 @@ mod test {
T1: Timer = "failed";
}}
metrics!("my_app" => {
COUNTER_A: Counter = "counter_a";
});
#[test]
fn gurp() {
COUNTER_A.count(11);
}
#[test]
fn call_new_macro_defined_metrics() {
M1.mark();

View File

@ -139,9 +139,9 @@ impl GraphiteScope {
let mut sock = self.socket.write().expect("Lock Graphite Socket");
match sock.write_all(buf.as_bytes()) {
Ok(()) => {
buf.clear();
metrics::GRAPHITE_SENT_BYTES.count(buf.len());
trace!("Sent {} bytes to graphite", buf.len());
buf.clear();
Ok(())
}
Err(e) => {

View File

@ -16,6 +16,8 @@ use log;
pub struct Log {
attributes: Attributes,
format: Arc<LineFormat>,
level: log::Level,
target: Option<String>,
}
impl Input for Log {
@ -25,7 +27,7 @@ impl Input for Log {
LogScope {
attributes: self.attributes.clone(),
entries: Arc::new(RwLock::new(Vec::new())),
output: self.clone(),
log: self.clone(),
}
}
}
@ -50,18 +52,37 @@ impl Formatting for Log {
pub struct LogScope {
attributes: Attributes,
entries: Arc<RwLock<Vec<Vec<u8>>>>,
output: Log,
log: Log,
}
impl Log {
/// Write metric values to the standard log using `info!`.
// TODO parameterize log level, logger
pub fn log_to() -> Log {
pub fn to_log() -> Log {
Log {
attributes: Attributes::default(),
format: Arc::new(SimpleFormat::default()),
level: log::Level::Info,
target: None
}
}
/// Sets the log `level` to use when logging metrics.
/// See the [log!](https://docs.rs/log/0.4.6/log/macro.log.html) documentation.
pub fn level(&self, level: log::Level) -> Self {
let mut cloned = self.clone();
cloned.level = level;
cloned
}
/// Sets the log `target` to use when logging metrics.
/// See the [log!](https://docs.rs/log/0.4.6/log/macro.log.html) documentation.
pub fn target(&self, target: &str) -> Self {
let mut cloned = self.clone();
cloned.target = Some(target.to_string());
cloned
}
}
impl WithAttributes for LogScope {
@ -77,12 +98,11 @@ impl cache_in::CachedInput for Log {}
impl InputScope for LogScope {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let template = self.output.format.template(&name, kind);
let template = self.log.format.template(&name, kind);
let entries = self.entries.clone();
if let Some(_buffering) = self.get_buffering() {
// buffered
InputMetric::new(move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
@ -95,10 +115,16 @@ impl InputScope for LogScope {
})
} else {
// unbuffered
let level = self.log.level;
let target = self.log.target.clone();
InputMetric::new(move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => log!(log::Level::Debug, "{:?}", &buffer),
Ok(()) => if let Some(target) = &target {
log!(target: target, level, "{:?}", &buffer)
} else {
log!(level, "{:?}", &buffer)
}
Err(err) => debug!("Could not format buffered log metric: {}", err),
}
})
@ -115,7 +141,11 @@ impl Flush for LogScope {
for entry in entries.drain(..) {
writeln!(&mut buf, "{:?}", &entry)?;
}
log!(log::Level::Debug, "{:?}", &buf);
if let Some(target) = &self.log.target {
log!(target: target, self.log.level, "{:?}", &buf)
} else {
log!(self.log.level, "{:?}", &buf)
}
}
Ok(())
}
@ -135,7 +165,7 @@ mod test {
#[test]
fn test_to_log() {
let c = super::Log::log_to().input();
let c = super::Log::to_log().input();
let m = c.new_metric("test".into(), InputKind::Marker);
m.write(33, labels![]);
}
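The new `level` and `target` builders compose with the existing constructor chain; a minimal sketch:
```rust
// route metric lines to the "metrics" log target at Debug level
let metrics_log = Log::to_log()
    .level(log::Level::Debug)
    .target("metrics")
    .input();

metrics_log.marker("app_start").mark();
```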

View File

@ -1,58 +1,25 @@
//! Prometheus-related functionality.
//! Both push and pull are supported.
//! Both protobuf and text format are supported.
//! Send metrics to a Prometheus server.
use core::{Flush};
use core::input::{InputKind};
use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Prefixed};
use core::attributes::{Buffered, Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope};
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
use output::socket::RetrySocket;
use queue::queue_out;
use cache::cache_out;
use core::label::Labels;
use std::net::ToSocketAddrs;
use std::sync::{Arc, RwLock};
use std::fmt::Debug;
use std::io::Write;
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use prometheus::{Opts, Registry, IntGauge, IntCounter, Encoder, ProtobufEncoder, TextEncoder,};
metrics!{
}
#[derive(Clone, Debug)]
enum PrometheusEncoding {
JSON,
PROTOBUF,
}
/// Prometheus push shared client
/// Holds a shared socket to a Prometheus host.
/// Prometheus output holds a push URL to a Prometheus gateway.
/// The URL is shared between scopes opened from the output.
#[derive(Clone, Debug)]
pub struct Prometheus {
attributes: Attributes,
socket: Arc<RwLock<RetrySocket>>,
encoding: PrometheusEncoding,
}
impl Prometheus {
/// Send metrics to a prometheus server at the address and port provided.
pub fn send_json_to<A: ToSocketAddrs + Debug + Clone>(address: A) -> error::Result<Prometheus> {
Ok(Prometheus {
attributes: Attributes::default(),
socket: Arc::new(RwLock::new(RetrySocket::new(address.clone())?)),
encoding: PrometheusEncoding::JSON,
})
}
/// Send metrics to a prometheus server at the address and port provided.
pub fn send_protobuf_to<A: ToSocketAddrs + Debug + Clone>(address: A) -> error::Result<Prometheus> {
Ok(Prometheus {
attributes: Attributes::default(),
socket: Arc::new(RwLock::new(RetrySocket::new(address.clone())?)),
encoding: PrometheusEncoding::PROTOBUF,
})
}
push_url: String,
}
impl Output for Prometheus {
@ -61,85 +28,131 @@ impl Output for Prometheus {
fn output(&self) -> Self::SCOPE {
PrometheusScope {
attributes: self.attributes.clone(),
registry: Registry::new(),
socket: self.socket.clone(),
encoding: self.encoding.clone(),
buffer: Rc::new(RefCell::new(String::new())),
push_url: self.push_url.clone(),
}
}
}
impl Prometheus {
/// Send metrics to a Prometheus "push gateway" at the URL provided.
/// The URL path must include the group identifier label `job`,
/// as shown in https://github.com/prometheus/pushgateway#command-line
/// For example `http://pushgateway.example.org:9091/metrics/job/some_job`
pub fn push_to(url: &str) -> error::Result<Prometheus> {
debug!("Pushing to Prometheus {:?}", url);
Ok(Prometheus {
attributes: Attributes::default(),
push_url: url.to_string(),
})
}
}
impl WithAttributes for Prometheus {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
/// Prometheus push client scope
#[derive(Clone)]
impl Buffered for Prometheus {}
/// Prometheus Input
#[derive(Debug, Clone)]
pub struct PrometheusScope {
attributes: Attributes,
registry: Registry,
socket: Arc<RwLock<RetrySocket>>,
encoding: PrometheusEncoding,
buffer: Rc<RefCell<String>>,
push_url: String,
}
impl OutputScope for PrometheusScope {
/// Define a metric of the specified type.
// TODO enable labels
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = self.prefix_prepend(name).join(".");
match kind {
InputKind::Counter | InputKind::Timer => {
let counter = self.register_counter(name);
OutputMetric::new(move |value, _labels| counter.inc_by(value as i64))
},
InputKind::Marker => {
let counter = self.register_counter(name);
OutputMetric::new(move |_value, _labels| counter.inc())
},
InputKind::Gauge | InputKind::Level => self.new_gauge(name)
}
let prefix = self.prefix_prepend(name).join("_");
let scale = match kind {
// timers are in µs, but we give Prometheus milliseconds
InputKind::Timer => 1000,
_ => 1,
};
let cloned = self.clone();
let metric = PrometheusMetric { prefix, scale };
OutputMetric::new(move |value, labels| {
cloned.print(&metric, value, labels);
})
}
}
impl PrometheusScope {
fn register_counter(&self, name: String) -> IntCounter {
let opts = Opts::new(name, "".into());
let counter = IntCounter::with_opts(opts).expect("Prometheus IntCounter");
self.registry.register(Box::new(counter.clone())).expect("Prometheus IntCounter");
counter
}
fn new_gauge(&self, name: String) -> OutputMetric {
let opts = Opts::new(name, "".into());
let gauge = IntGauge::with_opts(opts).expect("Prometheus IntGauge");
self.registry.register(Box::new(gauge.clone())).expect("Prometheus IntGauge");
OutputMetric::new(move |value, _labels|
gauge.add(value as i64)
)
}
}
impl Flush for PrometheusScope {
fn flush(&self) -> error::Result<()> {
let metric_families = self.registry.gather();
let mut buffer = vec![];
let buf = self.buffer.borrow_mut();
self.flush_inner(buf)
}
}
match self.encoding {
PrometheusEncoding::JSON => {
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer)?
},
PrometheusEncoding::PROTOBUF => {
let encoder = ProtobufEncoder::new();
encoder.encode(&metric_families, &mut buffer)?
},
impl PrometheusScope {
fn print(&self, metric: &PrometheusMetric, value: MetricValue, labels: Labels) {
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
let mut strbuf = String::new();
// the Prometheus text format looks like `http_requests_total{method="post",code="200"} 1027 1395066363000`
strbuf.push_str(&metric.prefix);
let labels_map = labels.into_map();
if !labels_map.is_empty() {
strbuf.push('{');
let mut i = labels_map.into_iter();
let mut next = i.next();
while let Some((k, v)) = next {
strbuf.push_str(&k);
strbuf.push_str("=\"");
strbuf.push_str(&v);
next = i.next();
if next.is_some() {
strbuf.push_str("\",");
} else {
strbuf.push('"');
}
}
strbuf.push_str("} ");
} else {
strbuf.push(' ');
}
strbuf.push_str(&value_str);
let mut socket = self.socket.write().expect("Lock Prometheus Socket");
Ok(socket.write_all(&mut buffer)?)
let buffer = self.buffer.borrow_mut();
if strbuf.len() + buffer.len() > BUFFER_FLUSH_THRESHOLD {
metrics::PROMETHEUS_OVERFLOW.mark();
warn!("Prometheus Buffer Size Exceeded: {}", BUFFER_FLUSH_THRESHOLD);
let _ = self.flush_inner(buffer);
} else {
if self.get_buffering().is_none() {
if let Err(e) = self.flush_inner(buffer) {
debug!("Could not send to Prometheus {}", e)
}
}
}
}
fn flush_inner(&self, mut buf: RefMut<String>) -> error::Result<()> {
if buf.is_empty() { return Ok(()) }
match minreq::get(self.push_url.as_ref()).with_body(buf.as_ref()).send() {
Ok(_res) => {
metrics::PROMETHEUS_SENT_BYTES.count(buf.len());
trace!("Sent {} bytes to Prometheus", buf.len());
buf.clear();
Ok(())
}
Err(e) => {
metrics::PROMETHEUS_SEND_ERR.mark();
debug!("Failed to send buffer to Prometheus: {}", e);
Err(e.into())
}
}
}
}
@ -147,3 +160,55 @@ impl WithAttributes for PrometheusScope {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl Buffered for PrometheusScope {}
impl queue_out::QueuedOutput for Prometheus {}
impl cache_out::CachedOutput for Prometheus {}
/// It's hard to see how a single scope could buffer more metric data than this.
// TODO make configurable?
const BUFFER_FLUSH_THRESHOLD: usize = 65_536;
/// Key of a Prometheus metric.
#[derive(Debug, Clone)]
pub struct PrometheusMetric {
prefix: String,
scale: isize,
}
/// Any remaining buffered data is flushed on Drop.
impl Drop for PrometheusScope {
fn drop(&mut self) {
if let Err(err) = self.flush() {
warn!("Could not flush Prometheus metrics upon Drop: {}", err)
}
}
}
#[cfg(feature = "bench")]
mod bench {
use core::attributes::*;
use core::input::*;
use super::*;
use test;
#[bench]
pub fn immediate_prometheus(b: &mut test::Bencher) {
let sd = Prometheus::push_to("localhost:2003").unwrap().input();
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
#[bench]
pub fn buffering_prometheus(b: &mut test::Bencher) {
let sd = Prometheus::push_to("localhost:2003").unwrap()
.buffered(Buffering::BufferSize(65465)).input();
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
}