Split lib from examples, moved core:: to crate root

This commit is contained in:
Francis Lalonde 2017-08-06 12:01:21 -04:00
parent 2eacbb27fb
commit c48c9d8f81
18 changed files with 224 additions and 230 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/target/
**/*.rs.bk
Cargo.lock

58
Cargo.lock generated
View File

@ -1,6 +1,6 @@
[root]
name = "dipstick"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"cached 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -11,6 +11,14 @@ dependencies = [
"time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "aggregate"
version = "0.0.0"
dependencies = [
"dipstick 0.2.0",
"scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "bitflags"
version = "0.7.0"
@ -37,7 +45,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "cfg-if"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -72,7 +80,7 @@ name = "iovec"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -92,12 +100,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "lazycell"
version = "0.4.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "libc"
version = "0.2.24"
version = "0.2.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -124,14 +132,16 @@ dependencies = [
[[package]]
name = "mio"
version = "0.6.9"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)",
"lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"magenta-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -154,9 +164,9 @@ name = "net2"
version = "0.2.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -232,7 +242,7 @@ name = "num_cpus"
version = "1.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -240,13 +250,13 @@ name = "rand"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
"magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "redox_syscall"
version = "0.1.26"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -262,7 +272,7 @@ dependencies = [
"futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -289,21 +299,21 @@ version = "0.1.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-core"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)",
"scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -356,7 +366,7 @@ dependencies = [
"checksum byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff81738b726f5d099632ceaffe7fb65b90212e8dce59d518729e7e8634032d3d"
"checksum bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8b24f16593f445422331a5eed46b72f7f171f910fead4f2ea8f17e727e9c5c14"
"checksum cached 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c431f1628b1621353ca82e852b2984f6f25eca8ddd92a347a18083f5eedf2989"
"checksum cfg-if 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d0c47d456a36ebf0536a6705c83c1cbbcb9255fbc1d905a6ded104f479268a29"
"checksum cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d4c819a1287eb618df47cc647173c5c4c66ba19d888a6e50d605672aed3140de"
"checksum conv 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "78ff10625fd0ac447827aa30ea8b861fead473bb60aeb73af6c1c58caf0d1299"
"checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9"
"checksum futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "4b63a4792d4f8f686defe3b39b92127fea6344de5d38202b2ee5a11bbbf29d6a"
@ -364,12 +374,12 @@ dependencies = [
"checksum iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29d062ee61fccdf25be172e70f34c9f6efc597e1fb8f6526e8437b2046ab26be"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3b37545ab726dd833ec6420aaba8231c5b320814b9029ad585555d2a03e94fbf"
"checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b"
"checksum libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)" = "38f5c2b18a287cf78b4097db62e20f43cace381dc76ae5c0a3073067f78b7ddc"
"checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b"
"checksum libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)" = "8a014d9226c2cc402676fbe9ea2e15dd5222cd1dd57f576b5b283178c944a264"
"checksum log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "880f77541efa6e5cc74e76910c9884d9859683118839d6a1dc3b11e63512565b"
"checksum magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4bf0336886480e671965f794bc9b6fce88503563013d1bfb7a502c81fe3ac527"
"checksum magenta-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "40d014c7011ac470ae28e2f76a02bfea4a8480f73e701353b49ad7a8d75f4699"
"checksum mio 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)" = "9e965267d4d58496fc4f740e9861118367f13570cadf66316ed2c3f2f14d87c7"
"checksum mio 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "dbd91d3bfbceb13897065e97b2ef177a09a438cb33612b2d371bf568819a9313"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
"checksum net2 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)" = "94101fd932816f97eb9a5116f6c1a11511a1fed7db21c5ccd823b2dc11abf566"
"checksum num 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "a311b77ebdc5dd4cf6449d81e4135d9f0e3b153839ac90e648a8ef538f923525"
@ -381,14 +391,14 @@ dependencies = [
"checksum num-traits 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "99843c856d68d8b4313b03a17e33c4bb42ae8f6610ea81b28abe076ac721b9b0"
"checksum num_cpus 1.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "aec53c34f2d0247c5ca5d32cca1478762f301740468ee9ee6dcb7a0dd7a0c584"
"checksum rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)" = "eb250fd207a4729c976794d03db689c9be1d634ab5a1c9da9492a13d8fecbcdf"
"checksum redox_syscall 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "9df6a71a1e67be2104410736b2389fb8e383c1d7e9e792d629ff13c02867147a"
"checksum redox_syscall 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "3c9309631a35303bffb47e397198e3668cb544fe8834cd3da2a744441e70e524"
"checksum rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)" = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda"
"checksum scheduled-executor 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e50853ff7d0b411f597d2ed9d4347b4a7583f3abced3e24d2b7fbfde492c901"
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum thread-local-object 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7da3caa820d0308c84c8654f6cafd81cc3195d45433311cbe22fcf44fc8be071"
"checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520"
"checksum tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6a20ba4738d283cac7495ca36e045c80c2a8df3e05dd0909b17a06646af5a7ed"
"checksum tokio-core 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e85d419699ec4b71bfe35bbc25bb8771e52eff0471a7f75c853ad06e200b4f86"
"checksum tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c2c3ce9739f7387a0fa65b5421e81feae92e04d603f008898f4257790ce8c2db"
"checksum traitobject 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "efd1f82c56340fdf16f2a953d7bda4f8fdffba13d93b00844c25572110b26079"
"checksum unsafe-any 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f30360d7979f5e9c6e6cea48af192ea8fab4afb3cf72597154b8f08935bc9c7f"

View File

@ -1,18 +1,5 @@
[package]
name = "dipstick"
version = "0.2.0"
authors = ["Francis Lalonde <fralalonde@gmail.com>"]
[dependencies]
# not yet required
#error-chain = "0.10"
lazy_static = "0.2.8"
cached = "0.4.1"
log = "0.3.8"
time = "0.1"
scheduled-executor = "0.3.0"
thread-local-object = "0.1.0"
num = "0.1.40"
[features]
bench = []
[workspace]
members = [
"lib/",
"examples/aggregate",
]

View File

@ -0,0 +1,9 @@
[package]
name = "aggregate"
version = "0.0.0"
workspace = "../../"
[dependencies]
dipstick = { path = '../../lib' }
scheduled-executor = "0.3.0"

View File

@ -1,48 +1,19 @@
#![cfg_attr(feature = "bench", feature(test))]
#[cfg(feature = "bench")]
extern crate test;
extern crate time;
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
#[macro_use] extern crate dipstick;
extern crate scheduled_executor;
extern crate thread_local_object;
extern crate cached;
extern crate num;
#[macro_use]
pub mod core;
pub mod dual;
pub mod dispatch;
pub mod sampling;
pub mod aggregate;
pub mod publish;
pub mod statsd;
pub mod logging;
pub mod cache;
pub mod pcg32;
use dual::DualSink;
use dispatch::{DirectDispatch, DirectCount, DirectTimer};
use sampling::SamplingSink;
use statsd::StatsdSink;
use logging::LoggingSink;
use aggregate::MetricAggregator;
use publish::AggregatePublisher;
use core::{MetricType, MetricSink, MetricWriter, MetricDispatch, CountMetric, GaugeMetric,
use dipstick::dual::DualSink;
use dipstick::dispatch::{DirectDispatch, DirectCount, DirectTimer};
use dipstick::sampling::SamplingSink;
use dipstick::statsd::StatsdSink;
use dipstick::logging::LoggingSink;
use dipstick::aggregate::MetricAggregator;
use dipstick::publish::AggregatePublisher;
use dipstick::{MetricKind, MetricSink, MetricWriter, MetricDispatch, CountMetric, GaugeMetric,
TimerMetric, EventMetric};
use std::thread::sleep;
use scheduled_executor::CoreExecutor;
use std::time::Duration;
use cache::MetricCache;
use dipstick::cache::MetricCache;
fn main() {
sample_scheduled_statsd_aggregation()
@ -119,7 +90,7 @@ pub fn raw_write() {
let metrics_log = LoggingSink::new("metrics");
// define and send metrics using raw channel API
let counter = metrics_log.new_metric(MetricType::Count, "count_a", core::FULL_SAMPLING_RATE);
let counter = metrics_log.new_metric(MetricKind::Count, "count_a", dipstick::FULL_SAMPLING_RATE);
metrics_log.new_writer().write(&counter, 1);
}
@ -132,10 +103,10 @@ pub fn counter_to_log() {
const STATSD_SAMPLING_RATE: f64 = 0.0001;
lazy_static! {
pub static ref METRICS: DirectDispatch<SamplingSink<StatsdSink>> = DirectDispatch::new(
SamplingSink::new(StatsdSink::new("localhost:8125", env!("CARGO_PKG_NAME")).unwrap(), STATSD_SAMPLING_RATE));
pub static ref SERVICE_RESPONSE_TIME: DirectTimer<SamplingSink<StatsdSink>> = METRICS.new_timer("service.response.time");
pub static ref SERVICE_RESPONSE_BYTES: DirectCount<SamplingSink<StatsdSink>> = METRICS.new_count("service.response.bytes");
}
//lazy_static! {
// pub static ref METRICS: DirectDispatch<SamplingSink<StatsdSink>> = DirectDispatch::new(
// SamplingSink::new(StatsdSink::new("localhost:8125", env!("CARGO_PKG_NAME")).unwrap(), STATSD_SAMPLING_RATE));
//
// pub static ref SERVICE_RESPONSE_TIME: DirectTimer<SamplingSink<StatsdSink>> = METRICS.new_timer("service.response.time");
// pub static ref SERVICE_RESPONSE_BYTES: DirectCount<SamplingSink<StatsdSink>> = METRICS.new_count("service.response.bytes");
//}

16
lib/Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "dipstick"
version = "0.2.0"
authors = ["Francis Lalonde <fralalonde@gmail.com>"]
[dependencies]
lazy_static = "0.2.8"
cached = "0.4.1"
log = "0.3.8"
time = "0.1"
scheduled-executor = "0.3.0"
thread-local-object = "0.1.0"
num = "0.1.40"
[features]
bench = []

View File

@ -1,4 +1,4 @@
use core::{MetricType, Rate, Value, MetricWriter, MetricKey, MetricSink};
use super::{MetricKind, Rate, Value, MetricWriter, MetricKey, MetricSink};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::usize;
@ -29,13 +29,13 @@ pub enum AggregateScore {
/// A metric that holds aggregated values
#[derive(Debug)]
pub struct AggregateMetricKey {
pub m_type: MetricType,
pub struct AggregateKey {
pub kind: MetricKind,
pub name: String,
score: AtomicScore,
}
impl AggregateMetricKey {
impl AggregateKey {
/// Update scores with value
pub fn write(&self, value: usize) -> () {
match &self.score {
@ -106,13 +106,13 @@ impl AggregateMetricKey {
}
}
impl MetricKey for Arc<AggregateMetricKey> {}
impl MetricKey for Arc<AggregateKey> {}
#[derive(Debug)]
pub struct AggregateWrite();
impl MetricWriter<Arc<AggregateMetricKey>> for AggregateWrite {
fn write(&self, metric: &Arc<AggregateMetricKey>, value: Value) {
impl MetricWriter<Arc<AggregateKey>> for AggregateWrite {
fn write(&self, metric: &Arc<AggregateKey>, value: Value) {
metric.write(value as usize);
}
}
@ -123,12 +123,12 @@ lazy_static! {
}
#[derive(Debug)]
pub struct AggregateSource(Arc<RwLock<Vec<Arc<AggregateMetricKey>>>>);
pub struct AggregateSource(Arc<RwLock<Vec<Arc<AggregateKey>>>>);
impl AggregateSource {
pub fn for_each<F>(&self, ops: F)
where
F: Fn(&AggregateMetricKey),
F: Fn(&AggregateKey),
{
for metric in self.0.read().unwrap().iter() {
ops(&metric)
@ -138,7 +138,7 @@ impl AggregateSource {
#[derive(Debug)]
pub struct MetricAggregator {
metrics: Arc<RwLock<Vec<Arc<AggregateMetricKey>>>>,
metrics: Arc<RwLock<Vec<Arc<AggregateKey>>>>,
}
impl MetricAggregator {
@ -155,24 +155,21 @@ impl MetricAggregator {
}
}
pub struct AggregateSink(Arc<RwLock<Vec<Arc<AggregateMetricKey>>>>);
pub struct AggregateSink(Arc<RwLock<Vec<Arc<AggregateKey>>>>);
impl MetricSink for AggregateSink {
type Metric = Arc<AggregateMetricKey>;
type Metric = Arc<AggregateKey>;
type Writer = AggregateWrite;
fn new_metric<S: AsRef<str>>(
&self,
m_type: MetricType,
name: S,
sampling: Rate,
) -> Arc<AggregateMetricKey> {
#[allow(unused_variables)]
fn new_metric<S: AsRef<str>>(&self, kind: MetricKind, name: S, sampling: Rate)
-> Self::Metric {
let name = name.as_ref().to_string();
let metric = Arc::new(AggregateMetricKey {
m_type,
let metric = Arc::new(AggregateKey {
kind,
name,
score: match m_type {
MetricType::Event => AtomicScore::Event { hit: AtomicUsize::new(0) },
score: match kind {
MetricKind::Event => AtomicScore::Event { hit: AtomicUsize::new(0) },
_ => AtomicScore::Value {
hit: AtomicUsize::new(0),
sum: AtomicUsize::new(0),
@ -186,8 +183,8 @@ impl MetricSink for AggregateSink {
metric
}
fn new_writer(&self) -> AggregateWrite {
// TODO return AGGREGATE_WRITE
fn new_writer(&self) -> Self::Writer {
// TODO return AGGREGATE_WRITE or a immutable field at least
AggregateWrite()
}
}

View File

@ -1,4 +1,4 @@
pub use core::{MetricType, Rate, Value, MetricSink, MetricKey, MetricWriter};
use super::{MetricKind, Rate, Value, MetricSink, MetricKey, MetricWriter};
use cached::{SizedCache, Cached};
use std::sync::Arc;
use std::sync::RwLock;
@ -51,26 +51,26 @@ impl<C: MetricSink> MetricSink for MetricCache<C> {
type Metric = CachedKey<C>;
type Writer = CachedMetricWriter<C>;
fn new_metric<S>(&self, m_type: MetricType, name: S, sampling: Rate) -> CachedKey<C>
where
S: AsRef<str>,
{
#[allow(unused_variables)]
fn new_metric<S>(&self, kind: MetricKind, name: S, sampling: Rate) -> Self::Metric
where S: AsRef<str> {
// TODO use ref for key, not owned
let key = name.as_ref().to_string();
{
let mut cache = self.cache.write().unwrap();
let cached_metric = cache.cache_get(&key);
if let Some(cached_metric) = cached_metric {
return CachedKey(cached_metric.clone());
}
let mut cache = self.cache.write().unwrap();
let cached_metric = cache.cache_get(&key);
if let Some(cached_metric) = cached_metric {
return CachedKey(cached_metric.clone());
}
let target_metric = self.target.new_metric(m_type, name, sampling);
let target_metric = self.target.new_metric(kind, name, sampling);
let new_metric = Arc::new(target_metric);
let mut cache = self.cache.write().unwrap();
cache.cache_set(key, new_metric.clone());
CachedKey(new_metric)
}
fn new_writer(&self) -> CachedMetricWriter<C> {
fn new_writer(&self) -> Self::Writer {
CachedMetricWriter { target: self.target.new_writer() }
}
}

View File

@ -1,7 +1,7 @@
pub use core::{MetricType, Value, MetricWriter, MetricSink, MetricDispatch,
EventMetric, CountMetric, GaugeMetric, TimerMetric};
use super::{MetricKind, Value, MetricWriter, MetricSink, MetricDispatch,
EventMetric, CountMetric, GaugeMetric, TimerMetric};
use std::sync::Arc;
pub use num::ToPrimitive;
use num::ToPrimitive;
/// Base struct for all direct dispatch metrics
struct DirectMetric<C: MetricSink + 'static> {
@ -94,7 +94,7 @@ impl<C: MetricSink> MetricDispatch for DirectDispatch<C> {
type Timer = DirectTimer<C>;
fn new_event<S: AsRef<str>>(&self, name: S) -> Self::Event {
let metric = self.target.new_metric(MetricType::Event, self.add_prefix(name), 1.0);
let metric = self.target.new_metric(MetricKind::Event, self.add_prefix(name), 1.0);
DirectEvent(DirectMetric {
metric,
writer: self.writer.clone(),
@ -102,7 +102,7 @@ impl<C: MetricSink> MetricDispatch for DirectDispatch<C> {
}
fn new_count<S: AsRef<str>>(&self, name: S) -> Self::Count {
let metric = self.target.new_metric(MetricType::Count, self.add_prefix(name), 1.0);
let metric = self.target.new_metric(MetricKind::Count, self.add_prefix(name), 1.0);
DirectCount(DirectMetric {
metric,
writer: self.writer.clone(),
@ -110,7 +110,7 @@ impl<C: MetricSink> MetricDispatch for DirectDispatch<C> {
}
fn new_timer<S: AsRef<str>>(&self, name: S) -> Self::Timer {
let metric = self.target.new_metric(MetricType::Time, self.add_prefix(name), 1.0);
let metric = self.target.new_metric(MetricKind::Time, self.add_prefix(name), 1.0);
DirectTimer(DirectMetric {
metric,
writer: self.writer.clone(),
@ -118,7 +118,7 @@ impl<C: MetricSink> MetricDispatch for DirectDispatch<C> {
}
fn new_gauge<S: AsRef<str>>(&self, name: S) -> Self::Gauge {
let metric = self.target.new_metric(MetricType::Gauge, self.add_prefix(name), 1.0);
let metric = self.target.new_metric(MetricKind::Gauge, self.add_prefix(name), 1.0);
DirectGauge(DirectMetric {
metric,
writer: self.writer.clone(),

View File

@ -1,4 +1,4 @@
pub use core::{MetricType, Rate, Value, MetricWriter, MetricKey, MetricSink};
use super::{MetricKind, Rate, Value, MetricWriter, MetricKey, MetricSink};
#[derive(Debug)]
pub struct DualKey<M1: MetricKey, M2: MetricKey> {
@ -48,18 +48,15 @@ impl<C1: MetricSink, C2: MetricSink> MetricSink for DualSink<C1, C2> {
type Metric = DualKey<C1::Metric, C2::Metric>;
type Writer = DualWriter<C1, C2>;
fn new_metric<S: AsRef<str>>(
&self,
m_type: MetricType,
name: S,
sampling: Rate,
) -> DualKey<C1::Metric, C2::Metric> {
let metric_1 = self.channel_a.new_metric(m_type, &name, sampling);
let metric_2 = self.channel_b.new_metric(m_type, &name, sampling);
#[allow(unused_variables)]
fn new_metric<S: AsRef<str>>(&self, kind: MetricKind, name: S, sampling: Rate)
-> Self::Metric {
let metric_1 = self.channel_a.new_metric(kind, &name, sampling);
let metric_2 = self.channel_b.new_metric(kind, &name, sampling);
DualKey { metric_1, metric_2 }
}
fn new_writer(&self) -> DualWriter<C1, C2> {
fn new_writer(&self) -> Self::Writer {
DualWriter {
channel_a: self.channel_a.new_writer(),
channel_b: self.channel_b.new_writer(),

View File

@ -1,4 +1,45 @@
use time;
#![cfg_attr(feature = "bench", feature(test))]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
variant_size_differences,
)]
#![feature(fn_traits)]
#[cfg(feature = "bench")]
extern crate test;
extern crate time;
extern crate cached;
extern crate thread_local_object;
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
extern crate num;
extern crate scheduled_executor;
pub mod dual;
pub mod dispatch;
pub mod sampling;
pub mod aggregate;
pub mod publish;
pub mod statsd;
pub mod logging;
pub mod pcg32;
pub mod cache;
use num::ToPrimitive;
//////////////////
@ -27,7 +68,7 @@ pub type Rate = f64;
pub const FULL_SAMPLING_RATE: Rate = 1.0;
#[derive(Debug, Copy, Clone)]
pub enum MetricType {
pub enum MetricKind {
Event,
Count,
Gauge,
@ -137,8 +178,8 @@ pub trait ScopingDispatch {
type Scope: DispatchScope;
fn with_scope<F>(&mut self, operations: F)
where
F: Fn(&Self::Scope);
where
F: Fn(&Self::Scope);
}
/// Metric sources allow a group of metrics to be defined and written as one.
@ -176,7 +217,7 @@ pub trait MetricSink {
type Writer: MetricWriter<Self::Metric>;
/// Define a new sink-specific metric that can be used for writing values.
fn new_metric<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> Self::Metric;
fn new_metric<S: AsRef<str>>(&self, kind: MetricKind, name: S, sampling: Rate) -> Self::Metric;
/// Open a metric writer to write metrics to.
/// Some sinks reuse the same writer while others allocate resources for every new writer.

View File

@ -1,4 +1,4 @@
pub use core::{MetricType, Rate, Value, MetricWriter, MetricKey, MetricSink};
use super::{MetricKind, Rate, Value, MetricWriter, MetricKey, MetricSink};
//////////// Log Channel
@ -39,11 +39,13 @@ impl MetricSink for LoggingSink {
type Metric = LoggingKey;
type Writer = LoggingWriter;
fn new_metric<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> LoggingKey {
LoggingKey { prefix: format!("{:?}:{}{}", m_type, self.prefix, name.as_ref()) }
#[allow(unused_variables)]
fn new_metric<S: AsRef<str>>(&self, kind: MetricKind, name: S, sampling: Rate)
-> Self::Metric {
LoggingKey { prefix: format!("{:?}:{}{}", kind, self.prefix, name.as_ref()) }
}
fn new_writer(&self) -> LoggingWriter {
fn new_writer(&self) -> Self::Writer {
LoggingWriter {}
}
}

View File

@ -1,7 +1,7 @@
//// Aggregate Source
pub use core::{MetricSink, MetricType, MetricWriter};
pub use aggregate::{AggregateSource, AggregateScore};
use super::{MetricSink, MetricKind, MetricWriter};
use aggregate::{AggregateSource, AggregateScore};
use std::time::Duration;
use scheduled_executor::CoreExecutor;
@ -46,47 +46,47 @@ impl<C: MetricSink + Sync> AggregatePublisher<C> {
}
AggregateScore::Event { hit } => {
let name = format!("{}.hit", &metric.name);
let temp_metric = self.target.new_metric(MetricType::Count, name, 1.0);
let temp_metric = self.target.new_metric(MetricKind::Count, name, 1.0);
scope.write(&temp_metric, hit);
}
AggregateScore::Value { hit, sum, max, min } => {
if hit > 0 {
match &metric.m_type {
&MetricType::Count |
&MetricType::Time |
&MetricType::Gauge => {
match &metric.kind {
&MetricKind::Count |
&MetricKind::Time |
&MetricKind::Gauge => {
// NOTE best-effort averaging
// - hit and sum are not incremented nor read as one
// - integer division is not rounding
// assuming values will still be good enough to be useful
let name = format!("{}.avg", &metric.name);
let temp_metric = self.target.new_metric(metric.m_type, name, 1.0);
let temp_metric = self.target.new_metric(metric.kind, name, 1.0);
scope.write(&temp_metric, sum / hit);
}
_ => (),
}
match &metric.m_type {
match &metric.kind {
// do not report gauges sum and hit, they are meaningless
&MetricType::Count |
&MetricType::Time => {
&MetricKind::Count |
&MetricKind::Time => {
let name = format!("{}.sum", &metric.name);
let temp_metric = self.target.new_metric(metric.m_type, name, 1.0);
let temp_metric = self.target.new_metric(metric.kind, name, 1.0);
scope.write(&temp_metric, sum);
let name = format!("{}.hit", &metric.name);
let temp_metric = self.target.new_metric(metric.m_type, name, 1.0);
let temp_metric = self.target.new_metric(metric.kind, name, 1.0);
scope.write(&temp_metric, hit);
}
_ => (),
}
let name = format!("{}.max", &metric.name);
let temp_metric = self.target.new_metric(MetricType::Gauge, name, 1.0);
let temp_metric = self.target.new_metric(MetricKind::Gauge, name, 1.0);
scope.write(&temp_metric, max);
let name = format!("{}.min", &metric.name);
let temp_metric = self.target.new_metric(MetricType::Gauge, name, 1.0);
let temp_metric = self.target.new_metric(MetricKind::Gauge, name, 1.0);
scope.write(&temp_metric, min);
}
}

View File

@ -1,4 +1,4 @@
pub use core::{MetricType, Rate, Value, MetricWriter, MetricKey, MetricSink};
use super::{MetricKind, Rate, Value, MetricWriter, MetricKey, MetricSink, FULL_SAMPLING_RATE};
use pcg32;
#[derive(Debug)]
@ -41,16 +41,20 @@ impl<C: MetricSink> MetricSink for SamplingSink<C> {
type Metric = SamplingKey<C::Metric>;
type Writer = SamplingWriter<C>;
fn new_metric<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate)
-> SamplingKey<C::Metric> {
let pm = self.target.new_metric(m_type, name, self.sampling_rate);
#[allow(unused_variables)]
fn new_metric<S: AsRef<str>>(&self, kind: MetricKind, name: S, sampling: Rate)
-> Self::Metric {
// TODO override only if FULL_SAMPLING else warn!()
assert_eq!(sampling, FULL_SAMPLING_RATE, "Overriding previously set sampling rate");
let pm = self.target.new_metric(kind, name, self.sampling_rate);
SamplingKey {
target: pm,
int_sampling_rate: pcg32::to_int_rate(self.sampling_rate),
}
}
fn new_writer(&self) -> SamplingWriter<C> {
fn new_writer(&self) -> Self::Writer {
SamplingWriter { target: self.target.new_writer() }
}
}

View File

@ -1,4 +1,4 @@
pub use core::{MetricType, Rate, Value, MetricWriter, MetricKey, MetricSink};
use super::{MetricType, Rate, Value, MetricWriter, MetricKey, MetricSink};
#[derive(Debug)]
pub struct ScopeKey<M: MetricKey> {
@ -72,16 +72,16 @@ impl<C: MetricSink> MetricSink for ScopeSink<C> {
type Metric = ScopeKey<C::Metric>;
type Writer = ScopeWriter<C>;
fn new_metric<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate)
-> ScopeKey<C::Metric> {
let pm = self.target.new_metric(m_type, name, self.sampling_rate);
#[allow(unused_variables)]
fn new_metric<S: AsRef<str>>(&self, kind: MetricType, name: S, sampling: Rate)
-> Self::Metric {
let pm = self.target.new_metric(kind, name, self.sampling_rate);
ScopeKey {
target: pm,
}
}
fn new_writer(&self) -> ScopeWriter<C> {
fn new_writer(&self) -> Self::Writer {
self.writer.thread_writer.set(self.target.new_writer())
// TODO drop target_writer on scope_writer drop (or something)
}

View File

@ -1,5 +1,5 @@
pub use core::{MetricType, Rate, Value, MetricWriter, MetricKey, MetricSink, FULL_SAMPLING_RATE};
pub use std::io::Result;
use super::{MetricKind, Rate, Value, MetricWriter, MetricKey, MetricSink, FULL_SAMPLING_RATE};
use std::io::Result;
use std::net::UdpSocket;
use std::cell::RefCell;
@ -108,7 +108,8 @@ impl MetricSink for StatsdSink {
type Metric = StatsdKey;
type Writer = StatsdWriter;
fn new_metric<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: Rate) -> StatsdKey {
fn new_metric<S: AsRef<str>>(&self, kind: MetricKind, name: S, sampling: Rate)
-> Self::Metric {
let mut prefix = String::with_capacity(32);
prefix.push_str(&self.prefix);
prefix.push_str(name.as_ref());
@ -116,10 +117,10 @@ impl MetricSink for StatsdSink {
let mut suffix = String::with_capacity(16);
suffix.push('|');
suffix.push_str(match m_type {
MetricType::Event | MetricType::Count => "c",
MetricType::Gauge => "g",
MetricType::Time => "ms",
suffix.push_str(match kind {
MetricKind::Event | MetricKind::Count => "c",
MetricKind::Gauge => "g",
MetricKind::Time => "ms",
});
if sampling < FULL_SAMPLING_RATE {
@ -127,15 +128,15 @@ impl MetricSink for StatsdSink {
suffix.push_str(&sampling.to_string());
}
let scale = match m_type {
MetricType::Time => 1000,
let scale = match kind {
MetricKind::Time => 1000,
_ => 1
};
StatsdKey { prefix, suffix, scale }
}
fn new_writer(&self) -> StatsdWriter {
fn new_writer(&self) -> Self::Writer {
StatsdWriter { socket: self.socket.clone() }
}
}

View File

@ -1,42 +0,0 @@
#![cfg_attr(feature = "bench", feature(test))]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
variant_size_differences,
)]
#![feature(fn_traits)]
#[cfg(feature = "bench")]
extern crate test;
extern crate time;
extern crate cached;
extern crate thread_local_object;
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
extern crate num;
extern crate scheduled_executor;
pub mod core;
pub mod dual;
pub mod dispatch;
pub mod sampling;
pub mod aggregate;
pub mod publish;
pub mod statsd;
pub mod logging;
pub mod pcg32;
pub mod cache;