mirror of https://github.com/fralalonde/dipstick
Fix shared flush listeners & schedule handles
This commit is contained in:
parent
23dc724a4b
commit
e5c74de9ed
|
@ -22,6 +22,7 @@ use dipstick::*;
|
|||
fn main() {
|
||||
let mut metrics = AtomicBucket::new().named("process");
|
||||
metrics.drain(Stream::to_stdout());
|
||||
metrics.flush_every(Duration::from_secs(3));
|
||||
|
||||
let uptime = metrics.gauge("uptime");
|
||||
metrics.observe(uptime, || 6).on_flush();
|
||||
|
@ -29,8 +30,6 @@ fn main() {
|
|||
let threads = metrics.gauge("threads");
|
||||
metrics.observe(threads, thread_count).every(Duration::from_secs(1));
|
||||
|
||||
metrics.flush_every(Duration::from_secs(3));
|
||||
|
||||
loop {
|
||||
std::thread::sleep(Duration::from_millis(40));
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::Arc;
|
||||
use std::sync::{Arc};
|
||||
use std::collections::{HashMap};
|
||||
use std::default::Default;
|
||||
|
||||
|
@ -10,6 +10,13 @@ use std::time::Duration;
|
|||
use ::{InputScope, Gauge};
|
||||
use MetricValue;
|
||||
|
||||
#[cfg(not(feature="parking_lot"))]
|
||||
use std::sync::{RwLock};
|
||||
|
||||
#[cfg(feature="parking_lot")]
|
||||
use parking_lot::{RwLock};
|
||||
|
||||
|
||||
/// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum Sampling {
|
||||
|
@ -51,6 +58,8 @@ impl Default for Buffering {
|
|||
}
|
||||
}
|
||||
|
||||
type Shared<T> = Arc<RwLock<T>>;
|
||||
|
||||
/// Attributes common to metric components.
|
||||
/// Not all attributes used by all components.
|
||||
#[derive(Clone, Default)]
|
||||
|
@ -58,8 +67,8 @@ pub struct Attributes {
|
|||
naming: NameParts,
|
||||
sampling: Sampling,
|
||||
buffering: Buffering,
|
||||
flush_listeners: Vec<Arc<Fn() -> () + Send + Sync + 'static>>,
|
||||
tasks: Vec<CancelHandle>,
|
||||
flush_listeners: Shared<Vec<Arc<Fn() -> () + Send + Sync + 'static>>>,
|
||||
tasks: Shared<Vec<CancelHandle>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Attributes {
|
||||
|
@ -95,7 +104,7 @@ pub trait OnFlush {
|
|||
|
||||
impl <T> OnFlush for T where T: Flush + WithAttributes {
|
||||
fn notify_flush_listeners(&self) {
|
||||
for listener in &self.get_attributes().flush_listeners {
|
||||
for listener in read_lock!(self.get_attributes().flush_listeners).iter() {
|
||||
(listener)()
|
||||
}
|
||||
}
|
||||
|
@ -114,14 +123,14 @@ impl<'a, T, F> ObserveWhen<'a, T, F>
|
|||
pub fn on_flush(self) {
|
||||
let gauge = self.gauge;
|
||||
let op = self.operation;
|
||||
self.target.mut_attributes().flush_listeners.push(Arc::new(move || gauge.value(op())));
|
||||
write_lock!(self.target.mut_attributes().flush_listeners).push(Arc::new(move || gauge.value(op())));
|
||||
}
|
||||
|
||||
pub fn every(self, period: Duration,) -> CancelHandle {
|
||||
let gauge = self.gauge;
|
||||
let op = self.operation;
|
||||
let handle = SCHEDULER.schedule(period, move || gauge.value(op()));
|
||||
self.target.mut_attributes().tasks.push(handle.clone());
|
||||
write_lock!(self.target.mut_attributes().tasks).push(handle.clone());
|
||||
handle
|
||||
}
|
||||
}
|
||||
|
@ -150,7 +159,8 @@ impl<T: InputScope + WithAttributes> Observe for T {
|
|||
|
||||
impl Drop for Attributes {
|
||||
fn drop(&mut self) {
|
||||
for task in self.tasks.drain(..) {
|
||||
let mut tasks = write_lock!(self.tasks);
|
||||
for task in tasks.drain(..) {
|
||||
task.cancel()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue