Fix statsd sampling format missing pipe, add statsd benchmark

This commit is contained in:
Francis Lalonde 2017-12-06 10:56:20 -05:00
parent 2ce8575f02
commit 1dbb21f8b0
4 changed files with 50 additions and 34 deletions

View File

@ -8,14 +8,14 @@ fn main() {
let metrics = metrics(
// Metric caching allows re-use of the counter, skipping cost of redefining it on each use.
cache(12, (
to_statsd("localhost:8125", "myapp.").expect(
"Could not connect to statsd",
),
sample(0.01, to_statsd("localhost:8125", "myapp.").expect("Could not connect to statsd")),
to_stdout(),
)),
);
loop {
metrics.counter("counter_a").count(123);
metrics.timer("timer_a").interval_us(2000000);
std::thread::sleep_ms(40);
}
}

View File

@ -258,7 +258,7 @@ where
#[cfg(feature = "bench")]
mod microbench {
mod bench {
use ::*;
use test;

View File

@ -53,7 +53,7 @@ pub enum Kind {
Timer,
}
/// Scope creation function.
/// Scope creation functio
/// Returns a callback function to send commands to the metric scope.
/// Used to write values to the scope or flush the scope buffer (if applicable).
/// Simple applications may use only one scope.

View File

@ -57,6 +57,31 @@ impl Drop for ScopeBuffer {
}
impl ScopeBuffer {
fn write (&mut self, metric: &StatsdMetric, value: Value) {
let scaled_value = value / metric.scale;
let value_str = scaled_value.to_string();
let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len();
if entry_len > self.buffer.capacity() {
// TODO report entry too big to fit in buffer (!?)
return;
}
let remaining = self.buffer.capacity() - self.buffer.len();
if entry_len + 1 > remaining {
// buffer is full, flush before appending
self.flush();
} else {
if !self.buffer.is_empty() {
// separate from previous entry
self.buffer.push('\n')
}
self.buffer.push_str(&metric.prefix);
self.buffer.push_str(&value_str);
self.buffer.push_str(&metric.suffix);
}
}
fn flush(&mut self) {
if !self.buffer.is_empty() {
match self.socket.send(self.buffer.as_bytes()) {
@ -97,11 +122,12 @@ impl Sink<StatsdMetric> for StatsdSink {
});
if sampling < FULL_SAMPLING_RATE {
suffix.push('@');
suffix.push_str("|@");
suffix.push_str(&sampling.to_string());
}
let scale = match kind {
// timers are in µs, statsd wants ms
Kind::Timer => 1000,
_ => 1,
};
@ -122,37 +148,27 @@ impl Sink<StatsdMetric> for StatsdSink {
Arc::new(move |cmd| {
if let Ok(mut buf) = buf.try_write() {
match cmd {
Scope::Write(metric, value) => {
let scaled_value = if metric.scale != 1 {
value / metric.scale
} else {
value
};
let value_str = scaled_value.to_string();
let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len();
if entry_len > buf.buffer.capacity() {
// TODO report entry too big to fit in buffer (!?)
return;
}
let remaining = buf.buffer.capacity() - buf.buffer.len();
if entry_len + 1 > remaining {
// buffer is full, flush before appending
buf.flush();
} else {
if !buf.buffer.is_empty() {
// separate from previous entry
buf.buffer.push('\n')
}
buf.buffer.push_str(&metric.prefix);
buf.buffer.push_str(&value_str);
buf.buffer.push_str(&metric.suffix);
}
},
Scope::Write(metric, value) => buf.write(metric, value),
Scope::Flush => buf.flush(),
}
}
})
}
}
#[cfg(feature = "bench")]
mod bench {
use super::*;
use test;
#[bench]
pub fn timer_statsd(b: &mut test::Bencher) {
let sd = to_statsd("localhost:8125", "a.").unwrap();
let timer = sd.new_metric(Kind::Timer, "timer", 1000000.0);
let scope = sd.new_scope(false);
b.iter(|| test::black_box(scope.as_ref()(Scope::Write(&timer, 2000))));
}
}