This commit is contained in:
Francis Lalonde 2017-06-27 06:04:02 -04:00
commit 009dd5cdd1
6 changed files with 578 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target/
**/*.rs.bk

61
Cargo.lock generated Normal file
View File

@ -0,0 +1,61 @@
[root]
name = "bladergator"
version = "0.1.0"
dependencies = [
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lazy_static"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "libc"
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "redox_syscall"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "time"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3b37545ab726dd833ec6420aaba8231c5b320814b9029ad585555d2a03e94fbf"
"checksum libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)" = "38f5c2b18a287cf78b4097db62e20f43cace381dc76ae5c0a3073067f78b7ddc"
"checksum redox_syscall 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "3041aeb6000db123d2c9c751433f526e1f404b23213bd733167ab770c3989b4d"
"checksum time 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "ffd7ccbf969a892bf83f1e441126968a07a3941c24ff522a26af9f9f4585d1a3"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"

8
Cargo.toml Normal file
View File

@ -0,0 +1,8 @@
[package]
name = "bladergator"
version = "0.1.0"
authors = ["Francis Lalonde <francis.lalonde@adgear.com>"]
[dependencies]
lazy_static = "0.2.8"
time = "0.1"

299
src/main.rs Normal file
View File

@ -0,0 +1,299 @@
#![cfg_attr(feature = "bench", feature(test))]
#[cfg(feature="bench")]
extern crate test;
extern crate time;
extern crate lazy_static;
use test::test::Bencher;
mod pcg32;
use pcg32::pcg32_random;
//////////////////
// DEFINITIONS
type ValueType = f32;
type TimeType = u64;
type RateType = f32;
//type ChannelTimebase = [u32; 2]; // u32 base (alternating) + u32 offset = u64 time
enum MetricType {
Event,
Count,
Gauge,
Time,
}
///////////////////
// GLOBALS
lazy_static! {
static ref SKIP_SCOPE: CloseScope = SkipScope {};
}
//////////////////
// CONTRACT
// INSTRUMENTATION (API CONTRACT)
trait Event {
fn mark(&self);
}
trait Value {
fn value(&self, value: ValueType);
}
trait Time {
fn start() -> TimeType {}
fn time(&self, start_time: TimeType);
}
trait Scope {
fn open_scope(&self) -> OpenedScope;
}
trait TagEvent {
fn tag_event(&self, tags: Option<&[S]>);}
trait TagValue {
fn tag_value(&self, value: ValueType, tags: Option<&[S]>);
}
trait OpenedScope {
fn close_scope(self);
}
// CHANNEL
/// Base instruments
trait Meter {
fn new_event<S: AsRef<str>>(&self, name: S) -> Event;
fn new_count<S: AsRef<str>>(&self, name: S) -> Value;
fn new_timer<S: AsRef<str>>(&self, name: S) -> Value;
fn new_gauge<S: AsRef<str>>(&self, name: S) -> Value;
fn new_scope<S: AsRef<str>>(&self, name: S) -> Scope;
}
/// Per-instrument sampling
trait SampleMeter {
fn new_sample_event<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Event;
fn new_sample_count<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Value;
fn new_sample_timer<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Value;
fn new_sample_gauge<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Value;
fn new_sample_scope<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Scope;
}
/// Tag instruments
trait TagMeter {
fn new_tag_event<S: AsRef<str>>(&self, name: S) -> TagEvent;
fn new_tag_count<S: AsRef<str>>(&self, name: S) -> TagValue;
fn new_tag_timer<S: AsRef<str>>(&self, name: S) -> TagValue;
fn new_tag_gauge<S: AsRef<str>>(&self, name: S) -> TagValue;
}
// (SPI CONTRACT)
// OUTPUT
type ValueOut = Fn(ValueType) -> ();
type TagValueOut = Fn(ValueType, Option<&[AsRef<str>]>) -> ();
trait ChannelOutput {
fn new_value<S: AsRef<str>>(&self, name: S, m_type: MetricType, sampling: RateType) -> ValueOut;
fn new_tag_value<S: AsRef<str>>(&self, m_type: MetricType, name: S) -> TagValueOut;
fn new_scope<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Scope;
fn write<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: RateType, value: ValueType, tags: Option<&[S]>) {
self.new_value(m_type, name, sampling).call_mut(value)
}
fn tag_write<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: RateType, value: ValueType, tags: Option<&[S]>) {
self.new_tag_value(m_type, name, sampling).call_mut(value, tags)
}
fn open_scope<S: AsRef<str>>(&self, scope_name: S, sampling: RateType) {
self.new_scope(name, sampling).open_scope()
}
fn in_scope<S: AsRef<str>>(&self, scope_name: S, sampling: RateType, mut block: FnMut(ChannelOutput) -> ()) {
let scope = self.new_scope(name, sampling).open_scope();
block.call_mut(self);
scope.close_scope()
}
}
/// A convenience macro to wrap a block or an expression with a start / stop timer.
/// Elapsed time is sent to the supplied statsd client after the computation has been performed.
/// Expression result (if any) is transparently returned.
#[macro_export]
macro_rules! time {
($client: expr, $key: expr, $body: block) => (
let start_time = $client.start_time();
$body
$client.stop_time($key, start_time);
);
}
//////////////////
// IMPLEMENTATION
// SKIP SCOPE
struct SkipScope {}
impl OpenedScope for SkipScope {
fn close_scope(self) {}
}
struct Metric {
out: ValueOut,
}
struct SampleMetric {
sampling: RateType,
out: ValueOut,
}
struct TagMetric {
out: TagValueOut,
}
impl Event for Metric {
fn mark(&self) {
self.out.call(1.0);
}
}
impl Value for Metric {
fn value(&self, value: ValueType) {
self.out.call(value);
}
}
impl Scope for Metric {
fn open_scope(&self) {
self.out.open_scope()
}
}
impl Event for SampleMetric {
fn mark(&self) {
if pcg32_random() < sampling {
self.out.call(1.0);
}
}
}
impl Value for SampleMetric {
fn value(&self, value: ValueType) {
if pcg32_random() < sampling {
self.out.call(value);
}
}
}
impl Scope for SampleMetric {
fn open_scope(&self) {
if pcg32_random() < sampling {
out.open_scope()
} else {
SKIP_SCOPE
}
}
}
impl TagEvent for TagMetric {
fn tag_event(&self, tags: Option<&[S]>) {
self.out.call(1.0, tags);
}
}
impl TagValue for TagMetric {
fn tag_value(&self, value: ValueType, tags: Option<&[S]>) {
self.out.call(value, tags);
}
}
/// A point in time from which elapsed time can be determined
pub struct StartTime (u64);
impl StartTime {
/// The number of milliseconds elapsed between now and this StartTime
fn elapsed_ms(self) -> u64 {
(time::precise_time_ns() - self.0) / 1_000_000
}
}
/// eager aggregation
/// expand every new_* to many new_*
struct AggregatingBuffer {
}
/// lazy aggregation
/// expand every new_* to many new_*
struct BufferAggregator {
}
struct Joined {
}
// flush when scope closed
// unscoped passthru
struct ScopeBuffer {
}
// flush every n metrics
struct CountBuffer {
}
// flush every n millis
struct TimeBuffer {
}
// flush every n metrics
struct Buffer {
}
// separate thread
struct Async {
}
struct RandomSampler {
}
struct TimeSampler {
}
#[test]
mod test {
}
#[bench]
fn bench_trait(b: &mut Bencher) {
b.iter(|| {});
}

43
src/pcg32.rs Normal file
View File

@ -0,0 +1,43 @@
/// PCG32 random number generation for fast sampling
// TODO use https://github.com/codahale/pcg instead?
use std::cell::RefCell;
use time;
fn seed() -> u64 {
let seed = 5573589319906701683_u64;
let seed = seed.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407)
.wrapping_add(time::precise_time_ns());
seed.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407)
}
fn pcg32_random() -> u32 {
thread_local! {
static PCG32_STATE: RefCell<u64> = RefCell::new(seed());
}
PCG32_STATE.with(|state| {
let oldstate: u64 = *state.borrow();
// XXX could generate the increment from the thread ID
*state.borrow_mut() = oldstate.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
((((oldstate >> 18) ^ oldstate) >> 27) as u32)
.rotate_right((oldstate >> 59) as u32)
})
}
/// Convert a floating point sampling rate to an integer so that a fast integer RNG can be used
/// Float rate range is between 1.0 (send 100% of the samples) and 0.0 (_no_ samples taken)
/// . | float rate | int rate | percentage
/// ---- | ---------- | -------- | ----
/// all | 1.0 | 0x0 | 100%
/// none | 0.0 | 0xFFFFFFFF | 0%
fn to_int_rate(float_rate: f64) -> u32 {
assert!(float_rate <= 1.0 && float_rate >= 0.0);
((1.0 - float_rate) * ::std::u32::MAX as f64) as u32
}
fn accept_sample(int_rate: u32) -> bool {
pcg32::random() > int_rate
}

165
src/statsd.rs Normal file
View File

@ -0,0 +1,165 @@
use std::net::UdpSocket;
use std::io::Result;
/// Use a safe maximum size for UDP to prevent fragmentation.
const MAX_UDP_PAYLOAD: usize = 576;
pub const FULL_SAMPLING_RATE: f64 = 1.0;
pub trait SendStats: Sized {
fn send_stats(&self, str: String);
}
/// Real implementation, send a UDP packet for every stat
impl SendStats for UdpSocket {
fn send_stats(&self, str: String) {
match self.send(str.as_bytes()) {
Ok(_) => {}, // TODO count packets sent for batch reporting
_ => {}// TODO count send errors for batch reporting
}
}
}
/// A client to send application metrics to a statsd server over UDP.
/// Multiple instances may be required if different sampling rates or prefix a required within the same application.
pub struct StatsdOutlet<S: SendStats> {
sender: S,
prefix: String,
}
pub type StatsdClient = StatsdOutlet<UdpSocket>;
impl Statsd {
/// Create a new `StatsdClient` sending packets to the specified `address`.
/// Sent metric keys will be prepended with `prefix`.
/// Subsampling is performed according to `float_rate` where
/// - 1.0 is full sampling and
/// - 0.0 means _no_ samples will be taken
/// See crate method `to_int_rate` for more details and a nice table
pub fn new(address: &str, prefix_str: &str) -> Result<StatsdClient> {
let udp_socket = UdpSocket::bind("0.0.0.0:0")?; // NB: CLOEXEC by default
udp_socket.set_nonblocking(true)?;
udp_socket.connect(address)?;
StatsdOutlet::outlet(udp_socket, prefix_str, float_rate)
}
}
impl<S: SendStats> ChannelOutput for StatsdOutlet<S> {
/// Create a new `StatsdClient` sending packets to the specified `address`.
/// Sent metric keys will be prepended with `prefix`.
/// Subsampling is performed according to `float_rate` where
/// - 1.0 is full sampling and
/// - 0.0 means _no_ samples will be taken
/// See crate method `to_int_rate` for more details and a nice table
fn outlet(sender: S, prefix_str: &str, float_rate: f64) -> Result<StatsdOutlet<S>> {
assert!(float_rate <= 1.0 && float_rate >= 0.0);
let prefix = prefix_str.to_string();
let rate_suffix = if float_rate < 1.0 { format!("|@{}", float_rate)} else { "".to_string() };
Ok(StatsdOutlet {
sender,
prefix,
// time_suffix: format!("|ms{}", rate_suffix),
// gauge_suffix: format!("|g{}", rate_suffix),
// count_suffix: format!("|c{}", rate_suffix)
})
}
/// Report to statsd a count of items.
pub fn count(&self, key: &str, value: u64) {
if accept_sample(self.int_rate) {
let count = &value.to_string();
self.send( &[key, ":", count, &self.count_suffix] )
}
}
/// Report to statsd a non-cumulative (instant) count of items.
pub fn gauge(&self, key: &str, value: u64) {
if accept_sample(self.int_rate) {
let count = &value.to_string();
self.send( &[key, ":", count, &self.gauge_suffix] )
}
}
/// Report to statsd a time interval of items.
pub fn time_interval_ms(&self, key: &str, interval_ms: u64) {
if accept_sample(self.int_rate) {
self.send_time_ms(key, interval_ms);
}
}
/// Query current time to use eventually with `stop_time()`
pub fn start_time(&self) -> StartTime {
StartTime( time::precise_time_ns() )
}
/// An efficient timer that skips querying for stop time if sample will not be collected.
/// Caveat : Random sampling overhead of a few ns will be included in any reported time interval.
pub fn stop_time(&self, key: &str, start_time: StartTime) {
if accept_sample(self.int_rate) {
self.send_time_ms(key, start_time.elapsed_ms());
}
}
fn send_time_ms(&self, key: &str, interval_ms: u64) {
let value = &interval_ms.to_string();
self.send( &[key, ":", value, &self.time_suffix] )
}
/// Concatenate text parts into a single buffer and send it over UDP
fn send(&self, strings: &[&str]) {
let mut str = String::with_capacity(MAX_UDP_PAYLOAD);
str.push_str(&self.prefix);
for s in strings { str.push_str(s); }
self.sender.send_stats(str)
}
}
struct StatsdMetric {
port: Statsd,
prefix: String,
}
impl Event for StatsdMetric {
fn mark(&self) {
// static string "barry:1|c|@0.999"
port.send(prefix)
}
}
impl Value for StatsdMetric {
fn value(&self, value: ValueType) {
// insert value between prefix and suffix "barry:44|ms|@0.999"
port.send(format!("{}:{}|{}", prefix, value, suffix))
}
}
impl Scope for StatsdMetric {
fn open_scope(&self) -> OpenedScope {
// static string "barry:1|c|@0.999"
port.open_scope()
}
}
impl ChannelOut for Statsd {
fn new_value<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Value {
let mut type_string = "|c".to_string();
if sampling < 1.0 {
type_string.push(format!("|{}", sampling));
}
StatsdMetric {
port: self,
name_string: name,
type_string: type_string
}
}
fn new_gauge<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Gauge {
}
fn new_timer<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Timer {
}
}