diff --git a/README.md b/README.md index 12ad112..31f604a 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ app_metrics!("my_app" => { }); fn main() { - send_delegated_metrics(to_stdout()); + send_app_metrics(to_stdout()); COUNTER_A.count(11); } ``` diff --git a/examples/macro_delegate.rs b/examples/macro_delegate.rs index 2da3566..b9a0c44 100644 --- a/examples/macro_delegate.rs +++ b/examples/macro_delegate.rs @@ -42,7 +42,7 @@ app_metrics!(LIB_METRICS => { }); fn main() { - send_delegated_metrics(to_stdout()); + send_app_metrics(to_stdout()); loop { ROOT_COUNTER.count(123); diff --git a/src/aggregate.rs b/src/aggregate.rs index c8f2094..a637e54 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -1,7 +1,7 @@ //! Maintain aggregated metrics for deferred reporting, //! use core::*; -use scope_metrics::*; +use local_metrics::*; use app_metrics::*; use namespace::*; use output::to_void; @@ -22,7 +22,7 @@ use std::sync::{Arc, RwLock}; /// metrics.marker("my_event").mark(); /// metrics.marker("my_event").mark(); /// ``` -pub fn aggregate(stat_fn: E, to_chain: ScopeMetrics) -> Aggregator +pub fn aggregate(stat_fn: E, to_chain: LocalMetrics) -> Aggregator where E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static, M: Clone + Send + Sync + Debug + 'static, diff --git a/src/app_delegate.rs b/src/app_delegate.rs new file mode 100755 index 0000000..c900bcf --- /dev/null +++ b/src/app_delegate.rs @@ -0,0 +1,189 @@ +//! Decouple metric definition from configuration with trait objects. + +use core::*; +use app_metrics::*; +use namespace::*; +use registry; + +use std::collections::HashMap; +use std::sync::{Arc, RwLock, Weak}; + +use atomic_refcell::*; + + +/// Create a new dispatch point for metrics. +/// All dispatch points are automatically entered in the dispatch registry. +pub fn app_delegate() -> AppSend { + let send = AppSend { + inner: Arc::new(RwLock::new(InnerAppSend { + metrics: HashMap::new(), + recv: registry::get_default_app_recv(), + })), + }; + registry::add_app_send(send.clone()); + send +} + +/// Dynamic counterpart of a `Dispatcher`. +/// Adapter to AppMetrics<_> of unknown type. +pub trait AppRecv { + /// Register a new metric. + /// Only one metric of a certain name will be defined. + /// Observer must return a MetricHandle that uniquely identifies the metric. + fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> Box; + + /// Flush the receiver's scope. + fn flush(&self); +} + +/// Dynamic counterpart of the `DispatcherMetric`. +/// Adapter to a metric of unknown type. +pub trait AppRecvMetric { + /// Write metric value to a scope. + /// Observers only receive previously registered handles. + fn write(&self, value: Value); +} + +/// Shortcut name because `AppMetrics` +/// looks better than `AppMetrics>`. +pub type Delegate = Arc; + +/// A dynamically dispatched metric. +#[derive(Derivative)] +#[derivative(Debug)] +pub struct AppSendMetric { + kind: Kind, + name: String, + rate: Rate, + #[derivative(Debug = "ignore")] + recv_metric: AtomicRefCell>, + #[derivative(Debug = "ignore")] + send: AppSend, +} + +/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out. +impl Drop for AppSendMetric { + fn drop(&mut self) { + self.send.drop_metric(self) + } +} + +/// A dynamic dispatch point for app and lib metrics. +/// Decouples metrics definition from backend configuration. +/// Allows defining metrics before a concrete type has been selected. +/// Allows replacing metrics backend on the fly at runtime. +#[derive(Clone)] +pub struct AppSend { + inner: Arc>, +} + +struct InnerAppSend { + metrics: HashMap>, + recv: Arc, +} + +impl From<&'static str> for AppMetrics { + fn from(prefix: &'static str) -> AppMetrics { + let app_metrics: AppMetrics = app_delegate().into(); + app_metrics.with_prefix(prefix) + } +} + +impl From for AppMetrics { + fn from(send: AppSend) -> AppMetrics { + let send_cmd = send.clone(); + AppMetrics::new( + // define metric + Arc::new(move |kind, name, rate| send.define_metric(kind, name, rate)), + + // write / flush metric + control_scope(move |cmd| match cmd { + ScopeCmd::Write(metric, value) => { + let dispatch: &Arc = metric; + dispatch.recv_metric.borrow().write(value); +// let recv_metric: AtomicRef> = dispatch.recv_metric.borrow(); +// recv_metric.write(value) + } + ScopeCmd::Flush => send_cmd.inner.write().expect("Locking Delegate").recv.flush(), + }), + ) + } +} + +impl AppSend { + /// Install a new metric receiver, replacing the previous one. + pub fn set_receiver>, T: Send + Sync + Clone + 'static>( + &self, + receiver: IS, + ) { + let receiver: Arc = Arc::new(receiver.into()); + let inner: &mut InnerAppSend = + &mut *self.inner.write().expect("Lock Metrics Send"); + + for mut metric in inner.metrics.values() { + if let Some(metric) = metric.upgrade() { + let receiver_metric = + receiver.define_metric(metric.kind, metric.name.as_ref(), metric.rate); + *metric.recv_metric.borrow_mut() = receiver_metric; + } + } + // TODO return old receiver (swap, how?) + inner.recv = receiver; + } + + fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> Delegate { + let mut inner = self.inner.write().expect("Lock Metrics Send"); + inner.metrics.get(name) + .and_then(|metric_ref| Weak::upgrade(metric_ref)) + .unwrap_or_else(|| { + let recv_metric = inner.recv.define_metric(kind, name, rate); + let new_metric = Arc::new(AppSendMetric { + kind, + name: name.to_string(), + rate, + recv_metric: AtomicRefCell::new(recv_metric), + send: self.clone(), + }); + inner.metrics.insert( + new_metric.name.clone(), + Arc::downgrade(&new_metric), + ); + new_metric + }) + } + + fn drop_metric(&self, metric: &AppSendMetric) { + let mut inner = self.inner.write().expect("Lock Metrics Send"); + if inner.metrics.remove(&metric.name).is_none() { + panic!("Could not remove DelegatingMetric weak ref from delegation point") + } + } +} + +#[cfg(feature = "bench")] +mod bench { + + use super::*; + use test; + use core::Kind::*; + use aggregate::*; + use publish::*; + + #[bench] + fn dispatch_marker_to_aggregate(b: &mut test::Bencher) { + let dispatch = app_delegate(); + let sink: AppMetrics = dispatch.clone().into(); + dispatch.set_receiver(aggregate(summary, to_void())); + let metric = sink.marker("event_a"); + b.iter(|| test::black_box(metric.mark())); + } + + #[bench] + fn dispatch_marker_to_void(b: &mut test::Bencher) { + let dispatch = app_delegate(); + let sink: AppMetrics = dispatch.into(); + let metric = sink.marker("event_a"); + b.iter(|| test::black_box(metric.mark())); + } + +} diff --git a/src/app_metrics.rs b/src/app_metrics.rs index 6053cdc..9f6be1e 100644 --- a/src/app_metrics.rs +++ b/src/app_metrics.rs @@ -12,7 +12,7 @@ use core::Kind::*; use namespace::*; use cache::*; use schedule::*; -use delegate::*; +use app_delegate::*; use std::time::Duration; @@ -226,17 +226,17 @@ where //// Dispatch / Receiver impl -struct AppReceiverMetric { +struct AppRecvMetricImpl { metric: M, scope: ControlScopeFn, } -impl Receiver for AppMetrics { - fn box_metric(&self, kind: Kind, name: &str, rate: Rate) -> Box { +impl AppRecv for AppMetrics { + fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> Box { let scope: ControlScopeFn = self.single_scope.clone(); let metric: M = self.define_metric(kind, name, rate); - Box::new(AppReceiverMetric { metric, scope }) + Box::new(AppRecvMetricImpl { metric, scope }) } fn flush(&self) { @@ -244,7 +244,7 @@ impl Receiver for AppMetrics { } } -impl ReceiverMetric for AppReceiverMetric { +impl AppRecvMetric for AppRecvMetricImpl { fn write(&self, value: Value) { self.scope.write(&self.metric, value); } diff --git a/src/async_queue.rs b/src/async_queue.rs index a52070a..e73b78f 100644 --- a/src/async_queue.rs +++ b/src/async_queue.rs @@ -3,7 +3,7 @@ //! If queue size is exceeded, calling code reverts to blocking. //! use core::*; -use scope_metrics::*; +use local_metrics::*; use self_metrics::*; use std::sync::Arc; @@ -25,7 +25,7 @@ where fn with_async_queue(&self, queue_size: usize) -> Self; } -impl WithAsyncQueue for ScopeMetrics { +impl WithAsyncQueue for LocalMetrics { fn with_async_queue(&self, queue_size: usize) -> Self { self.mod_scope(|next| { // setup channel @@ -78,10 +78,10 @@ impl WithAsyncQueue for ScopeMetrics { /// Enqueue collected metrics for dispatch on background thread. #[deprecated(since = "0.5.0", note = "Use `with_async_queue` instead.")] -pub fn async(queue_size: usize, chain: IC) -> ScopeMetrics +pub fn async(queue_size: usize, chain: IC) -> LocalMetrics where M: Clone + Send + Sync + 'static, - IC: Into>, + IC: Into>, { let chain = chain.into(); chain.with_async_queue(queue_size) diff --git a/src/delegate.rs b/src/delegate.rs deleted file mode 100755 index 5c4c57b..0000000 --- a/src/delegate.rs +++ /dev/null @@ -1,212 +0,0 @@ -//! Decouple metric definition from configuration with trait objects. - -use core::*; -use app_metrics::*; -use output::*; -use namespace::*; - -use std::collections::HashMap; -use std::sync::{Arc, RwLock, Weak}; - -use atomic_refcell::*; - -/// The registry contains a list of every metrics dispatch point in the app. -lazy_static! { - static ref DELEGATE_REGISTRY: RwLock> = RwLock::new(vec![]); -} - -/// Install a new receiver for all dispatched metrics, replacing any previous receiver. -pub fn send_delegated_metrics>, T: Send + Sync + Clone + 'static>( - receiver: IS, -) { - let rec = receiver.into(); - for d in DELEGATE_REGISTRY.read().unwrap().iter() { - d.set_receiver(rec.clone()); - } -} - -/// Create a new dispatch point for metrics. -/// All dispatch points are automatically entered in the dispatch registry. -pub fn delegate() -> DelegationPoint { - let delegation_point = DelegationPoint { - inner: Arc::new(RwLock::new(InnerDelegationPoint { - metrics: HashMap::new(), - receiver: Box::new(app_metrics(to_void())), - })), - }; - DELEGATE_REGISTRY - .write() - .unwrap() - .push(delegation_point.clone()); - delegation_point -} - -/// Dynamic counterpart of a `Dispatcher`. -/// Adapter to AppMetrics<_> of unknown type. -pub trait Receiver { - /// Register a new metric. - /// Only one metric of a certain name will be defined. - /// Observer must return a MetricHandle that uniquely identifies the metric. - fn box_metric(&self, kind: Kind, name: &str, rate: Rate) -> Box; - - /// Flush the receiver's scope. - fn flush(&self); -} - -/// Dynamic counterpart of the `DispatcherMetric`. -/// Adapter to a metric of unknown type. -pub trait ReceiverMetric { - /// Write metric value to a scope. - /// Observers only receive previously registered handles. - fn write(&self, value: Value); -} - -/// Shortcut name because `AppMetrics` -/// looks better than `AppMetrics>`. -pub type Delegate = Arc; - -/// A dynamically dispatched metric. -#[derive(Derivative)] -#[derivative(Debug)] -pub struct DelegatingMetric { - kind: Kind, - name: String, - rate: Rate, - #[derivative(Debug = "ignore")] - receiver: AtomicRefCell>, - #[derivative(Debug = "ignore")] - dispatcher: DelegationPoint, -} - -/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out. -impl Drop for DelegatingMetric { - fn drop(&mut self) { - self.dispatcher.drop_metric(self) - } -} - -/// A dynamic dispatch point for app and lib metrics. -/// Decouples metrics definition from backend configuration. -/// Allows defining metrics before a concrete type has been selected. -/// Allows replacing metrics backend on the fly at runtime. -#[derive(Clone)] -pub struct DelegationPoint { - inner: Arc>, -} - -struct InnerDelegationPoint { - metrics: HashMap>, - receiver: Box, -} - -impl From<&'static str> for AppMetrics { - fn from(prefix: &'static str) -> AppMetrics { - let app_metrics: AppMetrics = delegate().into(); - app_metrics.with_prefix(prefix) - } -} - -impl From for AppMetrics { - fn from(dispatcher: DelegationPoint) -> AppMetrics { - let dispatcher_1 = dispatcher.clone(); - AppMetrics::new( - // define metric - Arc::new(move |kind, name, rate| dispatcher.define_metric(kind, name, rate)), - // write / flush metric - control_scope(move |cmd| match cmd { - ScopeCmd::Write(metric, value) => { - let dispatch: &Arc = metric; - let receiver_metric: AtomicRef< - Box, - > = dispatch.receiver.borrow(); - receiver_metric.write(value) - } - ScopeCmd::Flush => dispatcher_1 - .inner - .write() - .expect("Locking dispatcher") - .receiver - .flush(), - }), - ) - } -} - -impl DelegationPoint { - /// Install a new metric receiver, replacing the previous one. - pub fn set_receiver>, T: Send + Sync + Clone + 'static>( - &self, - receiver: IS, - ) { - let receiver: Box = Box::new(receiver.into()); - let inner: &mut InnerDelegationPoint = - &mut *self.inner.write().expect("Locking dispatcher"); - - for mut metric in inner.metrics.values() { - if let Some(metric) = metric.upgrade() { - let receiver_metric = - receiver.box_metric(metric.kind, metric.name.as_ref(), metric.rate); - *metric.receiver.borrow_mut() = receiver_metric; - } - } - // TODO return old receiver (swap, how?) - inner.receiver = receiver; - } - - /// Define a dispatch metric, registering it with the current receiver. - /// A weak ref is kept to update receiver metric if receiver is replaced. - pub fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> Delegate { - let mut inner = self.inner.write().expect("Locking dispatcher"); - - let receiver_metric = inner.receiver.box_metric(kind, name, rate); - - let delegating_metric = Arc::new(DelegatingMetric { - kind, - name: name.to_string(), - rate, - receiver: AtomicRefCell::new(receiver_metric), - dispatcher: self.clone(), - }); - - inner.metrics.insert( - delegating_metric.name.clone(), - Arc::downgrade(&delegating_metric), - ); - delegating_metric - } - - fn drop_metric(&self, metric: &DelegatingMetric) { - let mut inner = self.inner.write().expect("Locking delegation point"); - if inner.metrics.remove(&metric.name).is_none() { - panic!("Could not remove DelegatingMetric weak ref from delegation point") - } - } -} - -#[cfg(feature = "bench")] -mod bench { - - use super::*; - use test; - use core::Kind::*; - use aggregate::*; - use publish::*; - - #[bench] - fn dispatch_marker_to_aggregate(b: &mut test::Bencher) { - let dispatch = delegate(); - let sink: AppMetrics = dispatch.clone().into(); - dispatch.set_receiver(aggregate(summary, to_void())); - let metric = sink.marker("event_a"); - b.iter(|| test::black_box(metric.mark())); - } - - #[bench] - fn dispatch_marker_to_void(b: &mut test::Bencher) { - let dispatch = delegate(); - let sink: AppMetrics = dispatch.into(); - let metric = sink.marker("event_a"); - b.iter(|| test::black_box(metric.mark())); - } - -} diff --git a/src/graphite.rs b/src/graphite.rs index b0562e9..a0928e2 100644 --- a/src/graphite.rs +++ b/src/graphite.rs @@ -1,7 +1,7 @@ //! Send metrics to a graphite server. use core::*; -use scope_metrics::*; +use local_metrics::*; use error; use self_metrics::*; @@ -23,13 +23,13 @@ app_metrics!{ } /// Send metrics to a graphite server at the address and port provided. -pub fn to_graphite(address: ADDR) -> error::Result> +pub fn to_graphite(address: ADDR) -> error::Result> where ADDR: ToSocketAddrs + Debug + Clone, { debug!("Connecting to graphite {:?}", address); let socket = Arc::new(RwLock::new(RetrySocket::new(address.clone())?)); - Ok(ScopeMetrics::new( + Ok(LocalMetrics::new( move |kind, name, rate| { let mut prefix = String::with_capacity(32); prefix.push_str(name); diff --git a/src/lib.rs b/src/lib.rs index e8c8dc0..0b98474 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,11 +30,14 @@ pub mod macros; pub mod core; pub use core::*; -pub mod scope_metrics; -pub use scope_metrics::*; +pub mod local_metrics; +pub use local_metrics::*; -pub mod delegate; -pub use delegate::*; +//pub mod local_delegate; +//pub use local_delegate::*; + +pub mod app_delegate; +pub use app_delegate::*; mod output; pub use output::*; @@ -81,5 +84,8 @@ pub use async_queue::*; mod schedule; pub use schedule::*; +mod registry; +pub use registry::*; + mod self_metrics; pub use self_metrics::snapshot; diff --git a/src/local_delegate.rs b/src/local_delegate.rs new file mode 100755 index 0000000..fe872a9 --- /dev/null +++ b/src/local_delegate.rs @@ -0,0 +1,205 @@ +//! Decouple metric definition from configuration with trait objects. + +use core::*; +use local_metrics::*; +use output::*; +use namespace::*; +use registry::*; + +use std::collections::HashMap; +use std::sync::{Arc, RwLock, Weak}; + +use atomic_refcell::*; + + +/// Create a new dispatch point for metrics. +/// All dispatch points are automatically entered in the dispatch registry. +pub fn local_delegate() -> LocalSend { + let delegation_point = LocalSend { + inner_send: Arc::new(RwLock::new(InnerLocalSend { + active_metrics: HashMap::new(), + recv: registry.get, + last_metric_id: 0, + })), + }; + register_local_delegation(delegation_point.clone()); + delegation_point +} + +/// Dynamic counterpart of a `Dispatcher`. +/// Adapter to LocalMetrics<_> of unknown type. +pub trait LocalRecv { + /// Register a new metric. + /// Only one metric of a certain name will be defined. + /// Observer must return a MetricHandle that uniquely identifies the metric. + fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> LocalRecvMetric; + + /// Flush the receiver's scope. + fn open_scope(&self, buffered: bool) -> Arc; +} + +/// A dynamically dispatched metric. +#[derive(Derivative)] +#[derivative(Debug)] +pub trait LocalRecvScope { + fn write(&self, metric: &LocalRecvMetric, value: Value); + fn flush(&self); +} + +pub struct LocalRecvMetric (u64); + +/// Shortcut name because `AppMetrics` +/// looks better than `AppMetrics>`. +pub type LocalDelegate = Arc; + +/// A dynamically dispatched metric. +#[derive(Derivative)] +#[derivative(Debug)] +pub struct LocalSendMetric { + kind: Kind, + name: String, + rate: Rate, + metric_id: usize, + #[derivative(Debug = "ignore")] + send: LocalSend, +} + +/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out. +impl Drop for LocalSendMetric { + fn drop(&mut self) { + self.dispatcher.drop_metric(self) + } +} + + +/// Dispatcher weak ref does not prevent dropping but still needs to be cleaned out. +impl Drop for LocalSendMetric { + fn drop(&mut self) { + self.send.drop_metric(self) + } +} + +/// A dynamic dispatch point for app and lib metrics. +/// Decouples metrics definition from backend configuration. +/// Allows defining metrics before a concrete type has been selected. +/// Allows replacing metrics backend on the fly at runtime. +#[derive(Clone)] +pub struct LocalSend { + inner_send: Arc>, +} + +struct InnerLocalSend { + recv: Box, + active_metrics: HashMap>, + last_metric_id: usize, +} + +impl From<&'static str> for LocalMetrics { + fn from(prefix: &'static str) -> LocalMetrics { + let app_metrics: LocalMetrics = local_delegate().into(); + app_metrics.with_prefix(prefix) + } +} + +impl From for LocalMetrics { + fn from(send: LocalSend) -> LocalMetrics { + let send_1 = send.clone(); + LocalMetrics::new( + // define metric + Arc::new(move |kind, name, rate| send.define_metric(kind, name, rate)), + // write / flush metric + Arc::new(move |buffered| send.open_scope(buffered)) + ) + } +} + +impl LocalSend { + /// Install a new metric receiver, replacing the previous one. + pub fn set_receiver>, T: Send + Sync + Clone + 'static>( + &self, + receiver: IS, + ) { + let receiver: Box = Box::new(receiver.into()); + let inner: &mut InnerLocalSend = + &mut *self.inner_send.write().expect("Lock Metrics Send"); + + for mut metric in inner.active_metrics.values() { + if let Some(metric) = metric.upgrade() { + let receiver_metric = + receiver.box_metric(metric.kind, metric.name.as_ref(), metric.rate); + *metric.receiver.borrow_mut() = receiver_metric; + } + } + // TODO return old receiver (swap, how?) + inner.recv = receiver; + } + + fn define_metric(&self, kind: Kind, name: &str, rate: Rate) -> LocalDelegate { + let mut inner = self.inner_send.write().expect("Lock Metrics Send"); + inner.metrics.get(name) + .and_then(|metric_ref| Weak::upgrade(metric_ref)) + .unwrap_or_else(|| { + let recv_metric = inner.recv.define_metric(kind, name, rate); + let new_metric = Arc::new(LocalSendMetric { + kind, + name: name.to_string(), + rate, + metric_id: inner.last_metric_id += 1, + send: send.clone(), + }); + inner.metrics.insert( + new_metric.name.clone(), + Arc::downgrade(&new_metric), + ); + new_metric + }) + } + + + pub fn open_scope(&self, buffered: bool) -> Arc> { + let mut inner = self.inner_send.write().expect("Lock Metrics Send"); + let write_scope = inner.recv.open_scope(buffered); + let flush_scope = write_scope.clone(); + Arc::new(move |cmd| { + match cmd { + ScopeCmd::Write(metric, value) => write_scope.write(metric, value), + ScopeCmd::Flush => flush_scope.flush(), + } + }) + } + + fn drop_metric(&self, metric: &LocalSendMetric) { + let mut inner = self.inner_send.write().expect("Lock Metrics Send"); + if inner.metrics.remove(&metric.name).is_none() { + panic!("Could not remove DelegatingMetric weak ref from delegation point") + } + } +} + +#[cfg(feature = "bench")] +mod bench { + + use super::*; + use test; + use core::Kind::*; + use aggregate::*; + use publish::*; + + #[bench] + fn dispatch_marker_to_aggregate(b: &mut test::Bencher) { + let dispatch = local_delegate(); + let sink: LocalMetrics = dispatch.clone().into(); + dispatch.set_receiver(aggregate(summary, to_void())); + let metric = sink.marker("event_a"); + b.iter(|| test::black_box(metric.mark())); + } + + #[bench] + fn dispatch_marker_to_void(b: &mut test::Bencher) { + let dispatch = local_delegate(); + let sink: LocalMetrics = dispatch.into(); + let metric = sink.marker("event_a"); + b.iter(|| test::black_box(metric.mark())); + } + +} diff --git a/src/scope_metrics.rs b/src/local_metrics.rs similarity index 85% rename from src/scope_metrics.rs rename to src/local_metrics.rs index a54ed3a..98a3b66 100755 --- a/src/scope_metrics.rs +++ b/src/local_metrics.rs @@ -13,7 +13,7 @@ use namespace::*; /// This is the building block for the metrics backend. #[derive(Derivative, Clone)] #[derivative(Debug)] -pub struct ScopeMetrics { +pub struct LocalMetrics { #[derivative(Debug = "ignore")] define_metric_fn: DefineMetricFn, @@ -21,7 +21,7 @@ pub struct ScopeMetrics { scope_metric_fn: OpenScopeFn, } -impl ScopeMetrics { +impl LocalMetrics { /// Define a new metric. #[allow(unused_variables)] pub fn define_metric(&self, kind: Kind, name: &str, sampling: Rate) -> M { @@ -62,14 +62,14 @@ impl ScopeMetrics { } } -impl ScopeMetrics { +impl LocalMetrics { /// Create a new metric chain with the provided metric definition and scope creation functions. pub fn new(make_metric: MF, make_scope: WF) -> Self where MF: Fn(Kind, &str, Rate) -> M + Send + Sync + 'static, WF: Fn(bool) -> ControlScopeFn + Send + Sync + 'static, { - ScopeMetrics { + LocalMetrics { // capture the provided closures in Arc to provide cheap clones define_metric_fn: Arc::new(make_metric), scope_metric_fn: Arc::new(make_scope), @@ -77,38 +77,38 @@ impl ScopeMetrics { } /// Get an event counter of the provided name. - pub fn marker>(&self, name: AS) -> ScopeMarker { + pub fn marker>(&self, name: AS) -> LocalMarker { let metric = self.define_metric(Marker, name.as_ref(), 1.0); - ScopeMarker { metric } + LocalMarker { metric } } /// Get a counter of the provided name. - pub fn counter>(&self, name: AS) -> ScopeCounter { + pub fn counter>(&self, name: AS) -> LocalCounter { let metric = self.define_metric(Counter, name.as_ref(), 1.0); - ScopeCounter { metric } + LocalCounter { metric } } /// Get a timer of the provided name. - pub fn timer>(&self, name: AS) -> ScopeTimer { + pub fn timer>(&self, name: AS) -> LocalTimer { let metric = self.define_metric(Timer, name.as_ref(), 1.0); - ScopeTimer { metric } + LocalTimer { metric } } /// Get a gauge of the provided name. - pub fn gauge>(&self, name: AS) -> ScopeGauge { + pub fn gauge>(&self, name: AS) -> LocalGauge { let metric = self.define_metric(Gauge, name.as_ref(), 1.0); - ScopeGauge { metric } + LocalGauge { metric } } /// Intercept both metric definition and scope creation, possibly changing the metric type. - pub fn mod_both(&self, mod_fn: MF) -> ScopeMetrics + pub fn mod_both(&self, mod_fn: MF) -> LocalMetrics where MF: Fn(DefineMetricFn, OpenScopeFn) -> (DefineMetricFn, OpenScopeFn), N: Clone + Send + Sync, { let (metric_fn, scope_fn) = mod_fn(self.define_metric_fn.clone(), self.scope_metric_fn.clone()); - ScopeMetrics { + LocalMetrics { define_metric_fn: metric_fn, scope_metric_fn: scope_fn, } @@ -119,32 +119,32 @@ impl ScopeMetrics { where MF: Fn(OpenScopeFn) -> OpenScopeFn, { - ScopeMetrics { + LocalMetrics { define_metric_fn: self.define_metric_fn.clone(), scope_metric_fn: mod_fn(self.scope_metric_fn.clone()), } } } -impl From> for AppMetrics { - fn from(chain: ScopeMetrics) -> AppMetrics { - AppMetrics::new(chain.define_metric_fn.clone(), chain.open_scope(false)) +impl From> for AppMetrics { + fn from(metrics: LocalMetrics) -> AppMetrics { + AppMetrics::new(metrics.define_metric_fn.clone(), metrics.open_scope(false)) } } -impl WithCache for ScopeMetrics { +impl WithCache for LocalMetrics { fn with_cache(&self, cache_size: usize) -> Self { - ScopeMetrics { + LocalMetrics { define_metric_fn: add_cache(cache_size, self.define_metric_fn.clone()), scope_metric_fn: self.scope_metric_fn.clone(), } } } -impl WithNamespace for ScopeMetrics { +impl WithNamespace for LocalMetrics { fn with_name>(&self, names: IN) -> Self { let ref ninto = names.into(); - ScopeMetrics { + LocalMetrics { define_metric_fn: add_namespace(ninto, self.define_metric_fn.clone()), scope_metric_fn: self.scope_metric_fn.clone(), } @@ -156,11 +156,11 @@ impl WithNamespace for ScopeMetrics { /// preventing programming errors. #[derive(Derivative)] #[derivative(Debug)] -pub struct ScopeMarker { +pub struct LocalMarker { metric: M, } -impl ScopeMarker { +impl LocalMarker { /// Record a single event occurence. #[inline] pub fn mark(&self, scope: &mut ControlScopeFn) { @@ -171,11 +171,11 @@ impl ScopeMarker { /// A counter that sends values to the metrics backend #[derive(Derivative)] #[derivative(Debug)] -pub struct ScopeCounter { +pub struct LocalCounter { metric: M, } -impl ScopeCounter { +impl LocalCounter { /// Record a value count. #[inline] pub fn count(&self, scope: &mut ControlScopeFn, count: V) @@ -189,11 +189,11 @@ impl ScopeCounter { /// A gauge that sends values to the metrics backend #[derive(Derivative)] #[derivative(Debug)] -pub struct ScopeGauge { +pub struct LocalGauge { metric: M, } -impl ScopeGauge { +impl LocalGauge { /// Record a value point for this gauge. #[inline] pub fn value(&self, scope: &mut ControlScopeFn, value: V) @@ -212,11 +212,11 @@ impl ScopeGauge { /// - with the interval_us() method, providing an externally determined microsecond interval #[derive(Derivative)] #[derivative(Debug)] -pub struct ScopeTimer { +pub struct LocalTimer { metric: M, } -impl ScopeTimer { +impl LocalTimer { /// Record a microsecond interval for this timer /// Can be used in place of start()/stop() if an external time interval source is used #[inline] diff --git a/src/multi.rs b/src/multi.rs index 1be1f6a..e220d4c 100644 --- a/src/multi.rs +++ b/src/multi.rs @@ -1,22 +1,22 @@ //! Dispatch metrics to multiple sinks. use core::*; -use scope_metrics::*; +use local_metrics::*; use app_metrics::*; /// Two chains of different types can be combined in a tuple. /// The chains will act as one, each receiving calls in the order the appear in the tuple. /// For more than two types, make tuples of tuples, "Yo Dawg" style. -impl From<(ScopeMetrics, ScopeMetrics)> for ScopeMetrics<(M1, M2)> +impl From<(LocalMetrics, LocalMetrics)> for LocalMetrics<(M1, M2)> where M1: 'static + Clone + Send + Sync, M2: 'static + Clone + Send + Sync, { - fn from(combo: (ScopeMetrics, ScopeMetrics)) -> ScopeMetrics<(M1, M2)> { + fn from(combo: (LocalMetrics, LocalMetrics)) -> LocalMetrics<(M1, M2)> { let combo0 = combo.0.clone(); let combo1 = combo.1.clone(); - ScopeMetrics::new( + LocalMetrics::new( move |kind, name, rate| { ( combo.0.define_metric(kind, name, rate), @@ -43,28 +43,28 @@ where } } -impl From<(ScopeMetrics, ScopeMetrics)> for AppMetrics<(M1, M2)> +impl From<(LocalMetrics, LocalMetrics)> for AppMetrics<(M1, M2)> where M1: 'static + Clone + Send + Sync, M2: 'static + Clone + Send + Sync, { - fn from(combo: (ScopeMetrics, ScopeMetrics)) -> AppMetrics<(M1, M2)> { - let chain: ScopeMetrics<(M1, M2)> = combo.into(); + fn from(combo: (LocalMetrics, LocalMetrics)) -> AppMetrics<(M1, M2)> { + let chain: LocalMetrics<(M1, M2)> = combo.into(); app_metrics(chain) } } /// Multiple chains of the same type can be combined in a slice. /// The chains will act as one, each receiving calls in the order the appear in the slice. -impl<'a, M> From<&'a [ScopeMetrics]> for ScopeMetrics> +impl<'a, M> From<&'a [LocalMetrics]> for LocalMetrics> where M: 'static + Clone + Send + Sync, { - fn from(chains: &'a [ScopeMetrics]) -> ScopeMetrics> { + fn from(chains: &'a [LocalMetrics]) -> LocalMetrics> { let chains = chains.to_vec(); let chains2 = chains.clone(); - ScopeMetrics::new( + LocalMetrics::new( move |kind, name, rate| { let mut metric = Vec::with_capacity(chains.len()); for chain in &chains { @@ -94,12 +94,12 @@ where } } -impl<'a, M> From<&'a [ScopeMetrics]> for AppMetrics> +impl<'a, M> From<&'a [LocalMetrics]> for AppMetrics> where M: 'static + Clone + Send + Sync, { - fn from(chains: &'a [ScopeMetrics]) -> AppMetrics> { - let chain: ScopeMetrics> = chains.into(); + fn from(chains: &'a [LocalMetrics]) -> AppMetrics> { + let chain: LocalMetrics> = chains.into(); app_metrics(chain) } } diff --git a/src/output.rs b/src/output.rs index 6f040d4..c16b7b1 100644 --- a/src/output.rs +++ b/src/output.rs @@ -1,12 +1,12 @@ //! Standard stateless metric outputs. // TODO parameterize templates use core::*; -use scope_metrics::*; +use local_metrics::*; use std::sync::RwLock; /// Write metric values to stdout using `println!`. -pub fn to_stdout() -> ScopeMetrics { - ScopeMetrics::new( +pub fn to_stdout() -> LocalMetrics { + LocalMetrics::new( |_kind, name, _rate| String::from(name), |buffered| { if !buffered { @@ -36,8 +36,8 @@ pub fn to_stdout() -> ScopeMetrics { /// Write metric values to the standard log using `info!`. // TODO parameterize log level -pub fn to_log() -> ScopeMetrics { - ScopeMetrics::new( +pub fn to_log() -> LocalMetrics { + LocalMetrics::new( |_kind, name, _rate| String::from(name), |buffered| { if !buffered { @@ -66,9 +66,9 @@ pub fn to_log() -> ScopeMetrics { } /// Discard all metric values sent to it. -pub fn to_void() -> ScopeMetrics { - ScopeMetrics::new( - move |_kind, name, _rate| String::from(name), +pub fn to_void() -> LocalMetrics<()> { + LocalMetrics::new( + move |_kind, _name, _rate| (), |_buffered| control_scope(|_cmd| {}), ) } diff --git a/src/publish.rs b/src/publish.rs index fdfd640..878a8e7 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -20,7 +20,7 @@ //! ``` use core::*; -use scope_metrics::*; +use local_metrics::*; use core::Kind::*; use scores::{ScoreSnapshot, ScoreType}; use scores::ScoreType::*; @@ -40,7 +40,7 @@ pub trait Publish: Send + Sync + Debug { pub struct Publisher { #[derivative(Debug = "ignore")] statistics: Box, - target_chain: ScopeMetrics, + output: LocalMetrics, } impl Publisher @@ -50,10 +50,10 @@ where { /// Define a new metrics publishing strategy, from a transformation /// function and a target metric chain. - pub fn new(stat_fn: E, target_chain: ScopeMetrics) -> Self { + pub fn new(stat_fn: E, output: LocalMetrics) -> Self { Publisher { statistics: Box::new(stat_fn), - target_chain, + output, } } } @@ -64,7 +64,7 @@ where E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static, { fn publish(&self, snapshot: Vec) { - let publish_scope_fn = self.target_chain.open_scope(false); + let publish_scope_fn = self.output.open_scope(false); if snapshot.is_empty() { // no data was collected for this period // TODO repeat previous frame min/max ? @@ -73,9 +73,8 @@ where for metric in snapshot { for score in metric.2 { if let Some(ex) = (self.statistics)(metric.0, metric.1.as_ref(), score) { - let temp_metric = - self.target_chain.define_metric(ex.0, &ex.1.concat(), 1.0); - publish_scope_fn.write(&temp_metric, ex.2); + let pub_metric = self.output.define_metric(ex.0, &ex.1.concat(), 1.0); + publish_scope_fn.write(&pub_metric, ex.2); } } } diff --git a/src/registry.rs b/src/registry.rs new file mode 100644 index 0000000..3d69c27 --- /dev/null +++ b/src/registry.rs @@ -0,0 +1,39 @@ +use app_metrics::AppMetrics; +use output; +use app_delegate::{AppRecv, AppSend}; + +use std::sync::{Arc, RwLock}; + +fn no_app_metrics() -> Arc { + let void_metrics: AppMetrics<_> = output::to_void().into(); + Arc::new(void_metrics) +} + +/// The registry contains a list of every metrics dispatch point in the app. +lazy_static! { + static ref NO_APP_METRICS: Arc = no_app_metrics(); + + static ref DEFAULT_APP_RECEIVER: RwLock> = RwLock::new(NO_APP_METRICS.clone()); + + static ref DELEGATE_REGISTRY: RwLock> = RwLock::new(vec![]); +} + +/// Register a new app send. +pub fn add_app_send(send: AppSend) { + DELEGATE_REGISTRY.write().unwrap().push(send.clone()); +} + +/// Get the default app recv. +pub fn get_default_app_recv() -> Arc { + DEFAULT_APP_RECEIVER.read().unwrap().clone() +} + +/// Install a new receiver for all dispatched metrics, replacing any previous receiver. +pub fn send_app_metrics>, T: Send + Sync + Clone + 'static>( + into_recv: IS, +) { + let recv = into_recv.into(); + for d in DELEGATE_REGISTRY.read().unwrap().iter() { + d.set_receiver(recv.clone()); + } +} \ No newline at end of file diff --git a/src/sample.rs b/src/sample.rs index d191c54..6f3450a 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -1,7 +1,7 @@ //! Reduce the amount of data to process or transfer by statistically dropping some of it. use core::*; -use scope_metrics::*; +use local_metrics::*; use pcg32; @@ -16,7 +16,7 @@ where fn with_sampling_rate(&self, sampling_rate: Rate) -> Self; } -impl WithSamplingRate for ScopeMetrics { +impl WithSamplingRate for LocalMetrics { fn with_sampling_rate(&self, sampling_rate: Rate) -> Self { let int_sampling_rate = pcg32::to_int_rate(sampling_rate); @@ -52,10 +52,10 @@ impl WithSamplingRate for ScopeMetrics { /// Perform random sampling of values according to the specified rate. #[deprecated(since = "0.5.0", note = "Use `with_sampling_rate` instead.")] -pub fn sample(sampling_rate: Rate, chain: IC) -> ScopeMetrics +pub fn sample(sampling_rate: Rate, chain: IC) -> LocalMetrics where M: Clone + Send + Sync + 'static, - IC: Into>, + IC: Into>, { let chain = chain.into(); chain.with_sampling_rate(sampling_rate) diff --git a/src/statsd.rs b/src/statsd.rs index 94367fa..4292761 100644 --- a/src/statsd.rs +++ b/src/statsd.rs @@ -1,7 +1,7 @@ //! Send metrics to a statsd server. use core::*; -use scope_metrics::*; +use local_metrics::*; use error; use self_metrics::*; @@ -18,7 +18,7 @@ app_metrics! { } /// Send metrics to a statsd server at the address and port provided. -pub fn to_statsd(address: ADDR) -> error::Result> +pub fn to_statsd(address: ADDR) -> error::Result> where ADDR: ToSocketAddrs, { @@ -26,7 +26,7 @@ where socket.set_nonblocking(true)?; socket.connect(address)?; - Ok(ScopeMetrics::new( + Ok(LocalMetrics::new( move |kind, name, rate| { let mut prefix = String::with_capacity(32); prefix.push_str(name);