Some cleanup

This commit is contained in:
Francis Lalonde 2018-12-20 07:35:31 -05:00
parent 0158c5c698
commit 547458f240
9 changed files with 93 additions and 83 deletions

View File

@ -1,12 +1,10 @@
//! A sample application asynchronously printing metrics to stdout.
#[macro_use]
extern crate dipstick;
use std::thread::sleep;
use std::time::Duration;
use dipstick::{Stream, Counter, InputScope, QueuedOutput, Input};
use std::io;
use dipstick::{Stream, InputScope, QueuedOutput, Input};
use std::thread;
fn main() {

View File

@ -1,8 +1,8 @@
//! An app demonstrating the basics of the metrics front-end.
//! Defines metrics of each kind and use them to print values to the console in multiple ways.
#[macro_use]
extern crate dipstick;
use std::thread::sleep;
use std::io;
use std::time::Duration;

View File

@ -4,12 +4,11 @@ extern crate dipstick;
use std::time::Duration;
use std::thread::sleep;
use std::io;
use dipstick::*;
fn main() {
let input = Stream::write_to(io::stdout()).buffered(Buffering::Unlimited);
let input = Stream::to_stdout().buffered(Buffering::Unlimited);
loop {
println!("\n------- open scope");

View File

@ -7,7 +7,6 @@ extern crate dipstick;
use std::time::Duration;
use dipstick::*;
use std::thread::sleep;
use std::io;
metrics!{
APP = "application" => {
@ -34,7 +33,7 @@ fn main() {
// send application metrics to aggregator
Proxy::default().set_target(all_buckets);
AtomicBucket::set_default_flush_to(Stream::write_to(io::stdout()));
AtomicBucket::set_default_flush_to(Stream::to_stdout());
AtomicBucket::set_default_stats(stats_all);
loop {

View File

@ -1,12 +1,10 @@
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
#[macro_use]
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
use std::io;
// undeclared root (un-prefixed) metrics
metrics! {
@ -37,7 +35,7 @@ metrics!(LIB_METRICS => {
});
fn main() {
dipstick::Proxy::set_default_target(dipstick::Stream::write_to(io::stdout()).input());
dipstick::Proxy::set_default_target(Stream::to_stdout());
loop {
ROOT_COUNTER.count(123);

View File

@ -61,6 +61,17 @@ pub trait InputScope: Flush {
}
/// Blanket impl of input trait for input scope
impl<T: InputScope + Send + Sync + 'static + Clone> Input for T {
type SCOPE = Self;
/// Open a new scope from this output.
fn input(&self) -> Self::SCOPE {
self.clone()
}
}
/// A metric is actually a function that knows to write a metric value to a metric output.
#[derive(Clone)]
pub struct InputMetric {

View File

@ -1,70 +1,75 @@
use core::input::{InputScope, InputMetric, Input, InputKind};
use core::output::{Output, OutputScope};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::Flush;
use core::error;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::ops;
/// Synchronous thread-safety for metric output using basic locking.
#[derive(Clone)]
pub struct LockingScopeBox {
attributes: Attributes,
inner: Arc<Mutex<LockScope>>
}
impl WithAttributes for LockingScopeBox {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl InputScope for LockingScopeBox {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
let raw_metric = self.inner.lock().expect("RawScope Lock").new_metric(name, kind);
let mutex = self.inner.clone();
InputMetric::new(move |value, labels| {
let _guard = mutex.lock().expect("OutputMetric Lock");
raw_metric.write(value, labels)
} )
}
}
impl Flush for LockingScopeBox {
fn flush(&self) -> error::Result<()> {
self.inner.lock().expect("OutputScope Lock").flush()
}
}
/// Blanket impl that provides RawOutputs their dynamic flavor.
impl<T: Output + Send + Sync + 'static> Input for T {
type SCOPE = LockingScopeBox;
fn input(&self) -> Self::SCOPE {
LockingScopeBox {
attributes: Attributes::default(),
inner: Arc::new(Mutex::new(LockScope(self.output_dyn())))
}
}
}
/// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads.
/// Obviously, it should only still be used from a single thread or dragons may occur.
#[derive(Clone)]
struct LockScope(Rc<OutputScope + 'static> );
impl ops::Deref for LockScope {
type Target = OutputScope + 'static;
fn deref(&self) -> &Self::Target {
Rc::as_ref(&self.0)
}
}
unsafe impl Send for LockScope {}
unsafe impl Sync for LockScope {}
//! Default locking strategy for shared concurrent output.
//! This makes all outputs also immediately usable as inputs.
//! The alternatives are queuing or thread local.
use core::input::{InputScope, InputMetric, Input, InputKind};
use core::output::{Output, OutputScope};
use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::Flush;
use core::error;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::ops;
/// Synchronous thread-safety for metric output using basic locking.
#[derive(Clone)]
pub struct LockingOutput {
attributes: Attributes,
inner: Arc<Mutex<LockedOutputScope>>
}
impl WithAttributes for LockingOutput {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl InputScope for LockingOutput {
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
let name = self.prefix_append(name);
// lock when creating metrics
let raw_metric = self.inner.lock().expect("OutputScope Lock").new_metric(name, kind);
let mutex = self.inner.clone();
InputMetric::new(move |value, labels| {
// lock when collecting values
let _guard = mutex.lock().expect("OutputScope Lock");
raw_metric.write(value, labels)
} )
}
}
impl Flush for LockingOutput {
fn flush(&self) -> error::Result<()> {
self.inner.lock()?.flush()
}
}
impl<T: Output + Send + Sync + 'static> Input for T {
type SCOPE = LockingOutput;
fn input(&self) -> Self::SCOPE {
LockingOutput {
attributes: Attributes::default(),
inner: Arc::new(Mutex::new(LockedOutputScope(self.output_dyn())))
}
}
}
/// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads.
/// Obviously, it should only still be used from a single thread at a time or dragons may occur.
#[derive(Clone)]
struct LockedOutputScope(Rc<OutputScope + 'static> );
impl ops::Deref for LockedOutputScope {
type Target = OutputScope + 'static;
fn deref(&self) -> &Self::Target {
Rc::as_ref(&self.0)
}
}
unsafe impl Send for LockedOutputScope {}
unsafe impl Sync for LockedOutputScope {}

View File

@ -3,7 +3,7 @@ pub mod name;
pub mod attributes;
pub mod input;
pub mod output;
pub mod out_lock;
pub mod locking;
pub mod clock;
pub mod void;
pub mod proxy;

View File

@ -33,7 +33,7 @@ pub use core::name::{MetricName, NameParts};
pub use core::input::{Input, InputDyn, InputScope, InputMetric, Counter, Timer, Marker, Gauge, InputKind};
pub use core::output::{Output, OutputDyn, OutputScope, OutputMetric};
pub use core::scheduler::{ScheduleFlush, CancelHandle};
pub use core::out_lock::{LockingScopeBox};
pub use core::locking::LockingOutput;
pub use core::error::{Result};
pub use core::clock::{TimeHandle};
pub use core::label::{Labels, AppLabel, ThreadLabel};