Merge pull request #34 from fralalonde/gagaga

This commit is contained in:
Francis Lalonde 2019-01-08 10:25:48 -05:00 committed by GitHub
commit 3dbe61e7fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 622 additions and 364 deletions

View File

@ -27,7 +27,7 @@ num = { version = "0.2", default-features = false }
# FIXME required only for random seed for sampling
time = "0.1"
prometheus = { version = "0.4" }
minreq = { version = "1.0.0" }
# optional dep for standalone http pull metrics
tiny_http = { version = "0.6", optional = true }

View File

@ -51,7 +51,7 @@ While timers internal precision are in nanoseconds, their accuracy depends on pl
Timer's default output format is milliseconds but is scalable up or down.
```$rust,skt-run
let app_metrics = metric_scope(to_stdout());
let app_metrics = Stream::to_stdout().input();
let timer = app_metrics.timer("my_timer");
time!(timer, {/* slow code here */} );
timer.time(|| {/* slow code here */} );
@ -79,7 +79,7 @@ Names should exclude characters that can interfere with namespaces, separator an
A good convention is to stick with lowercase alphanumeric identifiers of less than 12 characters.
```$rust,skt-run
let app_metrics = metric_scope(to_stdout());
let app_metrics = Stream::to_stdout().input();
let db_metrics = app_metrics.add_prefix("database");
let _db_timer = db_metrics.timer("db_timer");
let _db_counter = db_metrics.counter("db_counter");
@ -107,17 +107,12 @@ Notes about labels:
Metric inputs are usually setup statically upon application startup.
```$rust,skt-run
#[macro_use]
extern crate dipstick;
use dipstick::*;
metrics!("my_app" => {
COUNTER_A: Counter = "counter_a";
});
fn main() {
route_aggregate_metrics(to_stdout());
Proxy::set_default_target(Stream::to_stdout().input());
COUNTER_A.count(11);
}
```
@ -126,15 +121,16 @@ The static metric definition macro is just `lazy_static!` wrapper.
## Dynamic metrics
If necessary, metrics can also be defined "dynamically", with a possibly new name for every value.
This is more flexible but has a higher runtime cost, which may be alleviated with caching.
If necessary, metrics can also be defined "dynamically".
This is more flexible but has a higher runtime cost, which may be alleviated with the optional caching mechanism.
```$rust,skt-run
let user_name = "john_day";
let app_metrics = to_log().with_cache(512);
app_metrics.gauge(format!("gauge_for_user_{}", user_name)).value(44);
let app_metrics = Log::to_log().cached(512).input();
app_metrics.gauge(&format!("gauge_for_user_{}", user_name)).value(44);
```
Alternatively, you may use `Labels` to output context-dependent metrics.
## Metrics Output
A metrics library's second job is to help a program emit metric values that can be used in further systems.
@ -184,8 +180,8 @@ If enabled, buffering is usually a best-effort affair, to safely limit the amoun
Some outputs such as statsd also have the ability to sample metrics.
If enabled, sampling is done using pcg32, a fast random algorithm with reasonable entropy.
```$rust,skt-fail
let _app_metrics = Statsd::send_to("server:8125")?.with_sampling_rate(0.01);
```$rust,skt-run,no_run
let _app_metrics = Statsd::send_to("server:8125")?.sampled(Sampling::Random(0.01)).input();
```

View File

@ -6,30 +6,12 @@ Use `cargo test --features="skeptic"` to run the examples in the README using th
#[macro_use]
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
fn main() {{
{}
}}
```
```rust,skt-fail
extern crate dipstick;
use dipstick::*;
use std::result::Result;
use std::time::Duration;
fn main() {{
run().ok();
}}
fn run() -> Result<(), Error> {{
fn main() -> std::result::Result<(), Box<std::error::Error>> {{
{}
Ok(())
}}
```
```rust,skt-plain
{}
```

View File

@ -46,10 +46,10 @@ Here's a basic aggregating & auto-publish counter metric:
```$rust,skt-run
let bucket = AtomicBucket::new();
bucket.set_target(Stream::stdout());
bucket.flush_every(Duration::from_secs(3));
bucket.set_drain(Stream::to_stdout());
bucket.flush_every(std::time::Duration::from_secs(3));
let counter = bucket.counter("counter_a");
counter.count(8)
counter.count(8);
```
Persistent apps wanting to declare static metrics will prefer using the `metrics!` macro:

View File

@ -6,30 +6,12 @@ Use `cargo test --features="skeptic"` to run the examples in the README using th
#[macro_use]
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
fn main() {{
{}
}}
```
```rust,skt-fail
extern crate dipstick;
use dipstick::*;
use std::result::Result;
use std::time::Duration;
fn main() {{
run().ok();
}}
fn run() -> Result<(), Error> {{
fn main() -> std::result::Result<(), Box<std::error::Error>> {{
{}
Ok(())
}}
```
```rust,skt-plain
{}
```

View File

@ -1,12 +1,10 @@
//! A sample application asynchronously printing metrics to stdout.
#[macro_use]
extern crate dipstick;
use std::thread::sleep;
use std::time::Duration;
use dipstick::{Stream, Counter, InputScope, QueuedOutput, Input};
use std::io;
use dipstick::{Stream, InputScope, QueuedOutput, Input};
use std::thread;
fn main() {

View File

@ -1,8 +1,8 @@
//! An app demonstrating the basics of the metrics front-end.
//! Defines metrics of each kind and use them to print values to the console in multiple ways.
#[macro_use]
extern crate dipstick;
use std::thread::sleep;
use std::io;
use std::time::Duration;

View File

@ -26,6 +26,7 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.set_stats(stats_all);
bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap();
}

View File

@ -30,6 +30,6 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap();
}

View File

@ -27,6 +27,6 @@ fn main() {
});
}
sleep(Duration::from_secs(5));
bucket.flush_now_to(&Stream::write_to(io::stdout()).output(), &stats_all).unwrap();
bucket.flush_to(&Stream::write_to(io::stdout()).output()).unwrap();
}

View File

@ -10,7 +10,7 @@ fn main() {
let bucket = AtomicBucket::new().add_prefix("test");
// Bucket::set_default_output(to_stdout());
bucket.set_flush_to(Graphite::send_to("localhost:2003").expect("Socket")
bucket.set_drain(Graphite::send_to("localhost:2003").expect("Socket")
.add_prefix("machine1").add_prefix("application"));
bucket.flush_every(Duration::from_secs(3));

View File

@ -11,7 +11,7 @@ fn main() {
let metrics = AtomicBucket::new().add_prefix("test");
// Bucket::set_default_output(to_stdout());
metrics.set_flush_to(Stream::write_to(io::stdout()));
metrics.set_drain(Stream::write_to(io::stdout()));
metrics.flush_every(Duration::from_secs(3));

View File

@ -11,7 +11,7 @@ use std::thread::sleep;
fn main() {
let bucket = AtomicBucket::new();
AtomicBucket::set_default_flush_to(Stream::write_to(io::stdout()));
AtomicBucket::set_default_drain(Stream::write_to(io::stdout()));
let persistent_marker = bucket.marker("persistent");

View File

@ -10,7 +10,7 @@ use std::io;
fn main() {
let app_metrics = AtomicBucket::new();
app_metrics.set_flush_to(Stream::write_to(io::stdout()));
app_metrics.set_drain(Stream::write_to(io::stdout()));
app_metrics.flush_every(Duration::from_secs(3));

View File

@ -4,12 +4,11 @@ extern crate dipstick;
use std::time::Duration;
use std::thread::sleep;
use std::io;
use dipstick::*;
fn main() {
let input = Stream::write_to(io::stdout()).buffered(Buffering::Unlimited);
let input = Stream::to_stdout().buffered(Buffering::Unlimited);
loop {
println!("\n------- open scope");

View File

@ -7,7 +7,6 @@ extern crate dipstick;
use std::time::Duration;
use dipstick::*;
use std::thread::sleep;
use std::io;
metrics!{
APP = "application" => {
@ -34,7 +33,7 @@ fn main() {
// send application metrics to aggregator
Proxy::default().set_target(all_buckets);
AtomicBucket::set_default_flush_to(Stream::write_to(io::stdout()));
AtomicBucket::set_default_drain(Stream::to_stdout());
AtomicBucket::set_default_stats(stats_all);
loop {

View File

@ -42,7 +42,7 @@ fn main() {
}
// send application metrics to aggregator
AtomicBucket::set_default_flush_to(Stream::to_stderr());
AtomicBucket::set_default_drain(Stream::to_stderr());
AtomicBucket::set_default_stats(custom_statistics);
let app_metrics = AtomicBucket::new();

View File

@ -1,12 +1,10 @@
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
#[macro_use]
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
use std::io;
// undeclared root (un-prefixed) metrics
metrics! {
@ -37,7 +35,7 @@ metrics!(LIB_METRICS => {
});
fn main() {
dipstick::Proxy::set_default_target(dipstick::Stream::write_to(io::stdout()).input());
dipstick::Proxy::set_default_target(Stream::to_stdout().input());
loop {
ROOT_COUNTER.count(123);

View File

@ -4,7 +4,6 @@ extern crate dipstick;
use dipstick::{MultiInput, Graphite, Stream, Input, InputScope, Prefixed};
use std::time::Duration;
use std::io;
fn main() {
// will output metrics to graphite and to stdout

View File

@ -4,7 +4,6 @@ extern crate dipstick;
use dipstick::*;
use std::time::Duration;
use std::io;
fn main() {
// will output metrics to graphite and to stdout

View File

@ -7,7 +7,7 @@ use std::time::Duration;
fn main() {
let metrics =
Prometheus::send_json_to("localhost:2003")
Prometheus::push_to("http://prometheus:9091/metrics/job/prometheus_example")
.expect("Prometheus Socket")
.add_prefix("my_app")
.input();

View File

@ -12,7 +12,7 @@ fn main() {
pub fn raw_write() {
// setup dual metric channels
let metrics_log = dipstick::Log::log_to().input();
let metrics_log = dipstick::Log::to_log().input();
// define and send metrics using raw channel API
let counter = metrics_log.new_metric(

0
libtest.rmeta Normal file
View File

View File

@ -26,14 +26,14 @@ fn initial_stats() -> &'static StatsFn {
&stats_summary
}
fn initial_output() -> Arc<OutputDyn + Send + Sync> {
fn initial_drain() -> Arc<OutputDyn + Send + Sync> {
Arc::new(output_none())
}
lazy_static! {
static ref DEFAULT_AGGREGATE_STATS: RwLock<Arc<StatsFn>> = RwLock::new(Arc::new(initial_stats()));
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OutputDyn + Send + Sync>> = RwLock::new(initial_output());
static ref DEFAULT_AGGREGATE_OUTPUT: RwLock<Arc<OutputDyn + Send + Sync>> = RwLock::new(initial_drain());
}
/// Central aggregation structure.
@ -49,7 +49,7 @@ struct InnerAtomicBucket {
period_start: TimeHandle,
stats: Option<Arc<Fn(InputKind, MetricName, ScoreType)
-> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static>>,
output: Option<Arc<OutputDyn + Send + Sync + 'static>>,
drain: Option<Arc<OutputDyn + Send + Sync + 'static>>,
publish_metadata: bool,
}
@ -67,17 +67,12 @@ lazy_static! {
impl InnerAtomicBucket {
pub fn flush(&mut self) -> error::Result<()> {
let stats_fn = match self.stats {
Some(ref stats_fn) => stats_fn.clone(),
None => DEFAULT_AGGREGATE_STATS.read().unwrap().clone(),
};
let pub_scope = match self.output {
let pub_scope = match self.drain {
Some(ref out) => out.output_dyn(),
None => DEFAULT_AGGREGATE_OUTPUT.read().unwrap().output_dyn(),
};
self.flush_to(pub_scope.borrow(), stats_fn.as_ref())?;
self.flush_to(pub_scope.borrow())?;
// all metrics published!
// purge: if bucket is the last owner of the metric, remove it
@ -86,7 +81,7 @@ impl InnerAtomicBucket {
self.metrics.iter()
.filter(|&(_k, v)| Arc::strong_count(v) == 1)
.map(|(k, _v)| k)
.for_each(|k| {purged.remove(k);});
.for_each(|k| { purged.remove(k); });
self.metrics = purged;
Ok(())
@ -95,7 +90,7 @@ impl InnerAtomicBucket {
/// Take a snapshot of aggregated values and reset them.
/// Compute stats on captured values using assigned or default stats function.
/// Write stats to assigned or default output.
pub fn flush_to(&mut self, target: &OutputScope, stats: &StatsFn) -> error::Result<()> {
pub fn flush_to(&mut self, target: &OutputScope) -> error::Result<()> {
let now = TimeHandle::now();
let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0;
@ -119,9 +114,15 @@ impl InnerAtomicBucket {
if self.publish_metadata {
snapshot.push((&PERIOD_LENGTH, InputKind::Timer, vec![Sum((duration_seconds * 1000.0) as isize)]));
}
let stats_fn = match self.stats {
Some(ref stats_fn) => stats_fn.clone(),
None => DEFAULT_AGGREGATE_STATS.read()?.clone(),
};
for metric in snapshot {
for score in metric.2 {
let filtered = stats(metric.1, metric.0.clone(), score);
let filtered = stats_fn(metric.1, metric.0.clone(), score);
if let Some((kind, name, value)) = filtered {
let metric: OutputMetric = target.new_metric(name, kind);
// TODO provide some bucket context through labels?
@ -150,7 +151,7 @@ impl AtomicBucket {
metrics: BTreeMap::new(),
period_start: TimeHandle::now(),
stats: None,
output: None,
drain: None,
// TODO add API toggle for metadata publish
publish_metadata: false,
}))
@ -171,13 +172,13 @@ impl AtomicBucket {
}
/// Set the default bucket aggregated metrics flush output.
pub fn set_default_flush_to(default_config: impl Output + Send + Sync + 'static) {
pub fn set_default_drain(default_config: impl Output + Send + Sync + 'static) {
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = Arc::new(default_config);
}
/// Revert the default bucket aggregated metrics flush output.
pub fn unset_default_flush_to() {
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = initial_output()
pub fn unset_default_drain() {
*DEFAULT_AGGREGATE_OUTPUT.write().unwrap() = initial_drain()
}
/// Set this bucket's statistics generator.
@ -194,19 +195,19 @@ impl AtomicBucket {
}
/// Set this bucket's aggregated metrics flush output.
pub fn set_flush_to(&self, new_config: impl Output + Send + Sync + 'static) {
self.inner.write().expect("Aggregator").output = Some(Arc::new(new_config))
pub fn set_drain(&self, new_drain: impl Output + Send + Sync + 'static) {
self.inner.write().expect("Aggregator").drain = Some(Arc::new(new_drain))
}
/// Revert this bucket's flush target to the default output.
pub fn unset_flush_to(&self) {
self.inner.write().expect("Aggregator").output = None
pub fn unset_drain(&self) {
self.inner.write().expect("Aggregator").drain = None
}
/// Immediately flush the bucket's metrics to the specified scope and stats.
pub fn flush_now_to(&self, publish_scope: &OutputScope, stats_fn: &StatsFn) -> error::Result<()> {
pub fn flush_to(&self, publish_scope: &OutputScope) -> error::Result<()> {
let mut inner = self.inner.write().expect("Aggregator");
inner.flush_to(publish_scope, stats_fn)
inner.flush_to(publish_scope)
}
}
@ -239,6 +240,12 @@ impl WithAttributes for AtomicBucket {
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
const HIT: usize = 0;
const SUM: usize = 1;
const MAX: usize = 2;
const MIN: usize = 3;
const SCORES_LEN: usize = 4;
/// A metric that holds aggregated values.
/// Some fields are kept public to ease publishing.
#[derive(Debug)]
@ -246,7 +253,7 @@ struct AtomicScores {
/// The kind of metric
kind: InputKind,
/// The actual recorded metric scores
scores: [AtomicIsize; 4],
scores: [AtomicIsize; SCORES_LEN],
}
impl AtomicScores {
@ -264,22 +271,32 @@ impl AtomicScores {
}
#[inline]
fn blank() -> [isize; 4] {
fn blank() -> [isize; SCORES_LEN] {
[0, 0, isize::MIN, isize::MAX]
}
/// Update scores with new value
pub fn update(&self, value: MetricValue) -> () {
// TODO report any concurrent updates / resets for measurement of contention
let value = value;
self.scores[0].fetch_add(1, AcqRel);
// TODO detect & report any concurrent updates / resets for measurement of contention
// Count is tracked for all metrics
self.scores[HIT].fetch_add(1, Relaxed);
match self.kind {
InputKind::Marker => {}
_ => {
// optimization - these fields are unused for Marker stats
self.scores[1].fetch_add(value, AcqRel);
swap_if(&self.scores[2], value, |new, current| new > current);
swap_if(&self.scores[3], value, |new, current| new < current);
InputKind::Level => {
// Level min & max apply to the _sum_ of values
// fetch_add only returns the previous sum, so min & max trail behind by one operation
// instead, pickup the slack by comparing again with the final sum upon `snapshot`
// this is to avoid making an extra load() on every value
let prev_sum = self.scores[SUM].fetch_add(value, Relaxed);
swap_if(&self.scores[MAX], prev_sum, |new, current| new > current);
swap_if(&self.scores[MIN], prev_sum, |new, current| new < current);
}
InputKind::Counter | InputKind::Timer | InputKind::Gauge => {
// gauges are non cumulative, but we keep the sum to compute the mean
// TODO use #![feature(atomic_min_max)] when stabilized
self.scores[SUM].fetch_add(value, Relaxed);
swap_if(&self.scores[MAX], value, |new, current| new > current);
swap_if(&self.scores[MIN], value, |new, current| new < current);
}
}
}
@ -287,16 +304,28 @@ impl AtomicScores {
/// Reset scores to zero, return previous values
fn snapshot(&self, scores: &mut [isize; 4]) -> bool {
// NOTE copy timestamp, count AND sum _before_ testing for data to reduce concurrent discrepancies
scores[0] = self.scores[0].swap(0, AcqRel);
scores[1] = self.scores[1].swap(0, AcqRel);
scores[HIT] = self.scores[HIT].swap(0, AcqRel);
scores[SUM] = self.scores[SUM].swap(0, AcqRel);
// if hit count is zero, then no values were recorded.
if scores[0] == 0 {
// if hit count is zero, no values were recorded.
if scores[HIT] == 0 {
return false;
}
scores[2] = self.scores[2].swap(isize::MIN, AcqRel);
scores[3] = self.scores[3].swap(isize::MAX, AcqRel);
scores[MAX] = self.scores[MAX].swap(isize::MIN, AcqRel);
scores[MIN] = self.scores[MIN].swap(isize::MAX, AcqRel);
if self.kind == InputKind::Level {
// fetch_add only returns the previous sum, so min & max trail behind by one operation
// pickup the slack by comparing one last time against the final sum
if scores[SUM] > scores[MAX] {
scores[MAX] = scores[SUM];
}
if scores[SUM] < scores[MIN] {
scores[MIN] = scores[SUM];
}
}
true
}
@ -308,33 +337,43 @@ impl AtomicScores {
let mut snapshot = Vec::new();
match self.kind {
InputKind::Marker => {
snapshot.push(Count(scores[0]));
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
snapshot.push(Count(scores[HIT]));
snapshot.push(Rate(scores[HIT] as f64 / duration_seconds))
}
InputKind::Gauge => {
snapshot.push(Max(scores[2]));
snapshot.push(Min(scores[3]));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
snapshot.push(Max(scores[MAX]));
snapshot.push(Min(scores[MIN]));
snapshot.push(Mean(scores[SUM] as f64 / scores[HIT] as f64));
}
InputKind::Timer => {
snapshot.push(Count(scores[0]));
snapshot.push(Sum(scores[1]));
snapshot.push(Count(scores[HIT]));
snapshot.push(Sum(scores[SUM]));
snapshot.push(Max(scores[2]));
snapshot.push(Min(scores[3]));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
snapshot.push(Max(scores[MAX]));
snapshot.push(Min(scores[MIN]));
snapshot.push(Mean(scores[SUM] as f64 / scores[HIT] as f64));
// timer rate uses the COUNT of timer calls per second (not SUM)
snapshot.push(Rate(scores[0] as f64 / duration_seconds))
snapshot.push(Rate(scores[HIT] as f64 / duration_seconds))
}
InputKind::Counter => {
snapshot.push(Count(scores[0]));
snapshot.push(Sum(scores[1]));
snapshot.push(Count(scores[HIT]));
snapshot.push(Sum(scores[SUM]));
snapshot.push(Max(scores[2]));
snapshot.push(Min(scores[3]));
snapshot.push(Mean(scores[1] as f64 / scores[0] as f64));
snapshot.push(Max(scores[MAX]));
snapshot.push(Min(scores[MIN]));
snapshot.push(Mean(scores[SUM] as f64 / scores[HIT] as f64));
// counter rate uses the SUM of values per second (e.g. to get bytes/s)
snapshot.push(Rate(scores[1] as f64 / duration_seconds))
snapshot.push(Rate(scores[SUM] as f64 / duration_seconds))
}
InputKind::Level => {
snapshot.push(Count(scores[HIT]));
snapshot.push(Sum(scores[SUM]));
snapshot.push(Max(scores[MAX]));
snapshot.push(Min(scores[MIN]));
snapshot.push(Mean(scores[SUM] as f64 / scores[HIT] as f64));
// level rate uses the SUM of adjustments per second (e.g. net bytes/s)
snapshot.push(Rate(scores[SUM] as f64 / duration_seconds))
}
}
Some(snapshot)
@ -367,7 +406,7 @@ mod bench {
#[bench]
fn update_marker(b: &mut test::Bencher) {
let metric = AtomicScores::new(InputKind::Marker);
b.iter(|| test::black_box(metric.update(1.0)));
b.iter(|| test::black_box(metric.update(1)));
}
#[bench]
@ -410,14 +449,17 @@ mod test {
use std::time::Duration;
use std::collections::BTreeMap;
fn make_stats(stats_fn: &StatsFn) -> BTreeMap<String, MetricValue> {
fn make_stats(stats_fn: &'static StatsFn) -> BTreeMap<String, MetricValue> {
mock_clock_reset();
let metrics = AtomicBucket::new().add_prefix("test");
metrics.set_stats(stats_fn);
let counter = metrics.counter("counter_a");
let counter_b = metrics.counter("counter_b");
let timer = metrics.timer("timer_a");
let gauge = metrics.gauge("gauge_a");
let level = metrics.level("level_a");
let marker = metrics.marker("marker_a");
marker.mark();
@ -427,18 +469,25 @@ mod test {
counter.count(10);
counter.count(20);
counter_b.count(9);
counter_b.count(18);
counter_b.count(3);
timer.interval_us(10_000_000);
timer.interval_us(20_000_000);
gauge.value(10);
gauge.value(20);
level.adjust(789);
level.adjust(-7789);
level.adjust(77788);
mock_clock_advance(Duration::from_secs(3));
// TODO expose & use flush_to()
let stats = StatsMap::default();
metrics.flush_now_to(&stats, stats_fn).unwrap();
stats.into()
let map = StatsMap::default();
metrics.flush_to(&map).unwrap();
map.into()
}
#[test]
@ -448,8 +497,17 @@ mod test {
assert_eq!(map["test.counter_a.count"], 2);
assert_eq!(map["test.counter_a.sum"], 30);
assert_eq!(map["test.counter_a.mean"], 15);
assert_eq!(map["test.counter_a.min"], 10);
assert_eq!(map["test.counter_a.max"], 20);
assert_eq!(map["test.counter_a.rate"], 10);
assert_eq!(map["test.counter_b.count"], 3);
assert_eq!(map["test.counter_b.sum"], 30);
assert_eq!(map["test.counter_b.mean"], 10);
assert_eq!(map["test.counter_b.min"], 3);
assert_eq!(map["test.counter_b.max"], 18);
assert_eq!(map["test.counter_b.rate"], 10);
assert_eq!(map["test.timer_a.count"], 2);
assert_eq!(map["test.timer_a.sum"], 30_000_000);
assert_eq!(map["test.timer_a.min"], 10_000_000);
@ -461,6 +519,10 @@ mod test {
assert_eq!(map["test.gauge_a.min"], 10);
assert_eq!(map["test.gauge_a.max"], 20);
assert_eq!(map["test.level_a.mean"], 23596);
assert_eq!(map["test.level_a.min"], -7000);
assert_eq!(map["test.level_a.max"], 70788);
assert_eq!(map["test.marker_a.count"], 3);
assert_eq!(map["test.marker_a.rate"], 1);
}
@ -470,6 +532,8 @@ mod test {
let map = make_stats(&stats_summary);
assert_eq!(map["test.counter_a"], 30);
assert_eq!(map["test.counter_b"], 30);
assert_eq!(map["test.level_a"], 23596);
assert_eq!(map["test.timer_a"], 30_000_000);
assert_eq!(map["test.gauge_a"], 15);
assert_eq!(map["test.marker_a"], 3);
@ -480,6 +544,8 @@ mod test {
let map = make_stats(&stats_average);
assert_eq!(map["test.counter_a"], 15);
assert_eq!(map["test.counter_b"], 10);
assert_eq!(map["test.level_a"], 23596);
assert_eq!(map["test.timer_a"], 15_000_000);
assert_eq!(map["test.gauge_a"], 15);
assert_eq!(map["test.marker_a"], 3);

View File

@ -11,9 +11,9 @@ pub enum ScoreType {
Count(isize),
/// Sum of metric values reported.
Sum(isize),
/// Biggest value reported.
/// Biggest value observed.
Max(isize),
/// Smallest value reported.
/// Smallest value observed.
Min(isize),
/// Average value (sum / hit count, non-atomic)
Mean(f64),
@ -76,7 +76,7 @@ pub fn stats_summary(kind: InputKind, name: MetricName, score: ScoreType)
ScoreType::Sum(sum) => Some((kind, name, sum)),
_ => None,
},
InputKind::Gauge => match score {
InputKind::Gauge | InputKind::Level => match score {
ScoreType::Mean(mean) => Some((InputKind::Gauge, name, mean.round() as MetricValue)),
_ => None,
},

View File

@ -21,14 +21,14 @@ impl TimeHandle {
}
/// Get the elapsed time in microseconds since TimeHandle was obtained.
pub fn elapsed_us(self) -> MetricValue {
pub fn elapsed_us(self) -> u64 {
let duration = now() - self.0;
(duration.as_secs() * 1_000_000) as MetricValue + duration.subsec_micros() as MetricValue
(duration.as_secs() * 1_000_000) + duration.subsec_micros() as u64
}
/// Get the elapsed time in microseconds since TimeHandle was obtained.
pub fn elapsed_ms(self) -> MetricValue {
self.elapsed_us() / 1000
(self.elapsed_us() / 1000) as isize
}
}

View File

@ -59,6 +59,10 @@ pub trait InputScope: Flush {
self.new_metric(name.into(), InputKind::Gauge).into()
}
/// Define a level.
fn level(&self, name: &str) -> Level {
self.new_metric(name.into(), InputKind::Level).into()
}
}
/// A metric is actually a function that knows to write a metric value to a metric output.
@ -91,9 +95,11 @@ impl InputMetric {
pub enum InputKind {
/// Handling one item at a time.
Marker,
/// Handling quantities or multiples.
/// Handling cumulative observed quantities.
Counter,
/// Reporting instant measurement of a resource at a point in time.
/// Handling quantity fluctuations.
Level,
/// Reporting instant measurement of a resource at a point in time (non-cumulative).
Gauge,
/// Measuring a time interval, internal to the app or provided by an external source.
Timer,
@ -107,6 +113,7 @@ impl<'a> From<&'a str> for InputKind {
"Counter" => InputKind::Counter,
"Gauge" => InputKind::Gauge,
"Timer" => InputKind::Timer,
"Level" => InputKind::Level,
_ => panic!("No InputKind '{}' defined", s)
}
}
@ -127,7 +134,13 @@ impl Marker {
}
}
/// A counter that sends values to the metrics backend
/// A counter of absolute observed values (non-negative amounts).
/// Used to count things that cannot be undone:
/// - Bytes sent
/// - Records written
/// - Apples eaten
/// For relative (possibly negative) values, the `Level` counter type can be used.
/// If aggregated, minimum and maximum scores will track the collected values, not their sum.
#[derive(Debug, Clone)]
pub struct Counter {
inner: InputMetric,
@ -135,7 +148,24 @@ pub struct Counter {
impl Counter {
/// Record a value count.
pub fn count<V: ToPrimitive>(&self, count: V) {
pub fn count(&self, count: usize) {
self.inner.write(count as isize, labels![])
}
}
/// A counter of fluctuating resources accepting positive and negative values.
/// Can be used as a stateful `Gauge` or as a `Counter` of possibly decreasing amounts.
/// - Size of messages in a queue
/// - Strawberries on a conveyor belt
/// If aggregated, minimum and maximum scores will track the sum of values, not the collected values themselves.
#[derive(Debug, Clone)]
pub struct Level {
inner: InputMetric,
}
impl Level {
/// Record a positive or negative value count
pub fn adjust<V: ToPrimitive>(&self, count: V) {
self.inner.write(count.to_isize().unwrap(), labels![])
}
}
@ -167,8 +197,8 @@ pub struct Timer {
impl Timer {
/// Record a microsecond interval for this timer
/// Can be used in place of start()/stop() if an external time interval source is used
pub fn interval_us<V: ToPrimitive>(&self, interval_us: V) -> V {
self.inner.write(interval_us.to_isize().unwrap(), labels![]);
pub fn interval_us(&self, interval_us: u64) -> u64 {
self.inner.write(interval_us as isize, labels![]);
interval_us
}
@ -188,7 +218,7 @@ impl Timer {
/// Returns the microsecond interval value that was recorded.
pub fn stop(&self, start_time: TimeHandle) -> MetricValue {
let elapsed_us = start_time.elapsed_us();
self.interval_us(elapsed_us)
self.interval_us(elapsed_us) as isize
}
/// Record the time taken to execute the provided closure
@ -223,3 +253,9 @@ impl From<InputMetric> for Marker {
Marker { inner: metric }
}
}
impl From<InputMetric> for Level {
fn from(metric: InputMetric) -> Level {
Level { inner: metric }
}
}

View File

@ -52,6 +52,12 @@ impl LabelScope {
Some(pairs) => pairs.get(key).cloned()
}
}
fn collect(&self, map: &mut HashMap<String, LabelValue>) {
if let Some(pairs) = &self.pairs {
map.extend(pairs.as_ref().clone().into_iter())
}
}
}
lazy_static!(
@ -89,6 +95,12 @@ impl ThreadLabel {
*map.borrow_mut() = new;
});
}
fn collect(map: &mut HashMap<String, LabelValue>) {
THREAD_LABELS.with(|mop| {
mop.borrow().collect(map)
});
}
}
/// Handle metric labels for the whole application (globals).
@ -98,21 +110,25 @@ pub struct AppLabel;
impl AppLabel {
/// Retrieve a value from the app scope.
pub fn get(key: &str) -> Option<Arc<String>> {
APP_LABELS.read().expect("Global Labels").get(key)
APP_LABELS.read().expect("App Labels Lock").get(key)
}
/// Set a new value for the app scope.
/// Replaces any previous value for the key.
pub fn set<S: Into<String>>(key: S, value: S) {
let b = { APP_LABELS.read().expect("Global Labels").set(key.into(), Arc::new(value.into())) };
*APP_LABELS.write().expect("Global Labels") = b;
*APP_LABELS.write().expect("App Labels Lock") = b;
}
/// Unset a value for the app scope.
/// Has no effect if key was not set.
pub fn unset(key: &str) {
let b = { APP_LABELS.read().expect("Global Labels").unset(key) };
*APP_LABELS.write().expect("Global Labels") = b;
*APP_LABELS.write().expect("App Labels Lock") = b;
}
fn collect(map: &mut HashMap<String, LabelValue>) {
APP_LABELS.read().expect("Global Labels").collect(map)
}
}
@ -182,6 +198,39 @@ impl Labels {
}
}
}
/// Export current state of labels to a map.
/// Note: An iterator would still need to allocate to check for uniqueness of keys.
///
pub fn into_map(mut self) -> HashMap<String, LabelValue> {
let mut map = HashMap::new();
match self.scopes.len() {
// no value labels, no saved context labels
// just lookup implicit context
0 => {
AppLabel::collect(&mut map);
ThreadLabel::collect(&mut map);
}
// some value labels, no saved context labels
// lookup value label, then lookup implicit context
1 => {
AppLabel::collect(&mut map);
ThreadLabel::collect(&mut map);
self.scopes[0].collect(&mut map);
},
// value + saved context labels
// lookup explicit context in turn
_ => {
self.scopes.reverse();
for src in self.scopes {
src.collect(&mut map)
}
}
}
map
}
}
@ -189,44 +238,56 @@ impl Labels {
pub mod test {
use super::*;
use std::sync::Mutex;
/// Label tests use the globally shared AppLabels which may make them interfere as tests are run concurrently.
/// We do not want to mandate usage of `RUST_TEST_THREADS=1` which would penalize the whole test suite.
/// Instead we use a local mutex to make sure the label tests run in sequence.
lazy_static!{
static ref TEST_SEQUENCE: Mutex<()> = Mutex::new(());
}
#[test]
fn context_labels() {
let _lock = TEST_SEQUENCE.lock().expect("Test Sequence");
AppLabel::set("abc", "456");
ThreadLabel::set("abc", "123");
assert_eq!(Arc::new("123".into()), labels!().lookup("abc").unwrap());
assert_eq!(Arc::new("123".into()), labels!().lookup("abc").expect("ThreadLabel Value"));
ThreadLabel::unset("abc");
assert_eq!(Arc::new("456".into()), labels!().lookup("abc").expect("AppLabel Value"));
AppLabel::unset("abc");
assert_eq!(false, labels!().lookup("abc").is_some());
assert_eq!(true, labels!().lookup("abc").is_none());
}
#[test]
fn labels_macro() {
let _lock = TEST_SEQUENCE.lock().expect("Test Sequence");
let labels = labels!{
"abc" => "789",
"xyz" => "123"
};
assert_eq!(Arc::new("789".into()), labels.lookup("abc").unwrap());
assert_eq!(Arc::new("123".into()), labels.lookup("xyz").unwrap());
assert_eq!(Arc::new("789".into()), labels.lookup("abc").expect("Label Value"));
assert_eq!(Arc::new("123".into()), labels.lookup("xyz").expect("Label Value"));
}
#[test]
fn value_labels() {
let _lock = TEST_SEQUENCE.lock().expect("Test Sequence");
let labels = labels!{ "abc" => "789" };
assert_eq!(Arc::new("789".into()), labels.lookup("abc").expect("Label Value"));
AppLabel::set("abc", "456");
assert_eq!(Arc::new("789".into()), labels.lookup("abc").expect("Label Value"));
ThreadLabel::set("abc", "123");
let mut labels = labels!{
"abc" => "789",
};
assert_eq!(Arc::new("789".into()), labels.lookup("abc").unwrap());
ThreadLabel::unset("abc");
assert_eq!(Arc::new("789".into()), labels.lookup("abc").unwrap());
AppLabel::unset("abc");
assert_eq!(Arc::new("789".into()), labels.lookup("abc").unwrap());
labels = labels![];
assert_eq!(false, labels.lookup("abc").is_some());
assert_eq!(Arc::new("789".into()), labels.lookup("abc").expect("Label Value"));
}
}

View File

@ -1,70 +1,75 @@
use core::input::{InputScope, InputMetric, Input, InputKind};
use core::output::{Output, OutputScope};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::Flush;
use core::error;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::ops;
/// Synchronous thread-safety for metric output using basic locking.
#[derive(Clone)]
pub struct LockingScopeBox {
// prefix / buffering attributes, accessed through WithAttributes
attributes: Attributes,
// the wrapped single-threaded scope, serialized behind a mutex
inner: Arc<Mutex<LockScope>>
}
impl WithAttributes for LockingScopeBox {
    /// Borrow this scope's attributes.
    fn get_attributes(&self) -> &Attributes {
        &self.attributes
    }

    /// Mutably borrow this scope's attributes.
    fn mut_attributes(&mut self) -> &mut Attributes {
        &mut self.attributes
    }
}
impl InputScope for LockingScopeBox {
/// Create a new metric; every write to it locks the inner scope.
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
// lock once while the underlying metric is created
let raw_metric = self.inner.lock().expect("RawScope Lock").new_metric(name, kind);
let mutex = self.inner.clone();
InputMetric::new(move |value, labels| {
// lock again for each value written, guarding the non-thread-safe inner scope
let _guard = mutex.lock().expect("OutputMetric Lock");
raw_metric.write(value, labels)
} )
}
}
impl Flush for LockingScopeBox {
    /// Flush the wrapped scope, serialized behind the lock.
    fn flush(&self) -> error::Result<()> {
        let inner = self.inner.lock().expect("OutputScope Lock");
        inner.flush()
    }
}
/// Blanket impl that provides RawOutputs their dynamic flavor.
impl<T: Output + Send + Sync + 'static> Input for T {
type SCOPE = LockingScopeBox;

/// Open a thread-safe input scope by wrapping this output's scope in a lock.
fn input(&self) -> Self::SCOPE {
LockingScopeBox {
attributes: Attributes::default(),
inner: Arc::new(Mutex::new(LockScope(self.output_dyn())))
}
}
}
/// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads.
/// Obviously, it should only still be used from a single thread or dragons may occur.
#[derive(Clone)]
struct LockScope(Rc<OutputScope + 'static> );

impl ops::Deref for LockScope {
type Target = OutputScope + 'static;
fn deref(&self) -> &Self::Target {
Rc::as_ref(&self.0)
}
}

// SAFETY(review): Rc is not thread-safe on its own; these impls appear to rely
// on every access being serialized behind LockingScopeBox's mutex. Confirm no
// code path clones or drops the Rc concurrently outside that lock.
unsafe impl Send for LockScope {}
unsafe impl Sync for LockScope {}
//! Default locking strategy for shared concurrent output.
//! This makes all outputs also immediately usable as inputs.
//! The alternatives are queuing or thread local.
use core::input::{InputScope, InputMetric, Input, InputKind};
use core::output::{Output, OutputScope};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::Flush;
use core::error;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::ops;
/// Synchronous thread-safety for metric output using basic locking.
#[derive(Clone)]
pub struct LockingOutput {
// prefix / buffering attributes, accessed through WithAttributes
attributes: Attributes,
// the wrapped single-threaded output scope, serialized behind a mutex
inner: Arc<Mutex<LockedOutputScope>>
}
impl WithAttributes for LockingOutput {
    /// Borrow this output's attributes.
    fn get_attributes(&self) -> &Attributes {
        &self.attributes
    }

    /// Mutably borrow this output's attributes.
    fn mut_attributes(&mut self) -> &mut Attributes {
        &mut self.attributes
    }
}
impl InputScope for LockingOutput {
/// Create a new metric; every write to it locks the wrapped output scope.
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
// lock when creating metrics
let raw_metric = self.inner.lock().expect("OutputScope Lock").new_metric(name, kind);
let mutex = self.inner.clone();
InputMetric::new(move |value, labels| {
// lock when collecting values
let _guard = mutex.lock().expect("OutputScope Lock");
raw_metric.write(value, labels)
} )
}
}
impl Flush for LockingOutput {
    /// Flush the wrapped output scope, serialized behind the lock.
    fn flush(&self) -> error::Result<()> {
        let inner = self.inner.lock().expect("OutputScope Lock");
        inner.flush()
    }
}
/// Blanket impl: any thread-safe Output is immediately usable as an Input
/// by serializing access to its single-threaded scope behind a mutex.
impl<T: Output + Send + Sync + 'static> Input for T {
type SCOPE = LockingOutput;

/// Open a thread-safe input scope by wrapping this output's scope in a lock.
fn input(&self) -> Self::SCOPE {
LockingOutput {
attributes: Attributes::default(),
inner: Arc::new(Mutex::new(LockedOutputScope(self.output_dyn())))
}
}
}
/// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads.
/// Obviously, it should only still be used from a single thread at a time or dragons may occur.
#[derive(Clone)]
struct LockedOutputScope(Rc<OutputScope + 'static> );

impl ops::Deref for LockedOutputScope {
type Target = OutputScope + 'static;
fn deref(&self) -> &Self::Target {
Rc::as_ref(&self.0)
}
}

// SAFETY(review): Rc is not thread-safe on its own; these impls appear to rely
// on every access being serialized behind LockingOutput's mutex. Confirm no
// code path clones or drops the Rc concurrently outside that lock.
unsafe impl Send for LockedOutputScope {}
unsafe impl Sync for LockedOutputScope {}

View File

@ -14,6 +14,12 @@ metrics!{
pub SEND_FAILED: Marker = "send_failed";
}
"prometheus" => {
pub PROMETHEUS_SEND_ERR: Marker = "send_failed";
pub PROMETHEUS_OVERFLOW: Marker = "buf_overflow";
pub PROMETHEUS_SENT_BYTES: Counter = "sent_bytes";
}
"graphite" => {
pub GRAPHITE_SEND_ERR: Marker = "send_failed";
pub GRAPHITE_OVERFLOW: Marker = "buf_overflow";

View File

@ -3,7 +3,7 @@ pub mod name;
pub mod attributes;
pub mod input;
pub mod output;
// locking replaces the former out_lock module (same role, renamed);
// the stale `pub mod out_lock;` declaration is removed.
pub mod locking;
pub mod clock;
pub mod void;
pub mod proxy;

View File

@ -70,3 +70,27 @@ impl<T: InputScope + Send + Sync + Clone + 'static> ScheduleFlush for T {
})
}
}
//use std::net::{SocketAddr, ToSocketAddrs};
//
//use tiny_http::{Server, StatusCode, self};
//
//pub fn http_serve<A: ToSocketAddrs, F: Fn()>(addresses: A) -> CancelHandle {
// let handle = CancelHandle::new();
// let inner_handle = handle.clone();
// let server = tiny_http::Server::http("0.0.0.0:0")?;
//
// thread::spawn(move || loop {
// match server.recv_timeout(Duration::from_secs(1)) {
// Ok(Some(req)) => {
// let response = tiny_http::Response::new_empty(StatusCode::from(200));
// if let Err(err) = req.respond(response) {
// warn!("Metrics response error: {}", err)
// }
// }
// Ok(None) => if inner_handle.is_cancelled() { break; }
// Err(err) => warn!("Metrics request error: {}", err)
// };
// });
// handle
//}

View File

@ -19,8 +19,7 @@ extern crate num;
// FIXME required only for pcg32 seed (for sampling)
extern crate time;
//#[cfg(feature="prometheus")]
extern crate prometheus;
//extern crate tiny_http;
#[macro_use]
mod macros;
@ -33,7 +32,7 @@ pub use core::name::{MetricName, NameParts};
pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, InputKind};
pub use core::output::{Output, OutputDyn, OutputScope, OutputMetric};
pub use core::scheduler::{ScheduleFlush, CancelHandle};
pub use core::out_lock::{LockingScopeBox};
pub use core::locking::LockingOutput;
pub use core::error::{Result};
pub use core::clock::{TimeHandle};
pub use core::label::{Labels, AppLabel, ThreadLabel};

View File

@ -91,12 +91,19 @@ macro_rules! metrics {
metrics!{ $($REST)* }
};
// BRANCH NODE - untyped expr
($e:expr => { $($BRANCH:tt)+ } $($REST:tt)*) => {
// Identified Proxy Root
($e:ident => { $($BRANCH:tt)+ } $($REST:tt)*) => {
metrics!{ @internal $e; Proxy; $($BRANCH)* }
metrics!{ $($REST)* }
};
// Anonymous Proxy Namespace
($e:expr => { $($BRANCH:tt)+ } $($REST:tt)*) => {
lazy_static! { static ref PROXY_METRICS: Proxy = $e.into(); }
metrics!{ @internal PROXY_METRICS; Proxy; $($BRANCH)* }
metrics!{ $($REST)* }
};
// LEAF NODE - public typed decl
($(#[$attr:meta])* pub $IDENT:ident: $TYPE:ty = $e:expr; $($REST:tt)*) => {
metrics!{ @internal Proxy::default(); Proxy; $(#[$attr])* pub $IDENT: $TYPE = $e; }
@ -170,6 +177,15 @@ mod test {
T1: Timer = "failed";
}}
metrics!("my_app" => {
COUNTER_A: Counter = "counter_a";
});
#[test]
fn gurp() {
COUNTER_A.count(11);
}
#[test]
fn call_new_macro_defined_metrics() {
M1.mark();

View File

@ -139,9 +139,9 @@ impl GraphiteScope {
let mut sock = self.socket.write().expect("Lock Graphite Socket");
match sock.write_all(buf.as_bytes()) {
Ok(()) => {
buf.clear();
metrics::GRAPHITE_SENT_BYTES.count(buf.len());
trace!("Sent {} bytes to graphite", buf.len());
buf.clear();
Ok(())
}
Err(e) => {

View File

@ -16,6 +16,8 @@ use log;
pub struct Log {
attributes: Attributes,
format: Arc<LineFormat>,
level: log::Level,
target: Option<String>,
}
impl Input for Log {
@ -25,7 +27,7 @@ impl Input for Log {
LogScope {
attributes: self.attributes.clone(),
entries: Arc::new(RwLock::new(Vec::new())),
output: self.clone(),
log: self.clone(),
}
}
}
@ -50,18 +52,37 @@ impl Formatting for Log {
pub struct LogScope {
attributes: Attributes,
entries: Arc<RwLock<Vec<Vec<u8>>>>,
output: Log,
log: Log,
}
impl Log {
/// Write metric values to the standard log using `info!`.
// TODO parameterize log level, logger
pub fn log_to() -> Log {
pub fn to_log() -> Log {
Log {
attributes: Attributes::default(),
format: Arc::new(SimpleFormat::default()),
level: log::Level::Info,
target: None
}
}
/// Sets the log `target` to use when logging metrics.
/// See the (log!)[https://docs.rs/log/0.4.6/log/macro.log.html] documentation.
pub fn level(&self, level: log::Level) -> Self {
let mut cloned = self.clone();
cloned.level = level;
cloned
}
/// Sets the log `target` to use when logging metrics.
/// See the (log!)[https://docs.rs/log/0.4.6/log/macro.log.html] documentation.
pub fn target(&self, target: &str) -> Self {
let mut cloned = self.clone();
cloned.target = Some(target.to_string());
cloned
}
}
impl WithAttributes for LogScope {
@ -77,12 +98,11 @@ impl cache_in::CachedInput for Log {}
impl InputScope for LogScope {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let template = self.output.format.template(&name, kind);
let template = self.log.format.template(&name, kind);
let entries = self.entries.clone();
if let Some(_buffering) = self.get_buffering() {
// buffered
InputMetric::new(move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
@ -95,10 +115,16 @@ impl InputScope for LogScope {
})
} else {
// unbuffered
let level = self.log.level;
let target = self.log.target.clone();
InputMetric::new(move |value, labels| {
let mut buffer = Vec::with_capacity(32);
match template.print(&mut buffer, value, |key| labels.lookup(key)) {
Ok(()) => log!(log::Level::Debug, "{:?}", &buffer),
Ok(()) => if let Some(target) = &target {
log!(target: target, level, "{:?}", &buffer)
} else {
log!(level, "{:?}", &buffer)
}
Err(err) => debug!("Could not format buffered log metric: {}", err),
}
})
@ -115,7 +141,11 @@ impl Flush for LogScope {
for entry in entries.drain(..) {
writeln!(&mut buf, "{:?}", &entry)?;
}
log!(log::Level::Debug, "{:?}", &buf);
if let Some(target) = &self.log.target {
log!(target: target, self.log.level, "{:?}", &buf)
} else {
log!(self.log.level, "{:?}", &buf)
}
}
Ok(())
}
@ -135,7 +165,7 @@ mod test {
#[test]
fn test_to_log() {
let c = super::Log::log_to().input();
let c = super::Log::to_log().input();
let m = c.new_metric("test".into(), InputKind::Marker);
m.write(33, labels![]);
}

View File

@ -1,58 +1,25 @@
//! Prometheus-related functionality.
//! Both push and pull are supported.
//! Both protobuf and text format are supported.
//! Send metrics to a Prometheus server.
use core::{Flush};
use core::input::{InputKind};
use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Prefixed};
use core::attributes::{Buffered, Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope};
use core::{Flush, MetricValue};
use core::input::InputKind;
use core::metrics;
use core::output::{Output, OutputScope, OutputMetric};
use core::error;
use output::socket::RetrySocket;
use queue::queue_out;
use cache::cache_out;
use core::label::Labels;
use std::net::ToSocketAddrs;
use std::sync::{Arc, RwLock};
use std::fmt::Debug;
use std::io::Write;
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use prometheus::{Opts, Registry, IntGauge, IntCounter, Encoder, ProtobufEncoder, TextEncoder,};
metrics!{
}
#[derive(Clone, Debug)]
enum PrometheusEncoding {
JSON,
PROTOBUF,
}
/// Prometheus push shared client
/// Holds a shared socket to a Prometheus host.
/// Prometheus output holds a socket to a Prometheus server.
/// The socket is shared between scopes opened from the output.
#[derive(Clone, Debug)]
pub struct Prometheus {
attributes: Attributes,
socket: Arc<RwLock<RetrySocket>>,
encoding: PrometheusEncoding,
}
impl Prometheus {
/// Send metrics to a prometheus server at the address and port provided.
pub fn send_json_to<A: ToSocketAddrs + Debug + Clone>(address: A) -> error::Result<Prometheus> {
Ok(Prometheus {
attributes: Attributes::default(),
socket: Arc::new(RwLock::new(RetrySocket::new(address.clone())?)),
encoding: PrometheusEncoding::JSON,
})
}
/// Send metrics to a prometheus server at the address and port provided.
pub fn send_protobuf_to<A: ToSocketAddrs + Debug + Clone>(address: A) -> error::Result<Prometheus> {
Ok(Prometheus {
attributes: Attributes::default(),
socket: Arc::new(RwLock::new(RetrySocket::new(address.clone())?)),
encoding: PrometheusEncoding::PROTOBUF,
})
}
push_url: String,
}
impl Output for Prometheus {
@ -61,88 +28,131 @@ impl Output for Prometheus {
fn output(&self) -> Self::SCOPE {
PrometheusScope {
attributes: self.attributes.clone(),
registry: Registry::new(),
socket: self.socket.clone(),
encoding: self.encoding.clone(),
buffer: Rc::new(RefCell::new(String::new())),
push_url: self.push_url.clone(),
}
}
}
impl Prometheus {
/// Send metrics to a Prometheus "push gateway" at the URL provided.
/// URL path must include group identifier labels `job`
/// as shown in https://github.com/prometheus/pushgateway#command-line
/// For example `http://pushgateway.example.org:9091/metrics/job/some_job`
pub fn push_to(url: &str) -> error::Result<Prometheus> {
debug!("Pushing to Prometheus {:?}", url);
// NOTE(review): construction is currently infallible; Result is presumably
// kept so the signature stays stable if URL validation is added later.
Ok(Prometheus {
attributes: Attributes::default(),
push_url: url.to_string(),
})
}
}
impl WithAttributes for Prometheus {
    /// Borrow this output's attributes.
    fn get_attributes(&self) -> &Attributes {
        &self.attributes
    }

    /// Mutably borrow this output's attributes.
    fn mut_attributes(&mut self) -> &mut Attributes {
        &mut self.attributes
    }
}
impl Buffered for Prometheus {}

/// Prometheus Input
#[derive(Debug, Clone)]
pub struct PrometheusScope {
    // prefix / buffering attributes, accessed through WithAttributes
    attributes: Attributes,
    // text-format metric lines accumulated until flushed to the push gateway
    buffer: Rc<RefCell<String>>,
    // push gateway URL, e.g. http://pushgateway.example.org:9091/metrics/job/some_job
    push_url: String,
}
impl OutputScope for PrometheusScope {
/// Define a metric of the specified type.
fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
let name = self.prefix_prepend(name).join(".");
match kind {
InputKind::Counter => {
let opts = Opts::new(name, "".to_string());
let counter = IntCounter::with_opts(opts).expect("Prometheus Counter");
self.registry.register(Box::new(counter.clone())).expect("Registered Prometheus Counter");
OutputMetric::new(move |value, _labels|
counter.inc_by(value as i64)
)
},
InputKind::Marker => {
let opts = Opts::new(name, "".to_string());
let marker = IntCounter::with_opts(opts).expect("Prometheus Counter");
self.registry.register(Box::new(marker.clone())).expect("Registered Prometheus Marker");
OutputMetric::new(move |_value, _labels|
marker.inc()
)
},
InputKind::Timer => {
let opts = Opts::new(name, "".to_string());
let timer = IntCounter::with_opts(opts).expect("Prometheus Histogram");
self.registry.register(Box::new(timer.clone())).expect("Registered Prometheus Timer");
OutputMetric::new(move |value, _labels|
timer.inc_by(value as i64)
)
},
InputKind::Gauge => {
let opts = Opts::new(name, "".to_string());
let gauge = IntGauge::with_opts(opts).expect("Prometheus Gauge");
self.registry.register(Box::new(gauge.clone())).expect("Registered Prometheus Gauge");;
OutputMetric::new(move |value, _labels|
gauge.add(value as i64)
)
},
}
let prefix = self.prefix_prepend(name).join("_");
let scale = match kind {
// timers are in µs, but we give Prometheus milliseconds
InputKind::Timer => 1000,
_ => 1,
};
let cloned = self.clone();
let metric = PrometheusMetric { prefix, scale };
OutputMetric::new(move |value, labels| {
cloned.print(&metric, value, labels);
})
}
}
impl Flush for PrometheusScope {
fn flush(&self) -> error::Result<()> {
let metric_families = self.registry.gather();
let mut buffer = vec![];
let buf = self.buffer.borrow_mut();
self.flush_inner(buf)
}
}
match self.encoding {
PrometheusEncoding::JSON => {
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer)?
},
PrometheusEncoding::PROTOBUF => {
let encoder = ProtobufEncoder::new();
encoder.encode(&metric_families, &mut buffer)?
},
impl PrometheusScope {
/// Format a single metric sample in Prometheus text exposition format
/// and either buffer it or push it immediately, depending on buffering mode.
fn print(&self, metric: &PrometheusMetric, value: MetricValue, labels: Labels) {
// scale is fixed per metric kind (e.g. µs -> ms for timers)
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
let mut strbuf = String::new();
// prometheus format be like `http_requests_total{method="post",code="200"} 1027 1395066363000`
strbuf.push_str(&metric.prefix);
let labels_map = labels.into_map();
if !labels_map.is_empty() {
strbuf.push('{');
// manual peek-ahead iteration: a trailing `",` separates pairs, a bare `"`
// closes the last pair
let mut i = labels_map.into_iter();
let mut next = i.next();
while let Some((k, v)) = next {
strbuf.push_str(&k);
strbuf.push_str("=\"");
strbuf.push_str(&v);
next = i.next();
if next.is_some() {
strbuf.push_str("\",");
} else {
strbuf.push('"');
}
}
strbuf.push_str("} ");
} else {
strbuf.push(' ');
}
strbuf.push_str(&value_str);
// NOTE(review): the next two lines are stale residue from the pre-rewrite
// socket-based implementation (no `socket` field exists on this struct,
// and a bare `Ok(...)` expression mid-function cannot compile) — remove.
let mut socket = self.socket.write().expect("Lock Prometheus Socket");
Ok(socket.write_all(&mut buffer)?)
let buffer = self.buffer.borrow_mut();
// NOTE(review): `strbuf` is never appended to `buffer` in the visible code;
// presumably a `buffer.push_str(&strbuf)` belongs here — confirm upstream.
if strbuf.len() + buffer.len() > BUFFER_FLUSH_THRESHOLD {
metrics::PROMETHEUS_OVERFLOW.mark();
warn!("Prometheus Buffer Size Exceeded: {}", BUFFER_FLUSH_THRESHOLD);
let _ = self.flush_inner(buffer);
} else {
// unbuffered scopes push every sample immediately
if self.get_buffering().is_none() {
if let Err(e) = self.flush_inner(buffer) {
debug!("Could not send to Prometheus {}", e)
}
}
}
}

/// Push the buffered payload to the gateway over HTTP; clears the buffer on success.
fn flush_inner(&self, mut buf: RefMut<String>) -> error::Result<()> {
if buf.is_empty() { return Ok(()) }
match minreq::get(self.push_url.as_ref()).with_body(buf.as_ref()).send() {
Ok(_res) => {
metrics::PROMETHEUS_SENT_BYTES.count(buf.len());
trace!("Sent {} bytes to Prometheus", buf.len());
buf.clear();
Ok(())
}
Err(e) => {
metrics::PROMETHEUS_SEND_ERR.mark();
debug!("Failed to send buffer to Prometheus: {}", e);
Err(e.into())
}
}
}
}
@ -150,3 +160,55 @@ impl WithAttributes for PrometheusScope {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl Buffered for PrometheusScope {}
impl queue_out::QueuedOutput for Prometheus {}
impl cache_out::CachedOutput for Prometheus {}

/// It's hard to see how a single scope could get more metrics than this.
// TODO make configurable?
const BUFFER_FLUSH_THRESHOLD: usize = 65_536;

/// Key of a Prometheus metric.
#[derive(Debug, Clone)]
pub struct PrometheusMetric {
// fully joined metric name, '_'-separated
prefix: String,
// divisor applied to raw values before printing (e.g. 1000 for timers)
scale: isize,
}
/// Any remaining buffered data is flushed on Drop.
impl Drop for PrometheusScope {
    fn drop(&mut self) {
        // Best effort only: a failed flush is logged, never panics in drop.
        match self.flush() {
            Ok(()) => (),
            Err(err) => warn!("Could not flush Prometheus metrics upon Drop: {}", err),
        }
    }
}
#[cfg(feature = "bench")]
mod bench {
use core::attributes::*;
use core::input::*;
use super::*;
use test;

#[bench]
pub fn immediate_prometheus(b: &mut test::Bencher) {
// unbuffered scope: each write attempts an immediate HTTP push
let sd = Prometheus::push_to("localhost:2003").unwrap().input();
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}

#[bench]
pub fn buffering_prometheus(b: &mut test::Bencher) {
// buffered scope: writes accumulate until the buffer is flushed
let sd = Prometheus::push_to("localhost:2003").unwrap()
.buffered(Buffering::BufferSize(65465)).input();
let timer = sd.new_metric("timer".into(), InputKind::Timer);
b.iter(|| test::black_box(timer.write(2000, labels![])));
}
}

View File

@ -86,7 +86,7 @@ impl OutputScope for StatsdScope {
suffix.push('|');
suffix.push_str(match kind {
InputKind::Marker | InputKind::Counter => "c",
InputKind::Gauge => "g",
InputKind::Gauge | InputKind::Level => "g",
InputKind::Timer => "ms",
});