mirror of https://github.com/fralalonde/dipstick
Shorten Send/Recv delegate
This commit is contained in:
parent
d4274cf49c
commit
4c4b82a9da
|
@ -103,7 +103,7 @@ app_metrics!("my_app" => {
|
||||||
});
|
});
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
send_delegated_metrics(to_stdout());
|
send_app_metrics(to_stdout());
|
||||||
COUNTER_A.count(11);
|
COUNTER_A.count(11);
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
|
@ -42,7 +42,7 @@ app_metrics!(LIB_METRICS => {
|
||||||
});
|
});
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
send_delegated_metrics(to_stdout());
|
send_app_metrics(to_stdout());
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
ROOT_COUNTER.count(123);
|
ROOT_COUNTER.count(123);
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
//! Maintain aggregated metrics for deferred reporting,
|
//! Maintain aggregated metrics for deferred reporting,
|
||||||
//!
|
//!
|
||||||
use core::*;
|
use core::*;
|
||||||
use scope_metrics::*;
|
use local_metrics::*;
|
||||||
use app_metrics::*;
|
use app_metrics::*;
|
||||||
use namespace::*;
|
use namespace::*;
|
||||||
use output::to_void;
|
use output::to_void;
|
||||||
|
@ -22,7 +22,7 @@ use std::sync::{Arc, RwLock};
|
||||||
/// metrics.marker("my_event").mark();
|
/// metrics.marker("my_event").mark();
|
||||||
/// metrics.marker("my_event").mark();
|
/// metrics.marker("my_event").mark();
|
||||||
/// ```
|
/// ```
|
||||||
pub fn aggregate<E, M>(stat_fn: E, to_chain: ScopeMetrics<M>) -> Aggregator
|
pub fn aggregate<E, M>(stat_fn: E, to_chain: LocalMetrics<M>) -> Aggregator
|
||||||
where
|
where
|
||||||
E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static,
|
E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static,
|
||||||
M: Clone + Send + Sync + Debug + 'static,
|
M: Clone + Send + Sync + Debug + 'static,
|
||||||
|
|
|
@ -0,0 +1,189 @@
|
||||||
|
//! Decouple metric definition from configuration with trait objects.
|
||||||
|
|
||||||
|
use core::*;
|
||||||
|
use app_metrics::*;
|
||||||
|
use namespace::*;
|
||||||
|
use registry;
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, RwLock, Weak};
|
||||||
|
|
||||||
|
use atomic_refcell::*;
|
||||||
|
|
||||||
|
|
||||||
|
/// Create a new dispatch point for metrics.
|
||||||
|
/// All dispatch points are automatically entered in the dispatch registry.
|
||||||
|
pub fn app_delegate() -> AppSend {
|
||||||
|
let send = AppSend {
|
||||||
|
inner: Arc::new(RwLock::new(InnerAppSend {
|
||||||
|
metrics: HashMap::new(),
|
||||||
|
recv: registry::get_default_app_recv(),
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
registry::add_app_send(send.clone());
|
||||||
|
send
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dynamic counterpart of a `Dispatcher`.
|
||||||
|
/// Adapter to AppMetrics<_> of unknown type.
|
||||||
|
pub trait AppRecv {
|
||||||
|
/// Register a new metric.
|
||||||
|
/// Only one metric of a certain name will be defined.
|
||||||
|
/// Observer must return a MetricHandle that uniquely identifies the metric.
|
||||||
|
fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> Box<AppRecvMetric + Send + Sync>;
|
||||||
|
|
||||||
|
/// Flush the receiver's scope.
|
||||||
|
fn flush(&self);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dynamic counterpart of the `DispatcherMetric`.
|
||||||
|
/// Adapter to a metric of unknown type.
|
||||||
|
pub trait AppRecvMetric {
|
||||||
|
/// Write metric value to a scope.
|
||||||
|
/// Observers only receive previously registered handles.
|
||||||
|
fn write(&self, value: Value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Shortcut name because `AppMetrics<Dispatch>`
|
||||||
|
/// looks better than `AppMetrics<Arc<DispatcherMetric>>`.
|
||||||
|
pub type Delegate = Arc<AppSendMetric>;
|
||||||
|
|
||||||
|
/// A dynamically dispatched metric.
|
||||||
|
#[derive(Derivative)]
|
||||||
|
#[derivative(Debug)]
|
||||||
|
pub struct AppSendMetric {
|
||||||
|
kind: Kind,
|
||||||
|
name: String,
|
||||||
|
rate: Rate,
|
||||||
|
#[derivative(Debug = "ignore")]
|
||||||
|
recv_metric: AtomicRefCell<Box<AppRecvMetric + Send + Sync>>,
|
||||||
|
#[derivative(Debug = "ignore")]
|
||||||
|
send: AppSend,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out.
|
||||||
|
impl Drop for AppSendMetric {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.send.drop_metric(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A dynamic dispatch point for app and lib metrics.
|
||||||
|
/// Decouples metrics definition from backend configuration.
|
||||||
|
/// Allows defining metrics before a concrete type has been selected.
|
||||||
|
/// Allows replacing metrics backend on the fly at runtime.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AppSend {
|
||||||
|
inner: Arc<RwLock<InnerAppSend>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct InnerAppSend {
|
||||||
|
metrics: HashMap<String, Weak<AppSendMetric>>,
|
||||||
|
recv: Arc<AppRecv + Send + Sync>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&'static str> for AppMetrics<Delegate> {
|
||||||
|
fn from(prefix: &'static str) -> AppMetrics<Delegate> {
|
||||||
|
let app_metrics: AppMetrics<Delegate> = app_delegate().into();
|
||||||
|
app_metrics.with_prefix(prefix)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<AppSend> for AppMetrics<Delegate> {
|
||||||
|
fn from(send: AppSend) -> AppMetrics<Delegate> {
|
||||||
|
let send_cmd = send.clone();
|
||||||
|
AppMetrics::new(
|
||||||
|
// define metric
|
||||||
|
Arc::new(move |kind, name, rate| send.define_metric(kind, name, rate)),
|
||||||
|
|
||||||
|
// write / flush metric
|
||||||
|
control_scope(move |cmd| match cmd {
|
||||||
|
ScopeCmd::Write(metric, value) => {
|
||||||
|
let dispatch: &Arc<AppSendMetric> = metric;
|
||||||
|
dispatch.recv_metric.borrow().write(value);
|
||||||
|
// let recv_metric: AtomicRef<Box<AppRecvMetric + Send + Sync>> = dispatch.recv_metric.borrow();
|
||||||
|
// recv_metric.write(value)
|
||||||
|
}
|
||||||
|
ScopeCmd::Flush => send_cmd.inner.write().expect("Locking Delegate").recv.flush(),
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AppSend {
|
||||||
|
/// Install a new metric receiver, replacing the previous one.
|
||||||
|
pub fn set_receiver<IS: Into<AppMetrics<T>>, T: Send + Sync + Clone + 'static>(
|
||||||
|
&self,
|
||||||
|
receiver: IS,
|
||||||
|
) {
|
||||||
|
let receiver: Arc<AppRecv + Send + Sync> = Arc::new(receiver.into());
|
||||||
|
let inner: &mut InnerAppSend =
|
||||||
|
&mut *self.inner.write().expect("Lock Metrics Send");
|
||||||
|
|
||||||
|
for mut metric in inner.metrics.values() {
|
||||||
|
if let Some(metric) = metric.upgrade() {
|
||||||
|
let receiver_metric =
|
||||||
|
receiver.define_metric(metric.kind, metric.name.as_ref(), metric.rate);
|
||||||
|
*metric.recv_metric.borrow_mut() = receiver_metric;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO return old receiver (swap, how?)
|
||||||
|
inner.recv = receiver;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> Delegate {
|
||||||
|
let mut inner = self.inner.write().expect("Lock Metrics Send");
|
||||||
|
inner.metrics.get(name)
|
||||||
|
.and_then(|metric_ref| Weak::upgrade(metric_ref))
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
let recv_metric = inner.recv.define_metric(kind, name, rate);
|
||||||
|
let new_metric = Arc::new(AppSendMetric {
|
||||||
|
kind,
|
||||||
|
name: name.to_string(),
|
||||||
|
rate,
|
||||||
|
recv_metric: AtomicRefCell::new(recv_metric),
|
||||||
|
send: self.clone(),
|
||||||
|
});
|
||||||
|
inner.metrics.insert(
|
||||||
|
new_metric.name.clone(),
|
||||||
|
Arc::downgrade(&new_metric),
|
||||||
|
);
|
||||||
|
new_metric
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn drop_metric(&self, metric: &AppSendMetric) {
|
||||||
|
let mut inner = self.inner.write().expect("Lock Metrics Send");
|
||||||
|
if inner.metrics.remove(&metric.name).is_none() {
|
||||||
|
panic!("Could not remove DelegatingMetric weak ref from delegation point")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "bench")]
|
||||||
|
mod bench {
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use test;
|
||||||
|
use core::Kind::*;
|
||||||
|
use aggregate::*;
|
||||||
|
use publish::*;
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn dispatch_marker_to_aggregate(b: &mut test::Bencher) {
|
||||||
|
let dispatch = app_delegate();
|
||||||
|
let sink: AppMetrics<Delegate> = dispatch.clone().into();
|
||||||
|
dispatch.set_receiver(aggregate(summary, to_void()));
|
||||||
|
let metric = sink.marker("event_a");
|
||||||
|
b.iter(|| test::black_box(metric.mark()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn dispatch_marker_to_void(b: &mut test::Bencher) {
|
||||||
|
let dispatch = app_delegate();
|
||||||
|
let sink: AppMetrics<Delegate> = dispatch.into();
|
||||||
|
let metric = sink.marker("event_a");
|
||||||
|
b.iter(|| test::black_box(metric.mark()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -12,7 +12,7 @@ use core::Kind::*;
|
||||||
use namespace::*;
|
use namespace::*;
|
||||||
use cache::*;
|
use cache::*;
|
||||||
use schedule::*;
|
use schedule::*;
|
||||||
use delegate::*;
|
use app_delegate::*;
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -226,17 +226,17 @@ where
|
||||||
|
|
||||||
//// Dispatch / Receiver impl
|
//// Dispatch / Receiver impl
|
||||||
|
|
||||||
struct AppReceiverMetric<M> {
|
struct AppRecvMetricImpl<M> {
|
||||||
metric: M,
|
metric: M,
|
||||||
scope: ControlScopeFn<M>,
|
scope: ControlScopeFn<M>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send + Sync + Clone + 'static> Receiver for AppMetrics<M> {
|
impl<M: Send + Sync + Clone + 'static> AppRecv for AppMetrics<M> {
|
||||||
fn box_metric(&self, kind: Kind, name: &str, rate: Rate) -> Box<ReceiverMetric + Send + Sync> {
|
fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> Box<AppRecvMetric + Send + Sync> {
|
||||||
let scope: ControlScopeFn<M> = self.single_scope.clone();
|
let scope: ControlScopeFn<M> = self.single_scope.clone();
|
||||||
let metric: M = self.define_metric(kind, name, rate);
|
let metric: M = self.define_metric(kind, name, rate);
|
||||||
|
|
||||||
Box::new(AppReceiverMetric { metric, scope })
|
Box::new(AppRecvMetricImpl { metric, scope })
|
||||||
}
|
}
|
||||||
|
|
||||||
fn flush(&self) {
|
fn flush(&self) {
|
||||||
|
@ -244,7 +244,7 @@ impl<M: Send + Sync + Clone + 'static> Receiver for AppMetrics<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M> ReceiverMetric for AppReceiverMetric<M> {
|
impl<M> AppRecvMetric for AppRecvMetricImpl<M> {
|
||||||
fn write(&self, value: Value) {
|
fn write(&self, value: Value) {
|
||||||
self.scope.write(&self.metric, value);
|
self.scope.write(&self.metric, value);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
//! If queue size is exceeded, calling code reverts to blocking.
|
//! If queue size is exceeded, calling code reverts to blocking.
|
||||||
//!
|
//!
|
||||||
use core::*;
|
use core::*;
|
||||||
use scope_metrics::*;
|
use local_metrics::*;
|
||||||
use self_metrics::*;
|
use self_metrics::*;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -25,7 +25,7 @@ where
|
||||||
fn with_async_queue(&self, queue_size: usize) -> Self;
|
fn with_async_queue(&self, queue_size: usize) -> Self;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send + Sync + Clone + 'static> WithAsyncQueue for ScopeMetrics<M> {
|
impl<M: Send + Sync + Clone + 'static> WithAsyncQueue for LocalMetrics<M> {
|
||||||
fn with_async_queue(&self, queue_size: usize) -> Self {
|
fn with_async_queue(&self, queue_size: usize) -> Self {
|
||||||
self.mod_scope(|next| {
|
self.mod_scope(|next| {
|
||||||
// setup channel
|
// setup channel
|
||||||
|
@ -78,10 +78,10 @@ impl<M: Send + Sync + Clone + 'static> WithAsyncQueue for ScopeMetrics<M> {
|
||||||
|
|
||||||
/// Enqueue collected metrics for dispatch on background thread.
|
/// Enqueue collected metrics for dispatch on background thread.
|
||||||
#[deprecated(since = "0.5.0", note = "Use `with_async_queue` instead.")]
|
#[deprecated(since = "0.5.0", note = "Use `with_async_queue` instead.")]
|
||||||
pub fn async<M, IC>(queue_size: usize, chain: IC) -> ScopeMetrics<M>
|
pub fn async<M, IC>(queue_size: usize, chain: IC) -> LocalMetrics<M>
|
||||||
where
|
where
|
||||||
M: Clone + Send + Sync + 'static,
|
M: Clone + Send + Sync + 'static,
|
||||||
IC: Into<ScopeMetrics<M>>,
|
IC: Into<LocalMetrics<M>>,
|
||||||
{
|
{
|
||||||
let chain = chain.into();
|
let chain = chain.into();
|
||||||
chain.with_async_queue(queue_size)
|
chain.with_async_queue(queue_size)
|
||||||
|
|
212
src/delegate.rs
212
src/delegate.rs
|
@ -1,212 +0,0 @@
|
||||||
//! Decouple metric definition from configuration with trait objects.
|
|
||||||
|
|
||||||
use core::*;
|
|
||||||
use app_metrics::*;
|
|
||||||
use output::*;
|
|
||||||
use namespace::*;
|
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::sync::{Arc, RwLock, Weak};
|
|
||||||
|
|
||||||
use atomic_refcell::*;
|
|
||||||
|
|
||||||
/// The registry contains a list of every metrics dispatch point in the app.
|
|
||||||
lazy_static! {
|
|
||||||
static ref DELEGATE_REGISTRY: RwLock<Vec<DelegationPoint>> = RwLock::new(vec![]);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Install a new receiver for all dispatched metrics, replacing any previous receiver.
|
|
||||||
pub fn send_delegated_metrics<IS: Into<AppMetrics<T>>, T: Send + Sync + Clone + 'static>(
|
|
||||||
receiver: IS,
|
|
||||||
) {
|
|
||||||
let rec = receiver.into();
|
|
||||||
for d in DELEGATE_REGISTRY.read().unwrap().iter() {
|
|
||||||
d.set_receiver(rec.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a new dispatch point for metrics.
|
|
||||||
/// All dispatch points are automatically entered in the dispatch registry.
|
|
||||||
pub fn delegate() -> DelegationPoint {
|
|
||||||
let delegation_point = DelegationPoint {
|
|
||||||
inner: Arc::new(RwLock::new(InnerDelegationPoint {
|
|
||||||
metrics: HashMap::new(),
|
|
||||||
receiver: Box::new(app_metrics(to_void())),
|
|
||||||
})),
|
|
||||||
};
|
|
||||||
DELEGATE_REGISTRY
|
|
||||||
.write()
|
|
||||||
.unwrap()
|
|
||||||
.push(delegation_point.clone());
|
|
||||||
delegation_point
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Dynamic counterpart of a `Dispatcher`.
|
|
||||||
/// Adapter to AppMetrics<_> of unknown type.
|
|
||||||
pub trait Receiver {
|
|
||||||
/// Register a new metric.
|
|
||||||
/// Only one metric of a certain name will be defined.
|
|
||||||
/// Observer must return a MetricHandle that uniquely identifies the metric.
|
|
||||||
fn box_metric(&self, kind: Kind, name: &str, rate: Rate) -> Box<ReceiverMetric + Send + Sync>;
|
|
||||||
|
|
||||||
/// Flush the receiver's scope.
|
|
||||||
fn flush(&self);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Dynamic counterpart of the `DispatcherMetric`.
|
|
||||||
/// Adapter to a metric of unknown type.
|
|
||||||
pub trait ReceiverMetric {
|
|
||||||
/// Write metric value to a scope.
|
|
||||||
/// Observers only receive previously registered handles.
|
|
||||||
fn write(&self, value: Value);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Shortcut name because `AppMetrics<Dispatch>`
|
|
||||||
/// looks better than `AppMetrics<Arc<DispatcherMetric>>`.
|
|
||||||
pub type Delegate = Arc<DelegatingMetric>;
|
|
||||||
|
|
||||||
/// A dynamically dispatched metric.
|
|
||||||
#[derive(Derivative)]
|
|
||||||
#[derivative(Debug)]
|
|
||||||
pub struct DelegatingMetric {
|
|
||||||
kind: Kind,
|
|
||||||
name: String,
|
|
||||||
rate: Rate,
|
|
||||||
#[derivative(Debug = "ignore")]
|
|
||||||
receiver: AtomicRefCell<Box<ReceiverMetric + Send + Sync>>,
|
|
||||||
#[derivative(Debug = "ignore")]
|
|
||||||
dispatcher: DelegationPoint,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out.
|
|
||||||
impl Drop for DelegatingMetric {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.dispatcher.drop_metric(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A dynamic dispatch point for app and lib metrics.
|
|
||||||
/// Decouples metrics definition from backend configuration.
|
|
||||||
/// Allows defining metrics before a concrete type has been selected.
|
|
||||||
/// Allows replacing metrics backend on the fly at runtime.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct DelegationPoint {
|
|
||||||
inner: Arc<RwLock<InnerDelegationPoint>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct InnerDelegationPoint {
|
|
||||||
metrics: HashMap<String, Weak<DelegatingMetric>>,
|
|
||||||
receiver: Box<Receiver + Send + Sync>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<&'static str> for AppMetrics<Delegate> {
|
|
||||||
fn from(prefix: &'static str) -> AppMetrics<Delegate> {
|
|
||||||
let app_metrics: AppMetrics<Delegate> = delegate().into();
|
|
||||||
app_metrics.with_prefix(prefix)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<DelegationPoint> for AppMetrics<Delegate> {
|
|
||||||
fn from(dispatcher: DelegationPoint) -> AppMetrics<Delegate> {
|
|
||||||
let dispatcher_1 = dispatcher.clone();
|
|
||||||
AppMetrics::new(
|
|
||||||
// define metric
|
|
||||||
Arc::new(move |kind, name, rate| dispatcher.define_metric(kind, name, rate)),
|
|
||||||
// write / flush metric
|
|
||||||
control_scope(move |cmd| match cmd {
|
|
||||||
ScopeCmd::Write(metric, value) => {
|
|
||||||
let dispatch: &Arc<DelegatingMetric> = metric;
|
|
||||||
let receiver_metric: AtomicRef<
|
|
||||||
Box<ReceiverMetric + Send + Sync>,
|
|
||||||
> = dispatch.receiver.borrow();
|
|
||||||
receiver_metric.write(value)
|
|
||||||
}
|
|
||||||
ScopeCmd::Flush => dispatcher_1
|
|
||||||
.inner
|
|
||||||
.write()
|
|
||||||
.expect("Locking dispatcher")
|
|
||||||
.receiver
|
|
||||||
.flush(),
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DelegationPoint {
|
|
||||||
/// Install a new metric receiver, replacing the previous one.
|
|
||||||
pub fn set_receiver<IS: Into<AppMetrics<T>>, T: Send + Sync + Clone + 'static>(
|
|
||||||
&self,
|
|
||||||
receiver: IS,
|
|
||||||
) {
|
|
||||||
let receiver: Box<Receiver + Send + Sync> = Box::new(receiver.into());
|
|
||||||
let inner: &mut InnerDelegationPoint =
|
|
||||||
&mut *self.inner.write().expect("Locking dispatcher");
|
|
||||||
|
|
||||||
for mut metric in inner.metrics.values() {
|
|
||||||
if let Some(metric) = metric.upgrade() {
|
|
||||||
let receiver_metric =
|
|
||||||
receiver.box_metric(metric.kind, metric.name.as_ref(), metric.rate);
|
|
||||||
*metric.receiver.borrow_mut() = receiver_metric;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO return old receiver (swap, how?)
|
|
||||||
inner.receiver = receiver;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Define a dispatch metric, registering it with the current receiver.
|
|
||||||
/// A weak ref is kept to update receiver metric if receiver is replaced.
|
|
||||||
pub fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> Delegate {
|
|
||||||
let mut inner = self.inner.write().expect("Locking dispatcher");
|
|
||||||
|
|
||||||
let receiver_metric = inner.receiver.box_metric(kind, name, rate);
|
|
||||||
|
|
||||||
let delegating_metric = Arc::new(DelegatingMetric {
|
|
||||||
kind,
|
|
||||||
name: name.to_string(),
|
|
||||||
rate,
|
|
||||||
receiver: AtomicRefCell::new(receiver_metric),
|
|
||||||
dispatcher: self.clone(),
|
|
||||||
});
|
|
||||||
|
|
||||||
inner.metrics.insert(
|
|
||||||
delegating_metric.name.clone(),
|
|
||||||
Arc::downgrade(&delegating_metric),
|
|
||||||
);
|
|
||||||
delegating_metric
|
|
||||||
}
|
|
||||||
|
|
||||||
fn drop_metric(&self, metric: &DelegatingMetric) {
|
|
||||||
let mut inner = self.inner.write().expect("Locking delegation point");
|
|
||||||
if inner.metrics.remove(&metric.name).is_none() {
|
|
||||||
panic!("Could not remove DelegatingMetric weak ref from delegation point")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "bench")]
|
|
||||||
mod bench {
|
|
||||||
|
|
||||||
use super::*;
|
|
||||||
use test;
|
|
||||||
use core::Kind::*;
|
|
||||||
use aggregate::*;
|
|
||||||
use publish::*;
|
|
||||||
|
|
||||||
#[bench]
|
|
||||||
fn dispatch_marker_to_aggregate(b: &mut test::Bencher) {
|
|
||||||
let dispatch = delegate();
|
|
||||||
let sink: AppMetrics<Delegate> = dispatch.clone().into();
|
|
||||||
dispatch.set_receiver(aggregate(summary, to_void()));
|
|
||||||
let metric = sink.marker("event_a");
|
|
||||||
b.iter(|| test::black_box(metric.mark()));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[bench]
|
|
||||||
fn dispatch_marker_to_void(b: &mut test::Bencher) {
|
|
||||||
let dispatch = delegate();
|
|
||||||
let sink: AppMetrics<Delegate> = dispatch.into();
|
|
||||||
let metric = sink.marker("event_a");
|
|
||||||
b.iter(|| test::black_box(metric.mark()));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,7 +1,7 @@
|
||||||
//! Send metrics to a graphite server.
|
//! Send metrics to a graphite server.
|
||||||
|
|
||||||
use core::*;
|
use core::*;
|
||||||
use scope_metrics::*;
|
use local_metrics::*;
|
||||||
use error;
|
use error;
|
||||||
use self_metrics::*;
|
use self_metrics::*;
|
||||||
|
|
||||||
|
@ -23,13 +23,13 @@ app_metrics!{
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send metrics to a graphite server at the address and port provided.
|
/// Send metrics to a graphite server at the address and port provided.
|
||||||
pub fn to_graphite<ADDR>(address: ADDR) -> error::Result<ScopeMetrics<Graphite>>
|
pub fn to_graphite<ADDR>(address: ADDR) -> error::Result<LocalMetrics<Graphite>>
|
||||||
where
|
where
|
||||||
ADDR: ToSocketAddrs + Debug + Clone,
|
ADDR: ToSocketAddrs + Debug + Clone,
|
||||||
{
|
{
|
||||||
debug!("Connecting to graphite {:?}", address);
|
debug!("Connecting to graphite {:?}", address);
|
||||||
let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?));
|
let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?));
|
||||||
Ok(ScopeMetrics::new(
|
Ok(LocalMetrics::new(
|
||||||
move |kind, name, rate| {
|
move |kind, name, rate| {
|
||||||
let mut prefix = String::with_capacity(32);
|
let mut prefix = String::with_capacity(32);
|
||||||
prefix.push_str(name);
|
prefix.push_str(name);
|
||||||
|
|
14
src/lib.rs
14
src/lib.rs
|
@ -30,11 +30,14 @@ pub mod macros;
|
||||||
pub mod core;
|
pub mod core;
|
||||||
pub use core::*;
|
pub use core::*;
|
||||||
|
|
||||||
pub mod scope_metrics;
|
pub mod local_metrics;
|
||||||
pub use scope_metrics::*;
|
pub use local_metrics::*;
|
||||||
|
|
||||||
pub mod delegate;
|
//pub mod local_delegate;
|
||||||
pub use delegate::*;
|
//pub use local_delegate::*;
|
||||||
|
|
||||||
|
pub mod app_delegate;
|
||||||
|
pub use app_delegate::*;
|
||||||
|
|
||||||
mod output;
|
mod output;
|
||||||
pub use output::*;
|
pub use output::*;
|
||||||
|
@ -81,5 +84,8 @@ pub use async_queue::*;
|
||||||
mod schedule;
|
mod schedule;
|
||||||
pub use schedule::*;
|
pub use schedule::*;
|
||||||
|
|
||||||
|
mod registry;
|
||||||
|
pub use registry::*;
|
||||||
|
|
||||||
mod self_metrics;
|
mod self_metrics;
|
||||||
pub use self_metrics::snapshot;
|
pub use self_metrics::snapshot;
|
||||||
|
|
|
@ -0,0 +1,205 @@
|
||||||
|
//! Decouple metric definition from configuration with trait objects.
|
||||||
|
|
||||||
|
use core::*;
|
||||||
|
use local_metrics::*;
|
||||||
|
use output::*;
|
||||||
|
use namespace::*;
|
||||||
|
use registry::*;
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, RwLock, Weak};
|
||||||
|
|
||||||
|
use atomic_refcell::*;
|
||||||
|
|
||||||
|
|
||||||
|
/// Create a new dispatch point for metrics.
|
||||||
|
/// All dispatch points are automatically entered in the dispatch registry.
|
||||||
|
pub fn local_delegate() -> LocalSend {
|
||||||
|
let delegation_point = LocalSend {
|
||||||
|
inner_send: Arc::new(RwLock::new(InnerLocalSend {
|
||||||
|
active_metrics: HashMap::new(),
|
||||||
|
recv: registry.get,
|
||||||
|
last_metric_id: 0,
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
register_local_delegation(delegation_point.clone());
|
||||||
|
delegation_point
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dynamic counterpart of a `Dispatcher`.
|
||||||
|
/// Adapter to LocalMetrics<_> of unknown type.
|
||||||
|
pub trait LocalRecv {
|
||||||
|
/// Register a new metric.
|
||||||
|
/// Only one metric of a certain name will be defined.
|
||||||
|
/// Observer must return a MetricHandle that uniquely identifies the metric.
|
||||||
|
fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> LocalRecvMetric;
|
||||||
|
|
||||||
|
/// Flush the receiver's scope.
|
||||||
|
fn open_scope(&self, buffered: bool) -> Arc<LocalRecvScope + Send + Sync>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A dynamically dispatched metric.
|
||||||
|
#[derive(Derivative)]
|
||||||
|
#[derivative(Debug)]
|
||||||
|
pub trait LocalRecvScope {
|
||||||
|
fn write(&self, metric: &LocalRecvMetric, value: Value);
|
||||||
|
fn flush(&self);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct LocalRecvMetric (u64);
|
||||||
|
|
||||||
|
/// Shortcut name because `AppMetrics<Dispatch>`
|
||||||
|
/// looks better than `AppMetrics<Arc<DispatcherMetric>>`.
|
||||||
|
pub type LocalDelegate = Arc<LocalSendMetric>;
|
||||||
|
|
||||||
|
/// A dynamically dispatched metric.
|
||||||
|
#[derive(Derivative)]
|
||||||
|
#[derivative(Debug)]
|
||||||
|
pub struct LocalSendMetric {
|
||||||
|
kind: Kind,
|
||||||
|
name: String,
|
||||||
|
rate: Rate,
|
||||||
|
metric_id: usize,
|
||||||
|
#[derivative(Debug = "ignore")]
|
||||||
|
send: LocalSend,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out.
|
||||||
|
impl Drop for LocalSendMetric {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.dispatcher.drop_metric(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out.
|
||||||
|
impl Drop for LocalSendMetric {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.send.drop_metric(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A dynamic dispatch point for app and lib metrics.
|
||||||
|
/// Decouples metrics definition from backend configuration.
|
||||||
|
/// Allows defining metrics before a concrete type has been selected.
|
||||||
|
/// Allows replacing metrics backend on the fly at runtime.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct LocalSend {
|
||||||
|
inner_send: Arc<RwLock<InnerLocalSend>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct InnerLocalSend {
|
||||||
|
recv: Box<LocalRecv + Send + Sync>,
|
||||||
|
active_metrics: HashMap<String, Weak<LocalSendMetric>>,
|
||||||
|
last_metric_id: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&'static str> for LocalMetrics<LocalDelegate> {
|
||||||
|
fn from(prefix: &'static str) -> LocalMetrics<LocalDelegate> {
|
||||||
|
let app_metrics: LocalMetrics<LocalDelegate> = local_delegate().into();
|
||||||
|
app_metrics.with_prefix(prefix)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<LocalSend> for LocalMetrics<LocalDelegate> {
|
||||||
|
fn from(send: LocalSend) -> LocalMetrics<LocalDelegate> {
|
||||||
|
let send_1 = send.clone();
|
||||||
|
LocalMetrics::new(
|
||||||
|
// define metric
|
||||||
|
Arc::new(move |kind, name, rate| send.define_metric(kind, name, rate)),
|
||||||
|
// write / flush metric
|
||||||
|
Arc::new(move |buffered| send.open_scope(buffered))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LocalSend {
|
||||||
|
/// Install a new metric receiver, replacing the previous one.
|
||||||
|
pub fn set_receiver<IS: Into<LocalMetrics<T>>, T: Send + Sync + Clone + 'static>(
|
||||||
|
&self,
|
||||||
|
receiver: IS,
|
||||||
|
) {
|
||||||
|
let receiver: Box<LocalRecv + Send + Sync> = Box::new(receiver.into());
|
||||||
|
let inner: &mut InnerLocalSend =
|
||||||
|
&mut *self.inner_send.write().expect("Lock Metrics Send");
|
||||||
|
|
||||||
|
for mut metric in inner.active_metrics.values() {
|
||||||
|
if let Some(metric) = metric.upgrade() {
|
||||||
|
let receiver_metric =
|
||||||
|
receiver.box_metric(metric.kind, metric.name.as_ref(), metric.rate);
|
||||||
|
*metric.receiver.borrow_mut() = receiver_metric;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO return old receiver (swap, how?)
|
||||||
|
inner.recv = receiver;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> LocalDelegate {
|
||||||
|
let mut inner = self.inner_send.write().expect("Lock Metrics Send");
|
||||||
|
inner.metrics.get(name)
|
||||||
|
.and_then(|metric_ref| Weak::upgrade(metric_ref))
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
let recv_metric = inner.recv.define_metric(kind, name, rate);
|
||||||
|
let new_metric = Arc::new(LocalSendMetric {
|
||||||
|
kind,
|
||||||
|
name: name.to_string(),
|
||||||
|
rate,
|
||||||
|
metric_id: inner.last_metric_id += 1,
|
||||||
|
send: send.clone(),
|
||||||
|
});
|
||||||
|
inner.metrics.insert(
|
||||||
|
new_metric.name.clone(),
|
||||||
|
Arc::downgrade(&new_metric),
|
||||||
|
);
|
||||||
|
new_metric
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn open_scope(&self, buffered: bool) -> Arc<ControlScopeFn<LocalDelegate>> {
|
||||||
|
let mut inner = self.inner_send.write().expect("Lock Metrics Send");
|
||||||
|
let write_scope = inner.recv.open_scope(buffered);
|
||||||
|
let flush_scope = write_scope.clone();
|
||||||
|
Arc::new(move |cmd| {
|
||||||
|
match cmd {
|
||||||
|
ScopeCmd::Write(metric, value) => write_scope.write(metric, value),
|
||||||
|
ScopeCmd::Flush => flush_scope.flush(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn drop_metric(&self, metric: &LocalSendMetric) {
|
||||||
|
let mut inner = self.inner_send.write().expect("Lock Metrics Send");
|
||||||
|
if inner.metrics.remove(&metric.name).is_none() {
|
||||||
|
panic!("Could not remove DelegatingMetric weak ref from delegation point")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "bench")]
|
||||||
|
mod bench {
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use test;
|
||||||
|
use core::Kind::*;
|
||||||
|
use aggregate::*;
|
||||||
|
use publish::*;
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn dispatch_marker_to_aggregate(b: &mut test::Bencher) {
|
||||||
|
let dispatch = local_delegate();
|
||||||
|
let sink: LocalMetrics<LocalDelegate> = dispatch.clone().into();
|
||||||
|
dispatch.set_receiver(aggregate(summary, to_void()));
|
||||||
|
let metric = sink.marker("event_a");
|
||||||
|
b.iter(|| test::black_box(metric.mark()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn dispatch_marker_to_void(b: &mut test::Bencher) {
|
||||||
|
let dispatch = local_delegate();
|
||||||
|
let sink: LocalMetrics<LocalDelegate> = dispatch.into();
|
||||||
|
let metric = sink.marker("event_a");
|
||||||
|
b.iter(|| test::black_box(metric.mark()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -13,7 +13,7 @@ use namespace::*;
|
||||||
/// This is the building block for the metrics backend.
|
/// This is the building block for the metrics backend.
|
||||||
#[derive(Derivative, Clone)]
|
#[derive(Derivative, Clone)]
|
||||||
#[derivative(Debug)]
|
#[derivative(Debug)]
|
||||||
pub struct ScopeMetrics<M> {
|
pub struct LocalMetrics<M> {
|
||||||
#[derivative(Debug = "ignore")]
|
#[derivative(Debug = "ignore")]
|
||||||
define_metric_fn: DefineMetricFn<M>,
|
define_metric_fn: DefineMetricFn<M>,
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ pub struct ScopeMetrics<M> {
|
||||||
scope_metric_fn: OpenScopeFn<M>,
|
scope_metric_fn: OpenScopeFn<M>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M> ScopeMetrics<M> {
|
impl<M> LocalMetrics<M> {
|
||||||
/// Define a new metric.
|
/// Define a new metric.
|
||||||
#[allow(unused_variables)]
|
#[allow(unused_variables)]
|
||||||
pub fn define_metric(&self, kind: Kind, name: &str, sampling: Rate) -> M {
|
pub fn define_metric(&self, kind: Kind, name: &str, sampling: Rate) -> M {
|
||||||
|
@ -62,14 +62,14 @@ impl<M> ScopeMetrics<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send + Sync + Clone + 'static> ScopeMetrics<M> {
|
impl<M: Send + Sync + Clone + 'static> LocalMetrics<M> {
|
||||||
/// Create a new metric chain with the provided metric definition and scope creation functions.
|
/// Create a new metric chain with the provided metric definition and scope creation functions.
|
||||||
pub fn new<MF, WF>(make_metric: MF, make_scope: WF) -> Self
|
pub fn new<MF, WF>(make_metric: MF, make_scope: WF) -> Self
|
||||||
where
|
where
|
||||||
MF: Fn(Kind, &str, Rate) -> M + Send + Sync + 'static,
|
MF: Fn(Kind, &str, Rate) -> M + Send + Sync + 'static,
|
||||||
WF: Fn(bool) -> ControlScopeFn<M> + Send + Sync + 'static,
|
WF: Fn(bool) -> ControlScopeFn<M> + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
ScopeMetrics {
|
LocalMetrics {
|
||||||
// capture the provided closures in Arc to provide cheap clones
|
// capture the provided closures in Arc to provide cheap clones
|
||||||
define_metric_fn: Arc::new(make_metric),
|
define_metric_fn: Arc::new(make_metric),
|
||||||
scope_metric_fn: Arc::new(make_scope),
|
scope_metric_fn: Arc::new(make_scope),
|
||||||
|
@ -77,38 +77,38 @@ impl<M: Send + Sync + Clone + 'static> ScopeMetrics<M> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get an event counter of the provided name.
|
/// Get an event counter of the provided name.
|
||||||
pub fn marker<AS: AsRef<str>>(&self, name: AS) -> ScopeMarker<M> {
|
pub fn marker<AS: AsRef<str>>(&self, name: AS) -> LocalMarker<M> {
|
||||||
let metric = self.define_metric(Marker, name.as_ref(), 1.0);
|
let metric = self.define_metric(Marker, name.as_ref(), 1.0);
|
||||||
ScopeMarker { metric }
|
LocalMarker { metric }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a counter of the provided name.
|
/// Get a counter of the provided name.
|
||||||
pub fn counter<AS: AsRef<str>>(&self, name: AS) -> ScopeCounter<M> {
|
pub fn counter<AS: AsRef<str>>(&self, name: AS) -> LocalCounter<M> {
|
||||||
let metric = self.define_metric(Counter, name.as_ref(), 1.0);
|
let metric = self.define_metric(Counter, name.as_ref(), 1.0);
|
||||||
ScopeCounter { metric }
|
LocalCounter { metric }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a timer of the provided name.
|
/// Get a timer of the provided name.
|
||||||
pub fn timer<AS: AsRef<str>>(&self, name: AS) -> ScopeTimer<M> {
|
pub fn timer<AS: AsRef<str>>(&self, name: AS) -> LocalTimer<M> {
|
||||||
let metric = self.define_metric(Timer, name.as_ref(), 1.0);
|
let metric = self.define_metric(Timer, name.as_ref(), 1.0);
|
||||||
ScopeTimer { metric }
|
LocalTimer { metric }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a gauge of the provided name.
|
/// Get a gauge of the provided name.
|
||||||
pub fn gauge<AS: AsRef<str>>(&self, name: AS) -> ScopeGauge<M> {
|
pub fn gauge<AS: AsRef<str>>(&self, name: AS) -> LocalGauge<M> {
|
||||||
let metric = self.define_metric(Gauge, name.as_ref(), 1.0);
|
let metric = self.define_metric(Gauge, name.as_ref(), 1.0);
|
||||||
ScopeGauge { metric }
|
LocalGauge { metric }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Intercept both metric definition and scope creation, possibly changing the metric type.
|
/// Intercept both metric definition and scope creation, possibly changing the metric type.
|
||||||
pub fn mod_both<MF, N>(&self, mod_fn: MF) -> ScopeMetrics<N>
|
pub fn mod_both<MF, N>(&self, mod_fn: MF) -> LocalMetrics<N>
|
||||||
where
|
where
|
||||||
MF: Fn(DefineMetricFn<M>, OpenScopeFn<M>) -> (DefineMetricFn<N>, OpenScopeFn<N>),
|
MF: Fn(DefineMetricFn<M>, OpenScopeFn<M>) -> (DefineMetricFn<N>, OpenScopeFn<N>),
|
||||||
N: Clone + Send + Sync,
|
N: Clone + Send + Sync,
|
||||||
{
|
{
|
||||||
let (metric_fn, scope_fn) =
|
let (metric_fn, scope_fn) =
|
||||||
mod_fn(self.define_metric_fn.clone(), self.scope_metric_fn.clone());
|
mod_fn(self.define_metric_fn.clone(), self.scope_metric_fn.clone());
|
||||||
ScopeMetrics {
|
LocalMetrics {
|
||||||
define_metric_fn: metric_fn,
|
define_metric_fn: metric_fn,
|
||||||
scope_metric_fn: scope_fn,
|
scope_metric_fn: scope_fn,
|
||||||
}
|
}
|
||||||
|
@ -119,32 +119,32 @@ impl<M: Send + Sync + Clone + 'static> ScopeMetrics<M> {
|
||||||
where
|
where
|
||||||
MF: Fn(OpenScopeFn<M>) -> OpenScopeFn<M>,
|
MF: Fn(OpenScopeFn<M>) -> OpenScopeFn<M>,
|
||||||
{
|
{
|
||||||
ScopeMetrics {
|
LocalMetrics {
|
||||||
define_metric_fn: self.define_metric_fn.clone(),
|
define_metric_fn: self.define_metric_fn.clone(),
|
||||||
scope_metric_fn: mod_fn(self.scope_metric_fn.clone()),
|
scope_metric_fn: mod_fn(self.scope_metric_fn.clone()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M> From<ScopeMetrics<M>> for AppMetrics<M> {
|
impl<M> From<LocalMetrics<M>> for AppMetrics<M> {
|
||||||
fn from(chain: ScopeMetrics<M>) -> AppMetrics<M> {
|
fn from(metrics: LocalMetrics<M>) -> AppMetrics<M> {
|
||||||
AppMetrics::new(chain.define_metric_fn.clone(), chain.open_scope(false))
|
AppMetrics::new(metrics.define_metric_fn.clone(), metrics.open_scope(false))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send + Sync + Clone + 'static> WithCache for ScopeMetrics<M> {
|
impl<M: Send + Sync + Clone + 'static> WithCache for LocalMetrics<M> {
|
||||||
fn with_cache(&self, cache_size: usize) -> Self {
|
fn with_cache(&self, cache_size: usize) -> Self {
|
||||||
ScopeMetrics {
|
LocalMetrics {
|
||||||
define_metric_fn: add_cache(cache_size, self.define_metric_fn.clone()),
|
define_metric_fn: add_cache(cache_size, self.define_metric_fn.clone()),
|
||||||
scope_metric_fn: self.scope_metric_fn.clone(),
|
scope_metric_fn: self.scope_metric_fn.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send + Sync + Clone + 'static> WithNamespace for ScopeMetrics<M> {
|
impl<M: Send + Sync + Clone + 'static> WithNamespace for LocalMetrics<M> {
|
||||||
fn with_name<IN: Into<Namespace>>(&self, names: IN) -> Self {
|
fn with_name<IN: Into<Namespace>>(&self, names: IN) -> Self {
|
||||||
let ref ninto = names.into();
|
let ref ninto = names.into();
|
||||||
ScopeMetrics {
|
LocalMetrics {
|
||||||
define_metric_fn: add_namespace(ninto, self.define_metric_fn.clone()),
|
define_metric_fn: add_namespace(ninto, self.define_metric_fn.clone()),
|
||||||
scope_metric_fn: self.scope_metric_fn.clone(),
|
scope_metric_fn: self.scope_metric_fn.clone(),
|
||||||
}
|
}
|
||||||
|
@ -156,11 +156,11 @@ impl<M: Send + Sync + Clone + 'static> WithNamespace for ScopeMetrics<M> {
|
||||||
/// preventing programming errors.
|
/// preventing programming errors.
|
||||||
#[derive(Derivative)]
|
#[derive(Derivative)]
|
||||||
#[derivative(Debug)]
|
#[derivative(Debug)]
|
||||||
pub struct ScopeMarker<M> {
|
pub struct LocalMarker<M> {
|
||||||
metric: M,
|
metric: M,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M> ScopeMarker<M> {
|
impl<M> LocalMarker<M> {
|
||||||
/// Record a single event occurence.
|
/// Record a single event occurence.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn mark(&self, scope: &mut ControlScopeFn<M>) {
|
pub fn mark(&self, scope: &mut ControlScopeFn<M>) {
|
||||||
|
@ -171,11 +171,11 @@ impl<M> ScopeMarker<M> {
|
||||||
/// A counter that sends values to the metrics backend
|
/// A counter that sends values to the metrics backend
|
||||||
#[derive(Derivative)]
|
#[derive(Derivative)]
|
||||||
#[derivative(Debug)]
|
#[derivative(Debug)]
|
||||||
pub struct ScopeCounter<M> {
|
pub struct LocalCounter<M> {
|
||||||
metric: M,
|
metric: M,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M> ScopeCounter<M> {
|
impl<M> LocalCounter<M> {
|
||||||
/// Record a value count.
|
/// Record a value count.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn count<V>(&self, scope: &mut ControlScopeFn<M>, count: V)
|
pub fn count<V>(&self, scope: &mut ControlScopeFn<M>, count: V)
|
||||||
|
@ -189,11 +189,11 @@ impl<M> ScopeCounter<M> {
|
||||||
/// A gauge that sends values to the metrics backend
|
/// A gauge that sends values to the metrics backend
|
||||||
#[derive(Derivative)]
|
#[derive(Derivative)]
|
||||||
#[derivative(Debug)]
|
#[derivative(Debug)]
|
||||||
pub struct ScopeGauge<M> {
|
pub struct LocalGauge<M> {
|
||||||
metric: M,
|
metric: M,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Clone> ScopeGauge<M> {
|
impl<M: Clone> LocalGauge<M> {
|
||||||
/// Record a value point for this gauge.
|
/// Record a value point for this gauge.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn value<V>(&self, scope: &mut ControlScopeFn<M>, value: V)
|
pub fn value<V>(&self, scope: &mut ControlScopeFn<M>, value: V)
|
||||||
|
@ -212,11 +212,11 @@ impl<M: Clone> ScopeGauge<M> {
|
||||||
/// - with the interval_us() method, providing an externally determined microsecond interval
|
/// - with the interval_us() method, providing an externally determined microsecond interval
|
||||||
#[derive(Derivative)]
|
#[derive(Derivative)]
|
||||||
#[derivative(Debug)]
|
#[derivative(Debug)]
|
||||||
pub struct ScopeTimer<M> {
|
pub struct LocalTimer<M> {
|
||||||
metric: M,
|
metric: M,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Clone> ScopeTimer<M> {
|
impl<M: Clone> LocalTimer<M> {
|
||||||
/// Record a microsecond interval for this timer
|
/// Record a microsecond interval for this timer
|
||||||
/// Can be used in place of start()/stop() if an external time interval source is used
|
/// Can be used in place of start()/stop() if an external time interval source is used
|
||||||
#[inline]
|
#[inline]
|
26
src/multi.rs
26
src/multi.rs
|
@ -1,22 +1,22 @@
|
||||||
//! Dispatch metrics to multiple sinks.
|
//! Dispatch metrics to multiple sinks.
|
||||||
|
|
||||||
use core::*;
|
use core::*;
|
||||||
use scope_metrics::*;
|
use local_metrics::*;
|
||||||
use app_metrics::*;
|
use app_metrics::*;
|
||||||
|
|
||||||
/// Two chains of different types can be combined in a tuple.
|
/// Two chains of different types can be combined in a tuple.
|
||||||
/// The chains will act as one, each receiving calls in the order the appear in the tuple.
|
/// The chains will act as one, each receiving calls in the order the appear in the tuple.
|
||||||
/// For more than two types, make tuples of tuples, "Yo Dawg" style.
|
/// For more than two types, make tuples of tuples, "Yo Dawg" style.
|
||||||
impl<M1, M2> From<(ScopeMetrics<M1>, ScopeMetrics<M2>)> for ScopeMetrics<(M1, M2)>
|
impl<M1, M2> From<(LocalMetrics<M1>, LocalMetrics<M2>)> for LocalMetrics<(M1, M2)>
|
||||||
where
|
where
|
||||||
M1: 'static + Clone + Send + Sync,
|
M1: 'static + Clone + Send + Sync,
|
||||||
M2: 'static + Clone + Send + Sync,
|
M2: 'static + Clone + Send + Sync,
|
||||||
{
|
{
|
||||||
fn from(combo: (ScopeMetrics<M1>, ScopeMetrics<M2>)) -> ScopeMetrics<(M1, M2)> {
|
fn from(combo: (LocalMetrics<M1>, LocalMetrics<M2>)) -> LocalMetrics<(M1, M2)> {
|
||||||
let combo0 = combo.0.clone();
|
let combo0 = combo.0.clone();
|
||||||
let combo1 = combo.1.clone();
|
let combo1 = combo.1.clone();
|
||||||
|
|
||||||
ScopeMetrics::new(
|
LocalMetrics::new(
|
||||||
move |kind, name, rate| {
|
move |kind, name, rate| {
|
||||||
(
|
(
|
||||||
combo.0.define_metric(kind, name, rate),
|
combo.0.define_metric(kind, name, rate),
|
||||||
|
@ -43,28 +43,28 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M1, M2> From<(ScopeMetrics<M1>, ScopeMetrics<M2>)> for AppMetrics<(M1, M2)>
|
impl<M1, M2> From<(LocalMetrics<M1>, LocalMetrics<M2>)> for AppMetrics<(M1, M2)>
|
||||||
where
|
where
|
||||||
M1: 'static + Clone + Send + Sync,
|
M1: 'static + Clone + Send + Sync,
|
||||||
M2: 'static + Clone + Send + Sync,
|
M2: 'static + Clone + Send + Sync,
|
||||||
{
|
{
|
||||||
fn from(combo: (ScopeMetrics<M1>, ScopeMetrics<M2>)) -> AppMetrics<(M1, M2)> {
|
fn from(combo: (LocalMetrics<M1>, LocalMetrics<M2>)) -> AppMetrics<(M1, M2)> {
|
||||||
let chain: ScopeMetrics<(M1, M2)> = combo.into();
|
let chain: LocalMetrics<(M1, M2)> = combo.into();
|
||||||
app_metrics(chain)
|
app_metrics(chain)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Multiple chains of the same type can be combined in a slice.
|
/// Multiple chains of the same type can be combined in a slice.
|
||||||
/// The chains will act as one, each receiving calls in the order the appear in the slice.
|
/// The chains will act as one, each receiving calls in the order the appear in the slice.
|
||||||
impl<'a, M> From<&'a [ScopeMetrics<M>]> for ScopeMetrics<Vec<M>>
|
impl<'a, M> From<&'a [LocalMetrics<M>]> for LocalMetrics<Vec<M>>
|
||||||
where
|
where
|
||||||
M: 'static + Clone + Send + Sync,
|
M: 'static + Clone + Send + Sync,
|
||||||
{
|
{
|
||||||
fn from(chains: &'a [ScopeMetrics<M>]) -> ScopeMetrics<Vec<M>> {
|
fn from(chains: &'a [LocalMetrics<M>]) -> LocalMetrics<Vec<M>> {
|
||||||
let chains = chains.to_vec();
|
let chains = chains.to_vec();
|
||||||
let chains2 = chains.clone();
|
let chains2 = chains.clone();
|
||||||
|
|
||||||
ScopeMetrics::new(
|
LocalMetrics::new(
|
||||||
move |kind, name, rate| {
|
move |kind, name, rate| {
|
||||||
let mut metric = Vec::with_capacity(chains.len());
|
let mut metric = Vec::with_capacity(chains.len());
|
||||||
for chain in &chains {
|
for chain in &chains {
|
||||||
|
@ -94,12 +94,12 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, M> From<&'a [ScopeMetrics<M>]> for AppMetrics<Vec<M>>
|
impl<'a, M> From<&'a [LocalMetrics<M>]> for AppMetrics<Vec<M>>
|
||||||
where
|
where
|
||||||
M: 'static + Clone + Send + Sync,
|
M: 'static + Clone + Send + Sync,
|
||||||
{
|
{
|
||||||
fn from(chains: &'a [ScopeMetrics<M>]) -> AppMetrics<Vec<M>> {
|
fn from(chains: &'a [LocalMetrics<M>]) -> AppMetrics<Vec<M>> {
|
||||||
let chain: ScopeMetrics<Vec<M>> = chains.into();
|
let chain: LocalMetrics<Vec<M>> = chains.into();
|
||||||
app_metrics(chain)
|
app_metrics(chain)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
//! Standard stateless metric outputs.
|
//! Standard stateless metric outputs.
|
||||||
// TODO parameterize templates
|
// TODO parameterize templates
|
||||||
use core::*;
|
use core::*;
|
||||||
use scope_metrics::*;
|
use local_metrics::*;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
|
|
||||||
/// Write metric values to stdout using `println!`.
|
/// Write metric values to stdout using `println!`.
|
||||||
pub fn to_stdout() -> ScopeMetrics<String> {
|
pub fn to_stdout() -> LocalMetrics<String> {
|
||||||
ScopeMetrics::new(
|
LocalMetrics::new(
|
||||||
|_kind, name, _rate| String::from(name),
|
|_kind, name, _rate| String::from(name),
|
||||||
|buffered| {
|
|buffered| {
|
||||||
if !buffered {
|
if !buffered {
|
||||||
|
@ -36,8 +36,8 @@ pub fn to_stdout() -> ScopeMetrics<String> {
|
||||||
|
|
||||||
/// Write metric values to the standard log using `info!`.
|
/// Write metric values to the standard log using `info!`.
|
||||||
// TODO parameterize log level
|
// TODO parameterize log level
|
||||||
pub fn to_log() -> ScopeMetrics<String> {
|
pub fn to_log() -> LocalMetrics<String> {
|
||||||
ScopeMetrics::new(
|
LocalMetrics::new(
|
||||||
|_kind, name, _rate| String::from(name),
|
|_kind, name, _rate| String::from(name),
|
||||||
|buffered| {
|
|buffered| {
|
||||||
if !buffered {
|
if !buffered {
|
||||||
|
@ -66,9 +66,9 @@ pub fn to_log() -> ScopeMetrics<String> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Discard all metric values sent to it.
|
/// Discard all metric values sent to it.
|
||||||
pub fn to_void() -> ScopeMetrics<String> {
|
pub fn to_void() -> LocalMetrics<()> {
|
||||||
ScopeMetrics::new(
|
LocalMetrics::new(
|
||||||
move |_kind, name, _rate| String::from(name),
|
move |_kind, _name, _rate| (),
|
||||||
|_buffered| control_scope(|_cmd| {}),
|
|_buffered| control_scope(|_cmd| {}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
use core::*;
|
use core::*;
|
||||||
use scope_metrics::*;
|
use local_metrics::*;
|
||||||
use core::Kind::*;
|
use core::Kind::*;
|
||||||
use scores::{ScoreSnapshot, ScoreType};
|
use scores::{ScoreSnapshot, ScoreType};
|
||||||
use scores::ScoreType::*;
|
use scores::ScoreType::*;
|
||||||
|
@ -40,7 +40,7 @@ pub trait Publish: Send + Sync + Debug {
|
||||||
pub struct Publisher<E, M> {
|
pub struct Publisher<E, M> {
|
||||||
#[derivative(Debug = "ignore")]
|
#[derivative(Debug = "ignore")]
|
||||||
statistics: Box<E>,
|
statistics: Box<E>,
|
||||||
target_chain: ScopeMetrics<M>,
|
output: LocalMetrics<M>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E, M> Publisher<E, M>
|
impl<E, M> Publisher<E, M>
|
||||||
|
@ -50,10 +50,10 @@ where
|
||||||
{
|
{
|
||||||
/// Define a new metrics publishing strategy, from a transformation
|
/// Define a new metrics publishing strategy, from a transformation
|
||||||
/// function and a target metric chain.
|
/// function and a target metric chain.
|
||||||
pub fn new(stat_fn: E, target_chain: ScopeMetrics<M>) -> Self {
|
pub fn new(stat_fn: E, output: LocalMetrics<M>) -> Self {
|
||||||
Publisher {
|
Publisher {
|
||||||
statistics: Box::new(stat_fn),
|
statistics: Box::new(stat_fn),
|
||||||
target_chain,
|
output,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -64,7 +64,7 @@ where
|
||||||
E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static,
|
E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
fn publish(&self, snapshot: Vec<ScoreSnapshot>) {
|
fn publish(&self, snapshot: Vec<ScoreSnapshot>) {
|
||||||
let publish_scope_fn = self.target_chain.open_scope(false);
|
let publish_scope_fn = self.output.open_scope(false);
|
||||||
if snapshot.is_empty() {
|
if snapshot.is_empty() {
|
||||||
// no data was collected for this period
|
// no data was collected for this period
|
||||||
// TODO repeat previous frame min/max ?
|
// TODO repeat previous frame min/max ?
|
||||||
|
@ -73,9 +73,8 @@ where
|
||||||
for metric in snapshot {
|
for metric in snapshot {
|
||||||
for score in metric.2 {
|
for score in metric.2 {
|
||||||
if let Some(ex) = (self.statistics)(metric.0, metric.1.as_ref(), score) {
|
if let Some(ex) = (self.statistics)(metric.0, metric.1.as_ref(), score) {
|
||||||
let temp_metric =
|
let pub_metric = self.output.define_metric(ex.0, &ex.1.concat(), 1.0);
|
||||||
self.target_chain.define_metric(ex.0, &ex.1.concat(), 1.0);
|
publish_scope_fn.write(&pub_metric, ex.2);
|
||||||
publish_scope_fn.write(&temp_metric, ex.2);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
use app_metrics::AppMetrics;
|
||||||
|
use output;
|
||||||
|
use app_delegate::{AppRecv, AppSend};
|
||||||
|
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
|
fn no_app_metrics() -> Arc<AppRecv + Send + Sync> {
|
||||||
|
let void_metrics: AppMetrics<_> = output::to_void().into();
|
||||||
|
Arc::new(void_metrics)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The registry contains a list of every metrics dispatch point in the app.
|
||||||
|
lazy_static! {
|
||||||
|
static ref NO_APP_METRICS: Arc<AppRecv + Sync + Send> = no_app_metrics();
|
||||||
|
|
||||||
|
static ref DEFAULT_APP_RECEIVER: RwLock<Arc<AppRecv + Sync + Send>> = RwLock::new(NO_APP_METRICS.clone());
|
||||||
|
|
||||||
|
static ref DELEGATE_REGISTRY: RwLock<Vec<AppSend>> = RwLock::new(vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a new app send.
|
||||||
|
pub fn add_app_send(send: AppSend) {
|
||||||
|
DELEGATE_REGISTRY.write().unwrap().push(send.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the default app recv.
|
||||||
|
pub fn get_default_app_recv() -> Arc<AppRecv + Send + Sync> {
|
||||||
|
DEFAULT_APP_RECEIVER.read().unwrap().clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Install a new receiver for all dispatched metrics, replacing any previous receiver.
|
||||||
|
pub fn send_app_metrics<IS: Into<AppMetrics<T>>, T: Send + Sync + Clone + 'static>(
|
||||||
|
into_recv: IS,
|
||||||
|
) {
|
||||||
|
let recv = into_recv.into();
|
||||||
|
for d in DELEGATE_REGISTRY.read().unwrap().iter() {
|
||||||
|
d.set_receiver(recv.clone());
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,7 +1,7 @@
|
||||||
//! Reduce the amount of data to process or transfer by statistically dropping some of it.
|
//! Reduce the amount of data to process or transfer by statistically dropping some of it.
|
||||||
|
|
||||||
use core::*;
|
use core::*;
|
||||||
use scope_metrics::*;
|
use local_metrics::*;
|
||||||
|
|
||||||
use pcg32;
|
use pcg32;
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ where
|
||||||
fn with_sampling_rate(&self, sampling_rate: Rate) -> Self;
|
fn with_sampling_rate(&self, sampling_rate: Rate) -> Self;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send + Sync + 'static + Clone> WithSamplingRate for ScopeMetrics<M> {
|
impl<M: Send + Sync + 'static + Clone> WithSamplingRate for LocalMetrics<M> {
|
||||||
fn with_sampling_rate(&self, sampling_rate: Rate) -> Self {
|
fn with_sampling_rate(&self, sampling_rate: Rate) -> Self {
|
||||||
let int_sampling_rate = pcg32::to_int_rate(sampling_rate);
|
let int_sampling_rate = pcg32::to_int_rate(sampling_rate);
|
||||||
|
|
||||||
|
@ -52,10 +52,10 @@ impl<M: Send + Sync + 'static + Clone> WithSamplingRate for ScopeMetrics<M> {
|
||||||
|
|
||||||
/// Perform random sampling of values according to the specified rate.
|
/// Perform random sampling of values according to the specified rate.
|
||||||
#[deprecated(since = "0.5.0", note = "Use `with_sampling_rate` instead.")]
|
#[deprecated(since = "0.5.0", note = "Use `with_sampling_rate` instead.")]
|
||||||
pub fn sample<M, IC>(sampling_rate: Rate, chain: IC) -> ScopeMetrics<M>
|
pub fn sample<M, IC>(sampling_rate: Rate, chain: IC) -> LocalMetrics<M>
|
||||||
where
|
where
|
||||||
M: Clone + Send + Sync + 'static,
|
M: Clone + Send + Sync + 'static,
|
||||||
IC: Into<ScopeMetrics<M>>,
|
IC: Into<LocalMetrics<M>>,
|
||||||
{
|
{
|
||||||
let chain = chain.into();
|
let chain = chain.into();
|
||||||
chain.with_sampling_rate(sampling_rate)
|
chain.with_sampling_rate(sampling_rate)
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
//! Send metrics to a statsd server.
|
//! Send metrics to a statsd server.
|
||||||
|
|
||||||
use core::*;
|
use core::*;
|
||||||
use scope_metrics::*;
|
use local_metrics::*;
|
||||||
use error;
|
use error;
|
||||||
use self_metrics::*;
|
use self_metrics::*;
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ app_metrics! {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send metrics to a statsd server at the address and port provided.
|
/// Send metrics to a statsd server at the address and port provided.
|
||||||
pub fn to_statsd<ADDR>(address: ADDR) -> error::Result<ScopeMetrics<Statsd>>
|
pub fn to_statsd<ADDR>(address: ADDR) -> error::Result<LocalMetrics<Statsd>>
|
||||||
where
|
where
|
||||||
ADDR: ToSocketAddrs,
|
ADDR: ToSocketAddrs,
|
||||||
{
|
{
|
||||||
|
@ -26,7 +26,7 @@ where
|
||||||
socket.set_nonblocking(true)?;
|
socket.set_nonblocking(true)?;
|
||||||
socket.connect(address)?;
|
socket.connect(address)?;
|
||||||
|
|
||||||
Ok(ScopeMetrics::new(
|
Ok(LocalMetrics::new(
|
||||||
move |kind, name, rate| {
|
move |kind, name, rate| {
|
||||||
let mut prefix = String::with_capacity(32);
|
let mut prefix = String::with_capacity(32);
|
||||||
prefix.push_str(name);
|
prefix.push_str(name);
|
||||||
|
|
Loading…
Reference in New Issue