renamed core structs, optimized statsd, added statsd sampling field

This commit is contained in:
Francis Lalonde 2017-07-21 10:08:52 -04:00
parent 4669be5a3b
commit 98fe76d761
10 changed files with 125 additions and 139 deletions

View File

@ -1,4 +1,4 @@
use core::{MetricType, RateType, Value, MetricWrite, DefinedMetric, MetricChannel};
use core::{MetricType, RateType, Value, MetricWriter, SinkMetric, MetricSink};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::usize;
@ -71,13 +71,13 @@ impl AggregateMetric {
}
}
impl DefinedMetric for Arc<AggregateMetric> {
impl SinkMetric for Arc<AggregateMetric> {
}
pub struct AggregateWrite {
}
impl MetricWrite<Arc<AggregateMetric>> for AggregateWrite {
impl MetricWriter<Arc<AggregateMetric>> for AggregateWrite {
fn write(&self, metric: &Arc<AggregateMetric>, value: Value) {
metric.write(value as usize);
}
@ -112,7 +112,7 @@ impl AggregateChannel {
}
impl MetricChannel for AggregateChannel {
impl MetricSink for AggregateChannel {
type Metric = Arc<AggregateMetric>;
type Write = AggregateWrite;
@ -144,7 +144,7 @@ impl MetricChannel for AggregateChannel {
mod bench {
use super::AggregateChannel;
use core::{MetricType, MetricChannel, MetricWrite};
use core::{MetricType, MetricSink, MetricWriter};
use test::Bencher;
#[bench]

View File

@ -1,15 +1,15 @@
//// Aggregate Source
use core::{MetricChannel, MetricType, MetricWrite, MetricPublish};
use core::{MetricSink, MetricType, MetricWriter, MetricSource};
use aggregate::sink::{ScoreIterator, AggregateScore};
/// publisher from aggregate metrics to target channel
pub struct AggregateSource<C: MetricChannel> {
pub struct AggregateSource<C: MetricSink> {
target: C,
scores: ScoreIterator,
}
impl <C: MetricChannel> AggregateSource<C> {
impl <C: MetricSink> AggregateSource<C> {
/// create new publisher from aggregate metrics to target channel
pub fn new(target: C, scores: ScoreIterator) -> AggregateSource<C> {
@ -17,7 +17,7 @@ impl <C: MetricChannel> AggregateSource<C> {
}
}
impl <C: MetricChannel> MetricPublish for AggregateSource<C> {
impl <C: MetricSink> MetricSource for AggregateSource<C> {
/// define and write metrics from aggregated scores to the target channel
fn publish(&self) {

View File

@ -1,26 +1,6 @@
use core::{MetricType, RateType, Value, MetricWrite, DefinedMetric, MetricChannel};
use cached::SizedCache;
////////////
//pub struct InstrumentCacheMetric<M: DefinedMetric> {
// target: M
//}
//
//impl <M: DefinedMetric> DefinedMetric for InstrumentCacheMetric<M> {}
//
//pub struct InstrumentCacheWrite<C: MetricChannel> {
// target: C,
//}
//
//impl <C: MetricChannel> MetricWrite<InstrumentCacheMetric<<C as MetricChannel>::Metric>> for InstrumentCacheWrite<C> {
//
// fn write(&self, metric: &InstrumentCacheMetric<<C as MetricChannel>::Metric>, value: Value) {
// debug!("InstrumentCache");
// self.target.write(|scope| scope.write(&metric.target, value))
// }
//}
pub struct InstrumentCacheChannel<C: MetricChannel> {
target: C,
cache: SizedCache<String, C::Metric>,

View File

@ -64,7 +64,7 @@ pub trait MetricDispatch {
fn scope<F>(&self, operations: F) where F: Fn(&Self::Scope);
}
pub trait MetricPublish {
pub trait MetricSource {
fn publish(&self);
}
@ -82,15 +82,15 @@ macro_rules! time {
// CHANNEL
pub trait DefinedMetric {}
pub trait SinkMetric {}
pub trait MetricWrite<M: DefinedMetric> {
pub trait MetricWriter<M: SinkMetric> {
fn write(&self, metric: &M, value: Value);
}
pub trait MetricChannel {
type Metric: DefinedMetric;
type Write: MetricWrite<Self::Metric>;
pub trait MetricSink {
type Metric: SinkMetric;
type Write: MetricWriter<Self::Metric>;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> Self::Metric;
fn write<F>(&self, operations: F) where F: Fn(&Self::Write);
}

View File

@ -1,47 +1,47 @@
use core::{MetricType, Value, MetricWrite, MetricChannel, MetricDispatch, EventMetric, ValueMetric, TimerMetric, MetricScope};
use core::{MetricType, Value, MetricWriter, MetricSink, MetricDispatch, EventMetric, ValueMetric, TimerMetric, MetricScope};
use std::rc::Rc;
use std::cell::RefCell;
use thread_local::ThreadLocal;
////////////
pub struct DirectEvent<C: MetricChannel> {
metric: <C as MetricChannel>::Metric,
pub struct DirectEvent<C: MetricSink> {
metric: <C as MetricSink>::Metric,
target: Rc<C>,
}
pub struct DirectValue<C: MetricChannel> {
metric: <C as MetricChannel>::Metric,
pub struct DirectValue<C: MetricSink> {
metric: <C as MetricSink>::Metric,
target: Rc<C>,
}
pub struct DirectTimer<C: MetricChannel> {
metric: <C as MetricChannel>::Metric,
pub struct DirectTimer<C: MetricSink> {
metric: <C as MetricSink>::Metric,
target: Rc<C>,
}
pub struct DirectScope {
}
impl <C: MetricChannel> EventMetric for DirectEvent<C> {
impl <C: MetricSink> EventMetric for DirectEvent<C> {
fn event(&self) {
self.target.write(|scope| scope.write(&self.metric, 1))
}
}
impl <C: MetricChannel> ValueMetric for DirectValue<C> {
impl <C: MetricSink> ValueMetric for DirectValue<C> {
fn value(&self, value: Value) {
self.target.write(|scope| scope.write(&self.metric, value))
}
}
impl <C: MetricChannel> ValueMetric for DirectTimer<C> {
impl <C: MetricSink> ValueMetric for DirectTimer<C> {
fn value(&self, value: Value) {
self.target.write(|scope| scope.write(&self.metric, value))
}
}
impl <C: MetricChannel> TimerMetric for DirectTimer<C> {}
impl <C: MetricSink> TimerMetric for DirectTimer<C> {}
impl MetricScope for DirectScope {
fn set_property<S: AsRef<str>>(&self, key: S, value: S) -> &Self {
@ -49,13 +49,11 @@ impl MetricScope for DirectScope {
}
}
pub struct DirectDispatch<C: MetricChannel> {
pub struct DirectDispatch<C: MetricSink> {
target: Rc<C>
}
impl <C: MetricChannel> DirectDispatch<C> {
impl <C: MetricSink> DirectDispatch<C> {
pub fn new(target: C) -> DirectDispatch<C> {
DirectDispatch { target: Rc::new(target) }
}
@ -65,7 +63,7 @@ thread_local! {
static DISPATCH_SCOPE: RefCell<DirectScope> = RefCell::new(DirectScope {});
}
impl <C: MetricChannel> MetricDispatch for DirectDispatch<C> {
impl <C: MetricSink> MetricDispatch for DirectDispatch<C> {
type Event = DirectEvent<C>;
type Value = DirectValue<C>;
type Timer = DirectTimer<C>;

View File

@ -1,40 +1,35 @@
use core::{MetricType, RateType, Value, MetricWrite, DefinedMetric, MetricChannel};
use core::{MetricType, RateType, Value, MetricWriter, SinkMetric, MetricSink};
////////////
pub struct DualMetric<M1: DefinedMetric, M2: DefinedMetric> {
pub struct DualMetric<M1: SinkMetric, M2: SinkMetric> {
metric_1: M1,
metric_2: M2,
}
impl <M1: DefinedMetric, M2: DefinedMetric> DefinedMetric for DualMetric<M1, M2> {}
impl <M1: SinkMetric, M2: SinkMetric> SinkMetric for DualMetric<M1, M2> {}
pub struct DualWrite<C1: MetricChannel, C2: MetricChannel> {
pub struct DualWriter<C1: MetricSink, C2: MetricSink> {
channel_a: C1,
channel_b: C2,
}
impl <C1: MetricChannel, C2: MetricChannel> MetricWrite<DualMetric<<C1 as MetricChannel>::Metric, <C2 as MetricChannel>::Metric>> for DualWrite<C1, C2> {
fn write(&self, metric: &DualMetric<<C1 as MetricChannel>::Metric, <C2 as MetricChannel>::Metric>, value: Value) {
println!("Channel A");
impl <C1: MetricSink, C2: MetricSink> MetricWriter<DualMetric<<C1 as MetricSink>::Metric, <C2 as MetricSink>::Metric>> for DualWriter<C1, C2> {
fn write(&self, metric: &DualMetric<<C1 as MetricSink>::Metric, <C2 as MetricSink>::Metric>, value: Value) {
self.channel_a.write(|scope| scope.write(&metric.metric_1, value));
println!("Channel B");
self.channel_b.write(|scope| scope.write(&metric.metric_2, value));
}
}
pub struct DualChannel<C1: MetricChannel, C2: MetricChannel> {
write: DualWrite<C1, C2>
pub struct DualSink<C1: MetricSink, C2: MetricSink> {
write: DualWriter<C1, C2>
}
impl <C1: MetricChannel, C2: MetricChannel> DualChannel<C1, C2> {
pub fn new(channel_a: C1, channel_b: C2) -> DualChannel<C1, C2> {
DualChannel { write: DualWrite {channel_a, channel_b}}
impl <C1: MetricSink, C2: MetricSink> DualSink<C1, C2> {
pub fn new(channel_a: C1, channel_b: C2) -> DualSink<C1, C2> {
DualSink { write: DualWriter {channel_a, channel_b}}
}
}
impl <C1: MetricChannel, C2: MetricChannel> MetricChannel for DualChannel<C1, C2> {
impl <C1: MetricSink, C2: MetricSink> MetricSink for DualSink<C1, C2> {
type Metric = DualMetric<C1::Metric, C2::Metric>;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> DualMetric<C1::Metric, C2::Metric> {
@ -43,7 +38,7 @@ impl <C1: MetricChannel, C2: MetricChannel> MetricChannel for DualChannel<C1, C2
DualMetric { metric_1, metric_2 }
}
type Write = DualWrite<C1, C2>;
type Write = DualWriter<C1, C2>;
fn write<F>(&self, operations: F )
where F: Fn(&Self::Write) {

View File

@ -26,15 +26,14 @@ pub mod mlog;
//pub mod cache;
pub mod pcg32;
use dual::DualChannel;
use dual::DualSink;
use dispatch::DirectDispatch;
use sampling::SamplingChannel;
use statsd::StatsdChannel;
use mlog::LogChannel;
use sampling::RandomSamplingSink;
use statsd::StatsdSink;
use mlog::LogSink;
use aggregate::sink::{AggregateChannel};
use aggregate::source::{AggregateSource};
use core::{MetricType, MetricChannel, MetricWrite, MetricDispatch, ValueMetric, TimerMetric, MetricPublish};
use std::sync::atomic::{Ordering};
use core::{MetricType, MetricSink, MetricWriter, MetricDispatch, ValueMetric, TimerMetric, MetricSource};
use std::thread::sleep;
use scheduled_executor::{CoreExecutor};
use std::time::Duration;
@ -57,7 +56,7 @@ pub fn sample_scheduled_statsd_aggregation() {
let timer = app_metrics.new_timer("timer_a");
// send aggregated metrics to statsd
let statsd = StatsdChannel::new("localhost:8125", "hello.").unwrap();
let statsd = StatsdSink::new("localhost:8125", "hello.").unwrap();
let aggregate_metrics = AggregateSource::new(statsd, scores);
// collect every three seconds
@ -79,17 +78,17 @@ pub fn sample_scheduled_statsd_aggregation() {
pub fn logging_and_statsd() {
let statsd = StatsdChannel::new("localhost:8125", "goodbye.").unwrap();
let logging = LogChannel::new("metrics");
let logging_and_statsd = DualChannel::new( logging, statsd );
let statsd = StatsdSink::new("localhost:8125", "goodbye.").unwrap();
let logging = LogSink::new("metrics");
let logging_and_statsd = DualSink::new(logging, statsd );
DirectDispatch::new(logging_and_statsd);
}
pub fn sampling_statsd() {
let statsd = StatsdChannel::new("localhost:8125", "goodbye.").unwrap();
let sampling_statsd = SamplingChannel::new(statsd, 0.1);
let statsd = StatsdSink::new("localhost:8125", "goodbye.").unwrap();
let sampling_statsd = RandomSamplingSink::new(statsd, 0.1);
DirectDispatch::new(sampling_statsd);
}
@ -97,7 +96,7 @@ pub fn sampling_statsd() {
pub fn raw_write() {
// setup dual metric channels
let metrics_log = LogChannel::new("metrics");
let metrics_log = LogSink::new("metrics");
// define and send metrics using raw channel API
let counter = metrics_log.define(MetricType::Count, "count_a", 1.0);
@ -105,7 +104,7 @@ pub fn raw_write() {
}
pub fn counter_to_log() {
let metrics_log = LogChannel::new("metrics");
let metrics_log = LogSink::new("metrics");
let metrics = DirectDispatch::new(metrics_log);
let counter = metrics.new_count("count_a");
counter.value(1);

View File

@ -1,4 +1,4 @@
use core::{MetricType, RateType, Value, MetricWrite, DefinedMetric, MetricChannel};
use core::{MetricType, RateType, Value, MetricWriter, SinkMetric, MetricSink};
//////////// Log Channel
@ -6,37 +6,37 @@ pub struct LogMetric {
prefix: String
}
impl DefinedMetric for LogMetric {}
impl SinkMetric for LogMetric {}
pub struct LogWrite {}
pub struct LogWriter {}
impl MetricWrite<LogMetric> for LogWrite {
impl MetricWriter<LogMetric> for LogWriter {
fn write(&self, metric: &LogMetric, value: Value) {
// TODO format faster
info!("{}:{}", metric.prefix, value)
}
}
pub struct LogChannel {
pub struct LogSink {
prefix: String,
write: LogWrite
write: LogWriter
}
impl LogChannel {
pub fn new<S: AsRef<str>>(prefix: S) -> LogChannel {
impl LogSink {
pub fn new<S: AsRef<str>>(prefix: S) -> LogSink {
let prefix = prefix.as_ref().to_string();
LogChannel { prefix, write: LogWrite {}}
LogSink { prefix, write: LogWriter {}}
}
}
impl MetricChannel for LogChannel {
impl MetricSink for LogSink {
type Metric = LogMetric;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> LogMetric {
LogMetric { prefix: format!("{:?}:{}{}", m_type, self.prefix, name.as_ref())}
}
type Write = LogWrite;
type Write = LogWriter;
fn write<F>(&self, metrics: F ) where F: Fn(&Self::Write) {
metrics(&self.write)

View File

@ -1,45 +1,45 @@
use core::{MetricType, RateType, Value, MetricWrite, DefinedMetric, MetricChannel};
use core::{MetricType, RateType, Value, MetricWriter, SinkMetric, MetricSink};
use pcg32;
pub struct SamplingMetric<M: DefinedMetric> {
pub struct RandomSamplingMetric<M: SinkMetric> {
target: M,
int_sampling_rate: u32,
}
impl <M: DefinedMetric> DefinedMetric for SamplingMetric<M> {}
impl <M: SinkMetric> SinkMetric for RandomSamplingMetric<M> {}
pub struct SamplingWrite<C: MetricChannel> {
pub struct RandomSamplingWriter<C: MetricSink> {
target: C,
}
impl <C: MetricChannel> MetricWrite<SamplingMetric<<C as MetricChannel>::Metric>> for SamplingWrite<C> {
impl <C: MetricSink> MetricWriter<RandomSamplingMetric<<C as MetricSink>::Metric>> for RandomSamplingWriter<C> {
fn write(&self, metric: &SamplingMetric<<C as MetricChannel>::Metric>, value: Value) {
fn write(&self, metric: &RandomSamplingMetric<<C as MetricSink>::Metric>, value: Value) {
if pcg32::accept_sample(metric.int_sampling_rate) {
self.target.write(|scope| scope.write(&metric.target, value))
}
}
}
pub struct SamplingChannel<C: MetricChannel> {
write: SamplingWrite<C>,
pub struct RandomSamplingSink<C: MetricSink> {
write: RandomSamplingWriter<C>,
sampling_rate: RateType,
}
impl <C: MetricChannel> SamplingChannel<C> {
pub fn new(target: C, sampling_rate: RateType) -> SamplingChannel<C> {
SamplingChannel { write: SamplingWrite { target }, sampling_rate}
impl <C: MetricSink> RandomSamplingSink<C> {
pub fn new(target: C, sampling_rate: RateType) -> RandomSamplingSink<C> {
RandomSamplingSink { write: RandomSamplingWriter { target }, sampling_rate}
}
}
impl <C: MetricChannel> MetricChannel for SamplingChannel<C> {
type Metric = SamplingMetric<C::Metric>;
type Write = SamplingWrite<C>;
impl <C: MetricSink> MetricSink for RandomSamplingSink<C> {
type Metric = RandomSamplingMetric<C::Metric>;
type Write = RandomSamplingWriter<C>;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> SamplingMetric<C::Metric> {
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> RandomSamplingMetric<C::Metric> {
let pm = self.write.target.define(m_type, name, self.sampling_rate);
SamplingMetric { target: pm, int_sampling_rate: pcg32::to_int_rate(self.sampling_rate) }
RandomSamplingMetric { target: pm, int_sampling_rate: pcg32::to_int_rate(self.sampling_rate) }
}
fn write<F>(&self, operations: F ) where F: Fn(&Self::Write) {

View File

@ -1,17 +1,14 @@
use core::{MetricType, RateType, Value, MetricWrite, DefinedMetric, MetricChannel};
use core::{MetricType, RateType, Value, MetricWriter, SinkMetric, MetricSink};
use std::net::UdpSocket;
use std::io::Result;
use std::cell::RefCell;
////////////
pub struct StatsdMetric {
m_type: MetricType,
name: String,
sample: RateType
prefix: String,
suffix: String,
}
impl DefinedMetric for StatsdMetric {}
impl SinkMetric for StatsdMetric {}
/// Use a safe maximum size for UDP to prevent fragmentation.
const MAX_UDP_PAYLOAD: usize = 576;
@ -20,9 +17,8 @@ thread_local! {
static SEND_BUFFER: RefCell<String> = RefCell::new(String::with_capacity(MAX_UDP_PAYLOAD));
}
pub struct StatsdWrite {
pub struct StatsdWriter {
socket: UdpSocket,
prefix: String,
}
fn flush(payload: &mut String, socket: &UdpSocket) {
@ -32,61 +28,79 @@ fn flush(payload: &mut String, socket: &UdpSocket) {
payload.clear();
}
impl MetricWrite<StatsdMetric> for StatsdWrite {
impl MetricWriter<StatsdMetric> for StatsdWriter {
fn write(&self, metric: &StatsdMetric, value: Value) {
// TODO add metric sample rate
// TODO preformat per metric
let entry = format!("{}{}:{}|{}", self.prefix, metric.name, value, match metric.m_type {
MetricType::Event | MetricType::Count => "c",
MetricType::Gauge => "g",
MetricType::Time => "ms"
});
let value_str = value.to_string();
let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len();
SEND_BUFFER.with(|cell| {
let ref mut buf = cell.borrow_mut();
if entry.len() > buf.capacity() {
if entry_len > buf.capacity() {
// TODO report entry too big to fit in buffer (!?)
return;
}
let remaining = buf.capacity() - buf.len();
if entry.len() + 1 > remaining {
if entry_len + 1 > remaining {
// buffer is full, flush before appending
flush(buf, &self.socket);
} else {
if !buf.is_empty() {
// separate from previous entry
buf.push('\n')
}
buf.push_str(&entry);
buf.push_str(&metric.prefix);
buf.push_str(&value_str);
buf.push_str(&metric.suffix);
}
});
}
}
pub struct StatsdChannel {
write: StatsdWrite
/// Allows sending metrics to a statsd server
pub struct StatsdSink {
write: StatsdWriter,
prefix: String,
}
impl StatsdChannel {
/// Create a new statsd channel
pub fn new<S: AsRef<str>>(address: &str, prefix_str: S) -> Result<StatsdChannel> {
impl StatsdSink {
/// Create a new statsd sink to the specified address with the specified prefix
pub fn new<S: AsRef<str>>(address: &str, prefix_str: S) -> Result<StatsdSink> {
let socket = UdpSocket::bind("0.0.0.0:0")?; // NB: CLOEXEC by default
socket.set_nonblocking(true)?;
socket.connect(address)?;
Ok(StatsdChannel { write: StatsdWrite { socket, prefix: prefix_str.as_ref().to_string() }})
Ok(StatsdSink { write: StatsdWriter { socket }, prefix: prefix_str.as_ref().to_string()})
}
}
impl MetricChannel for StatsdChannel {
impl MetricSink for StatsdSink {
type Metric = StatsdMetric;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> StatsdMetric {
StatsdMetric {m_type, name: name.as_ref().to_string(), sample}
let mut prefix = String::with_capacity(32);
prefix.push_str(&self.prefix);
prefix.push_str(name.as_ref());
prefix.push(':');
let mut suffix = String::with_capacity(16);
suffix.push('|');
suffix.push_str(match m_type {
MetricType::Event | MetricType::Count => "c",
MetricType::Gauge => "g",
MetricType::Time => "ms"
});
if (sample < 1.0) {
suffix.push('@');
suffix.push_str(&sample.to_string());
}
StatsdMetric {prefix, suffix}
}
type Write = StatsdWrite;
type Write = StatsdWriter;
fn write<F>(&self, metrics: F ) where F: Fn(&Self::Write) {
metrics(&self.write);