mirror of https://github.com/fralalonde/dipstick
Compare commits
3 Commits
a63ddbeab7
...
e3e82f86a9
Author | SHA1 | Date |
---|---|---|
Francis Lalonde | e3e82f86a9 | |
Francis Lalonde | a5939a794e | |
Francis Lalonde | 46241ad439 |
10
CHANGES.md
10
CHANGES.md
|
@ -1,6 +1,14 @@
|
|||
# Latest changes + history
|
||||
|
||||
## version 0.8.0 ("SUCH REJOICING")
|
||||
## version 0.9.0
|
||||
- Abandon custom Result type and error module in favor
|
||||
of io::Result usage across all API. (Based on @rtyler's comment in #80)
|
||||
- Update all dependencies to latest versions
|
||||
- Move Void module to output (internal change)
|
||||
- Examples no longer declare `extern crate dipstick;`
|
||||
|
||||
## version 0.8.0 - ("SUCH REJOICING")
|
||||
- THIS VERSION HAS BEEN YANKED - API broke (again) for 0.9.0 and 0.8.0 hadn't been out long enough.
|
||||
- Abandon non-threadsafe "Output"s in exchange for a simpler, more consistent API.
|
||||
Everything is now threadsafe and thus all "Output" have been promoted to Inputs.
|
||||
No significant performance loss was observed (using parking_lot locks).
|
||||
|
|
10
Cargo.toml
10
Cargo.toml
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "dipstick"
|
||||
version = "0.8.0"
|
||||
version = "0.9.0"
|
||||
authors = ["Francis Lalonde <fralalonde@gmail.com>"]
|
||||
|
||||
description = """A fast, all-purpose metrics library decoupling instrumentation from reporting backends.
|
||||
|
@ -24,16 +24,16 @@ lazy_static = "1"
|
|||
atomic_refcell = "0.1"
|
||||
skeptic = { version = "0.13", optional = true }
|
||||
num = { version = "0.2", default-features = false }
|
||||
crossbeam-channel = { version = "0.3", optional = true }
|
||||
parking_lot = { version = "0.9", optional = true }
|
||||
crossbeam-channel = { version = "0.4", optional = true }
|
||||
parking_lot = { version = "0.10", optional = true }
|
||||
|
||||
# FIXME required only for random seed for sampling
|
||||
time = "0.1"
|
||||
|
||||
minreq = { version = "1.0.0" }
|
||||
minreq = { version = "2" }
|
||||
|
||||
# optional dep for standalone http pull metrics
|
||||
tiny_http = { version = "0.6", optional = true }
|
||||
tiny_http = { version = "0.7", optional = true }
|
||||
|
||||
[build-dependencies]
|
||||
skeptic = { version = "0.13", optional = true }
|
||||
|
|
|
@ -53,7 +53,6 @@ Timers measure an operation's duration.
|
|||
Timers can be used in code with the `time!` macro, wrap around a closure or with explicit calls to `start()` and `stop()`.
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
fn main() {
|
||||
let metrics = Stream::write_to_stdout().metrics();
|
||||
|
@ -90,7 +89,6 @@ Compared to counters:
|
|||
rather than min/max observed individual values.
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
|
||||
fn main() {
|
||||
|
@ -110,7 +108,6 @@ As such, a gauge's aggregated statistics are simply the mean, max and min values
|
|||
Values can be observed for gauges at any moment, like any other metric.
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
|
||||
fn main() {
|
||||
|
@ -125,7 +122,6 @@ The observation of values for any metric can be triggered on schedule or upon pu
|
|||
|
||||
This mechanism can be used for automatic reporting of gauge values:
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
|
@ -151,7 +147,6 @@ Observations triggered `on_flush` take place _before_ metrics are published, al
|
|||
|
||||
Scheduling could also be used to setup a "heartbeat" metric:
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
use std::time::{Duration};
|
||||
|
||||
|
@ -182,7 +177,6 @@ Names are opaque to the application and are used only to identify the metrics up
|
|||
Names may be prepended with a application-namespace shared across all backends.
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
fn main() {
|
||||
let stdout = Stream::write_to_stdout();
|
||||
|
@ -233,7 +227,6 @@ Notes about labels:
|
|||
Metric inputs are usually setup statically upon application startup.
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
|
||||
metrics!("my_app" => {
|
||||
|
@ -254,7 +247,6 @@ If necessary, metrics can also be defined "dynamically".
|
|||
This is more flexible but has a higher runtime cost, which may be alleviated with the optional caching mechanism.
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
fn main() {
|
||||
let user_name = "john_day";
|
||||
|
@ -303,7 +295,6 @@ Some outputs such as statsd also have the ability to sample metrics values.
|
|||
If enabled, sampling is done using pcg32, a fast random algorithm with reasonable entropy.
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
fn main() {
|
||||
let _app_metrics = Statsd::send_to("localhost:8125").expect("connected")
|
||||
|
|
|
@ -41,7 +41,6 @@ These are all best done by downstream timeseries visualization and monitoring to
|
|||
Here's a basic aggregating & auto-publish counter metric:
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
|
||||
fn main() {
|
||||
|
@ -56,7 +55,6 @@ fn main() {
|
|||
Persistent apps wanting to declare static metrics will prefer using the `metrics!` macro:
|
||||
|
||||
```rust
|
||||
extern crate dipstick;
|
||||
use dipstick::*;
|
||||
|
||||
metrics! { METRICS = "my_app" => {
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application asynchronously printing metrics to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::{Input, InputScope, QueuedInput, Stream};
|
||||
use std::thread;
|
||||
use std::thread::sleep;
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
//! An app demonstrating the basics of the metrics front-end.
|
||||
//! Defines metrics of each kind and use them to print values to the console in multiple ways.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use dipstick::{time, Input, InputScope, Prefixed, Stream};
|
||||
use std::io;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
//! A sample application asynchronously printing metrics to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use dipstick::{stats_all, AtomicBucket, Input, InputScope, Stream};
|
||||
use std::env::args;
|
||||
use std::str::FromStr;
|
||||
use std::thread;
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
//! A sample application asynchronously printing metrics to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use dipstick::{AtomicBucket, Input, InputScope, Proxy, Stream};
|
||||
use std::env::args;
|
||||
use std::str::FromStr;
|
||||
use std::thread;
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
//! A sample application asynchronously printing metrics to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use dipstick::{AtomicBucket, Input, InputQueueScope, InputScope, Stream};
|
||||
use std::env::args;
|
||||
use std::str::FromStr;
|
||||
use std::thread;
|
||||
|
@ -11,6 +9,7 @@ use std::time::Duration;
|
|||
|
||||
fn main() {
|
||||
let bucket = AtomicBucket::new();
|
||||
// NOTE: Wrapping an AtomicBucket with a Queue probably useless, as it is very fast and performs no I/O.
|
||||
let queue = InputQueueScope::wrap(bucket.clone(), 10000);
|
||||
let event = queue.marker("a");
|
||||
let args = &mut args();
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
//! A sample application continuously aggregating metrics,
|
||||
//! printing the summary stats every three seconds
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
//! A sample application continuously aggregating metrics,
|
||||
//! printing the summary stats every three seconds
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! Transient metrics are not retained by buckets after flushing.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
|
||||
use std::thread::sleep;
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
//! A sample application continuously aggregating metrics,
|
||||
//! printing the summary stats every three seconds
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! Metrics are printed at the end of every cycle as scope is dropped
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application asynchronously printing metrics to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::io;
|
||||
use std::thread::sleep;
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
//! A dropwizard-like configuration using three buckets
|
||||
//! aggregating one, five and fifteen minutes of data.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
//! A demonstration of customization of exported aggregated metrics.
|
||||
//! Using match on origin metric kind or score type to alter publication output.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application sending ad-hoc metrics to graphite.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
|
||||
use std::time::Duration;
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::{Graphite, Input, InputScope, MultiInput, Prefixed, Stream};
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
//! ```
|
||||
//!
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use dipstick::*;
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application sending ad-hoc marker values both to statsd _and_ to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application sending ad-hoc metrics to prometheus.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! Use the proxy to dynamically switch the metrics input & names.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::{Input, InputScope, Prefixed, Proxy, Stream};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! Use the proxy to send metrics to multiple outputs
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
/// Create a pipeline that fans out
|
||||
/// The key here is to use AtomicBucket to read
|
||||
/// from the proxy and aggregate and flush metrics
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
//! Use the metrics backend directly to log a metric value.
|
||||
//! Applications should use the metrics()-provided instruments instead.
|
||||
|
||||
#[macro_use]
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::{Input, InputScope, Labels};
|
||||
use dipstick::{labels, Input, InputScope, Labels};
|
||||
|
||||
fn main() {
|
||||
raw_write()
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::*;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
//! Print metrics to stderr with custom formatter including a label.
|
||||
|
||||
extern crate dipstick;
|
||||
|
||||
use dipstick::{
|
||||
AppLabel, Formatting, Input, InputKind, InputScope, LabelOp, LineFormat, LineOp, LineTemplate,
|
||||
MetricName, Stream,
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
use crate::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use crate::clock::TimeHandle;
|
||||
use crate::error;
|
||||
use crate::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use crate::name::MetricName;
|
||||
use crate::stats::ScoreType::*;
|
||||
|
@ -11,12 +10,12 @@ use crate::{Flush, MetricValue, Void};
|
|||
|
||||
use std::borrow::Borrow;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::isize;
|
||||
use std::mem;
|
||||
use std::sync::atomic::AtomicIsize;
|
||||
use std::sync::atomic::Ordering::*;
|
||||
use std::sync::Arc;
|
||||
use std::{fmt, io};
|
||||
|
||||
#[cfg(not(feature = "parking_lot"))]
|
||||
use std::sync::RwLock;
|
||||
|
@ -72,7 +71,7 @@ lazy_static! {
|
|||
}
|
||||
|
||||
impl InnerAtomicBucket {
|
||||
fn flush(&mut self) -> error::Result<()> {
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
let pub_scope: Arc<dyn InputScope> = match self.drain {
|
||||
Some(ref out) => out.input_dyn(),
|
||||
None => read_lock!(DEFAULT_AGGREGATE_INPUT).input_dyn(),
|
||||
|
@ -99,7 +98,7 @@ impl InnerAtomicBucket {
|
|||
/// Take a snapshot of aggregated values and reset them.
|
||||
/// Compute stats on captured values using assigned or default stats function.
|
||||
/// Write stats to assigned or default output.
|
||||
fn flush_to(&mut self, target: &dyn InputScope) -> error::Result<()> {
|
||||
fn flush_to(&mut self, target: &dyn InputScope) -> io::Result<()> {
|
||||
let now = TimeHandle::now();
|
||||
let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0;
|
||||
self.period_start = now;
|
||||
|
@ -244,7 +243,7 @@ impl AtomicBucket {
|
|||
}
|
||||
|
||||
/// Immediately flush the stats's metrics to the specified scope and stats.
|
||||
pub fn flush_to(&self, publish_scope: &dyn InputScope) -> error::Result<()> {
|
||||
pub fn flush_to(&self, publish_scope: &dyn InputScope) -> io::Result<()> {
|
||||
let mut inner = write_lock!(self.inner);
|
||||
inner.flush_to(publish_scope)
|
||||
}
|
||||
|
@ -267,7 +266,7 @@ impl InputScope for AtomicBucket {
|
|||
impl Flush for AtomicBucket {
|
||||
/// Collect and reset aggregated data.
|
||||
/// Publish statistics
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
let mut inner = write_lock!(self.inner);
|
||||
inner.flush()
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
//! Metric input scope caching.
|
||||
|
||||
use crate::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
|
||||
use crate::error;
|
||||
use crate::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use crate::lru_cache as lru;
|
||||
use crate::name::MetricName;
|
||||
|
@ -14,6 +13,7 @@ use std::sync::RwLock;
|
|||
|
||||
#[cfg(feature = "parking_lot")]
|
||||
use parking_lot::RwLock;
|
||||
use std::io;
|
||||
|
||||
/// Wrap an input with a metric definition cache.
|
||||
/// This can provide performance benefits for metrics that are dynamically defined at runtime on each access.
|
||||
|
@ -101,7 +101,7 @@ impl InputScope for InputScopeCache {
|
|||
}
|
||||
|
||||
impl Flush for InputScopeCache {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
self.target.flush()
|
||||
}
|
||||
|
|
|
@ -1,5 +0,0 @@
|
|||
use std::error;
|
||||
use std::result;
|
||||
|
||||
/// Just put any error in a box.
|
||||
pub type Result<T> = result::Result<T, Box<dyn error::Error + Send + Sync>>;
|
|
@ -53,7 +53,6 @@ macro_rules! read_lock {
|
|||
|
||||
mod attributes;
|
||||
mod clock;
|
||||
mod error;
|
||||
mod input;
|
||||
mod label;
|
||||
mod metrics;
|
||||
|
@ -61,7 +60,6 @@ mod name;
|
|||
mod pcg32;
|
||||
mod proxy;
|
||||
mod scheduler;
|
||||
mod void;
|
||||
|
||||
mod atomic;
|
||||
mod stats;
|
||||
|
@ -76,14 +74,13 @@ pub use crate::attributes::{
|
|||
Buffered, Buffering, Observe, ObserveWhen, OnFlush, OnFlushCancel, Prefixed, Sampled, Sampling,
|
||||
};
|
||||
pub use crate::clock::TimeHandle;
|
||||
pub use crate::error::Result;
|
||||
pub use crate::input::{
|
||||
Counter, Gauge, Input, InputDyn, InputKind, InputMetric, InputScope, Level, Marker, Timer,
|
||||
};
|
||||
pub use crate::label::{AppLabel, Labels, ThreadLabel};
|
||||
pub use crate::name::{MetricName, NameParts};
|
||||
pub use crate::output::void::Void;
|
||||
pub use crate::scheduler::{Cancel, CancelGuard, CancelHandle, ScheduleFlush};
|
||||
pub use crate::void::Void;
|
||||
|
||||
#[cfg(test)]
|
||||
pub use crate::clock::{mock_clock_advance, mock_clock_reset};
|
||||
|
@ -109,13 +106,15 @@ pub use crate::multi::{MultiInput, MultiInputScope};
|
|||
pub use crate::queue::{InputQueue, InputQueueScope, QueuedInput};
|
||||
pub use crate::stats::{stats_all, stats_average, stats_summary, ScoreType};
|
||||
|
||||
use std::io;
|
||||
|
||||
/// Base type for recorded metric values.
|
||||
pub type MetricValue = isize;
|
||||
|
||||
/// Both InputScope and OutputScope share the ability to flush the recorded data.
|
||||
pub trait Flush {
|
||||
/// Flush does nothing by default.
|
||||
fn flush(&self) -> Result<()>;
|
||||
fn flush(&self) -> io::Result<()>;
|
||||
}
|
||||
|
||||
#[cfg(feature = "bench")]
|
||||
|
|
|
@ -23,8 +23,7 @@ macro_rules! time {
|
|||
/// ## Example
|
||||
///
|
||||
/// ```
|
||||
/// #[macro_use] extern crate dipstick;
|
||||
///
|
||||
/// #[macro_use] ///
|
||||
/// use dipstick::*;
|
||||
///
|
||||
/// # fn main() {
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
//! Dispatch metrics to multiple sinks.
|
||||
|
||||
use crate::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use crate::error;
|
||||
use crate::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use crate::name::MetricName;
|
||||
use crate::Flush;
|
||||
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Opens multiple scopes at a time from just as many outputs.
|
||||
|
@ -102,7 +102,7 @@ impl InputScope for MultiInputScope {
|
|||
}
|
||||
|
||||
impl Flush for MultiInputScope {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
for w in &self.scopes {
|
||||
w.flush()?;
|
||||
|
|
|
@ -3,7 +3,8 @@ use crate::input::InputKind;
|
|||
use crate::name::MetricName;
|
||||
use crate::MetricValue;
|
||||
|
||||
use std::io::{Error, Write};
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Print commands are steps in the execution of output templates.
|
||||
|
@ -48,12 +49,7 @@ impl LineTemplate {
|
|||
}
|
||||
|
||||
/// Template execution applies commands in turn, writing to the output.
|
||||
pub fn print<L>(
|
||||
&self,
|
||||
output: &mut dyn Write,
|
||||
value: MetricValue,
|
||||
lookup: L,
|
||||
) -> Result<(), Error>
|
||||
pub fn print<L>(&self, output: &mut dyn Write, value: MetricValue, lookup: L) -> io::Result<()>
|
||||
where
|
||||
L: Fn(&str) -> Option<Arc<String>>,
|
||||
{
|
||||
|
|
|
@ -6,7 +6,7 @@ use crate::input::{Input, InputMetric, InputScope};
|
|||
use crate::metrics;
|
||||
use crate::name::MetricName;
|
||||
use crate::output::socket::RetrySocket;
|
||||
use crate::{error, CachedInput, QueuedInput};
|
||||
use crate::{CachedInput, QueuedInput};
|
||||
use crate::{Flush, MetricValue};
|
||||
|
||||
use std::net::ToSocketAddrs;
|
||||
|
@ -22,6 +22,7 @@ use std::sync::{RwLock, RwLockWriteGuard};
|
|||
|
||||
#[cfg(feature = "parking_lot")]
|
||||
use parking_lot::{RwLock, RwLockWriteGuard};
|
||||
use std::io;
|
||||
|
||||
/// Graphite Input holds a socket to a graphite server.
|
||||
/// The socket is shared between scopes opened from the Input.
|
||||
|
@ -45,7 +46,7 @@ impl Input for Graphite {
|
|||
|
||||
impl Graphite {
|
||||
/// Send metrics to a graphite server at the address and port provided.
|
||||
pub fn send_to<A: ToSocketAddrs + Debug + Clone>(address: A) -> error::Result<Graphite> {
|
||||
pub fn send_to<A: ToSocketAddrs + Debug + Clone>(address: A) -> io::Result<Graphite> {
|
||||
debug!("Connecting to graphite {:?}", address);
|
||||
let socket = Arc::new(RwLock::new(RetrySocket::new(address)?));
|
||||
|
||||
|
@ -98,7 +99,7 @@ impl InputScope for GraphiteScope {
|
|||
}
|
||||
|
||||
impl Flush for GraphiteScope {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
let buf = write_lock!(self.buffer);
|
||||
self.flush_inner(buf)
|
||||
|
@ -140,7 +141,7 @@ impl GraphiteScope {
|
|||
}
|
||||
}
|
||||
|
||||
fn flush_inner(&self, mut buf: RwLockWriteGuard<String>) -> error::Result<()> {
|
||||
fn flush_inner(&self, mut buf: RwLockWriteGuard<String>) -> io::Result<()> {
|
||||
if buf.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -156,7 +157,7 @@ impl GraphiteScope {
|
|||
Err(e) => {
|
||||
metrics::GRAPHITE_SEND_ERR.mark();
|
||||
debug!("Failed to send buffer to graphite: {}", e);
|
||||
Err(e.into())
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use crate::input::{Input, InputKind, InputMetric, InputScope};
|
|||
use crate::name::MetricName;
|
||||
use crate::output::format::{Formatting, LineFormat, SimpleFormat};
|
||||
use crate::Flush;
|
||||
use crate::{error, CachedInput, QueuedInput};
|
||||
use crate::{CachedInput, QueuedInput};
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -13,6 +13,7 @@ use std::sync::RwLock;
|
|||
#[cfg(feature = "parking_lot")]
|
||||
use parking_lot::RwLock;
|
||||
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
/// Buffered metrics log output.
|
||||
|
@ -146,7 +147,7 @@ impl InputScope for LogScope {
|
|||
}
|
||||
|
||||
impl Flush for LogScope {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
let mut entries = write_lock!(self.entries);
|
||||
if !entries.is_empty() {
|
||||
|
|
|
@ -4,8 +4,8 @@ use crate::name::MetricName;
|
|||
use crate::{Flush, MetricValue};
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::error::Error;
|
||||
|
||||
use std::io;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
/// A BTreeMap wrapper to receive metrics or stats values.
|
||||
|
@ -64,7 +64,7 @@ impl InputScope for StatsMapScope {
|
|||
}
|
||||
|
||||
impl Flush for StatsMapScope {
|
||||
fn flush(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
pub mod void;
|
||||
|
||||
pub mod format;
|
||||
|
||||
pub mod map;
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
//! Send metrics to a Prometheus server.
|
||||
|
||||
use crate::attributes::{Attributes, Buffered, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use crate::error;
|
||||
use crate::input::InputKind;
|
||||
use crate::input::{Input, InputMetric, InputScope};
|
||||
use crate::label::Labels;
|
||||
|
@ -17,6 +16,7 @@ use std::sync::{RwLock, RwLockWriteGuard};
|
|||
|
||||
#[cfg(feature = "parking_lot")]
|
||||
use parking_lot::{RwLock, RwLockWriteGuard};
|
||||
use std::io;
|
||||
|
||||
/// Prometheus Input holds a socket to a Prometheus server.
|
||||
/// The socket is shared between scopes opened from the Input.
|
||||
|
@ -43,7 +43,7 @@ impl Prometheus {
|
|||
/// URL path must include group identifier labels `job`
|
||||
/// as shown in https://github.com/prometheus/pushgateway#command-line
|
||||
/// For example `http://pushgateway.example.org:9091/metrics/job/some_job`
|
||||
pub fn push_to(url: &str) -> error::Result<Prometheus> {
|
||||
pub fn push_to(url: &str) -> io::Result<Prometheus> {
|
||||
debug!("Pushing to Prometheus {:?}", url);
|
||||
|
||||
Ok(Prometheus {
|
||||
|
@ -95,7 +95,7 @@ impl InputScope for PrometheusScope {
|
|||
}
|
||||
|
||||
impl Flush for PrometheusScope {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
let buf = write_lock!(self.buffer);
|
||||
self.flush_inner(buf)
|
||||
|
@ -154,7 +154,7 @@ impl PrometheusScope {
|
|||
}
|
||||
}
|
||||
|
||||
fn flush_inner(&self, mut buf: RwLockWriteGuard<String>) -> error::Result<()> {
|
||||
fn flush_inner(&self, mut buf: RwLockWriteGuard<String>) -> io::Result<()> {
|
||||
if buf.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -176,7 +176,7 @@ impl PrometheusScope {
|
|||
Err(e) => {
|
||||
metrics::PROMETHEUS_SEND_ERR.mark();
|
||||
debug!("Failed to send buffer to Prometheus: {}", e);
|
||||
Err(e.into())
|
||||
Err(io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,10 +5,10 @@ use crate::attributes::{
|
|||
};
|
||||
use crate::input::InputKind;
|
||||
use crate::input::{Input, InputMetric, InputScope};
|
||||
use crate::metrics;
|
||||
use crate::name::MetricName;
|
||||
use crate::pcg32;
|
||||
use crate::{error, CachedInput, LineFormat, LineTemplate, QueuedInput};
|
||||
use crate::{metrics, LineOp};
|
||||
use crate::{CachedInput, QueuedInput};
|
||||
use crate::{Flush, MetricValue};
|
||||
|
||||
use std::net::ToSocketAddrs;
|
||||
|
@ -20,6 +20,7 @@ use std::sync::{RwLock, RwLockWriteGuard};
|
|||
|
||||
#[cfg(feature = "parking_lot")]
|
||||
use parking_lot::{RwLock, RwLockWriteGuard};
|
||||
use std::io;
|
||||
|
||||
/// Use a safe maximum size for UDP to prevent fragmentation.
|
||||
// TODO make configurable?
|
||||
|
@ -35,7 +36,7 @@ pub struct Statsd {
|
|||
|
||||
impl Statsd {
|
||||
/// Send metrics to a statsd server at the address and port provided.
|
||||
pub fn send_to<ADDR: ToSocketAddrs>(address: ADDR) -> error::Result<Statsd> {
|
||||
pub fn send_to<ADDR: ToSocketAddrs>(address: ADDR) -> io::Result<Statsd> {
|
||||
let socket = Arc::new(UdpSocket::bind("0.0.0.0:0")?);
|
||||
socket.set_nonblocking(true)?;
|
||||
socket.connect(address)?;
|
||||
|
@ -136,7 +137,7 @@ impl InputScope for StatsdScope {
|
|||
}
|
||||
|
||||
impl Flush for StatsdScope {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
let buf = write_lock!(self.buffer);
|
||||
self.flush_inner(buf)
|
||||
|
@ -177,7 +178,7 @@ impl StatsdScope {
|
|||
}
|
||||
}
|
||||
|
||||
fn flush_inner(&self, mut buffer: RwLockWriteGuard<String>) -> error::Result<()> {
|
||||
fn flush_inner(&self, mut buffer: RwLockWriteGuard<String>) -> io::Result<()> {
|
||||
if !buffer.is_empty() {
|
||||
match self.socket.send(buffer.as_bytes()) {
|
||||
Ok(size) => {
|
||||
|
@ -186,7 +187,7 @@ impl StatsdScope {
|
|||
}
|
||||
Err(e) => {
|
||||
metrics::STATSD_SEND_ERR.mark();
|
||||
return Err(e.into());
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
buffer.clear();
|
||||
|
@ -223,41 +224,41 @@ impl Drop for StatsdScope {
|
|||
}
|
||||
}
|
||||
|
||||
use crate::output::format::LineOp::{ScaledValueAsText, ValueAsText};
|
||||
|
||||
impl LineFormat for StatsdScope {
|
||||
fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate {
|
||||
let mut prefix = name.join(".");
|
||||
prefix.push(':');
|
||||
|
||||
let mut suffix = String::with_capacity(16);
|
||||
suffix.push('|');
|
||||
suffix.push_str(match kind {
|
||||
InputKind::Marker | InputKind::Counter => "c",
|
||||
InputKind::Gauge | InputKind::Level => "g",
|
||||
InputKind::Timer => "ms",
|
||||
});
|
||||
|
||||
// specify sampling rate if any
|
||||
if let Sampling::Random(float_rate) = self.get_sampling() {
|
||||
suffix.push_str(&format! {"|@{}\n", float_rate});
|
||||
}
|
||||
|
||||
// scale timer values
|
||||
let op_value_text = match kind {
|
||||
// timers are in µs, statsd wants ms
|
||||
InputKind::Timer => ScaledValueAsText(1000.0),
|
||||
_ => ValueAsText,
|
||||
};
|
||||
|
||||
LineTemplate::new(vec![
|
||||
LineOp::Literal(prefix.into_bytes()),
|
||||
op_value_text,
|
||||
LineOp::Literal(suffix.into_bytes()),
|
||||
LineOp::NewLine,
|
||||
])
|
||||
}
|
||||
}
|
||||
// use crate::output::format::LineOp::{ScaledValueAsText, ValueAsText};
|
||||
//
|
||||
// impl LineFormat for StatsdScope {
|
||||
// fn template(&self, name: &MetricName, kind: InputKind) -> LineTemplate {
|
||||
// let mut prefix = name.join(".");
|
||||
// prefix.push(':');
|
||||
//
|
||||
// let mut suffix = String::with_capacity(16);
|
||||
// suffix.push('|');
|
||||
// suffix.push_str(match kind {
|
||||
// InputKind::Marker | InputKind::Counter => "c",
|
||||
// InputKind::Gauge | InputKind::Level => "g",
|
||||
// InputKind::Timer => "ms",
|
||||
// });
|
||||
//
|
||||
// // specify sampling rate if any
|
||||
// if let Sampling::Random(float_rate) = self.get_sampling() {
|
||||
// suffix.push_str(&format! {"|@{}\n", float_rate});
|
||||
// }
|
||||
//
|
||||
// // scale timer values
|
||||
// let op_value_text = match kind {
|
||||
// // timers are in µs, statsd wants ms
|
||||
// InputKind::Timer => ScaledValueAsText(1000.0),
|
||||
// _ => ValueAsText,
|
||||
// };
|
||||
//
|
||||
// LineTemplate::new(vec![
|
||||
// LineOp::Literal(prefix.into_bytes()),
|
||||
// op_value_text,
|
||||
// LineOp::Literal(suffix.into_bytes()),
|
||||
// LineOp::NewLine,
|
||||
// ])
|
||||
// }
|
||||
// }
|
||||
|
||||
#[cfg(feature = "bench")]
|
||||
mod bench {
|
||||
|
|
|
@ -6,7 +6,7 @@ use crate::attributes::{Attributes, Buffered, MetricId, OnFlush, Prefixed, WithA
|
|||
use crate::input::InputKind;
|
||||
use crate::name::MetricName;
|
||||
use crate::Flush;
|
||||
use crate::{error, CachedInput, QueuedInput};
|
||||
use crate::{CachedInput, QueuedInput};
|
||||
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{self, Write};
|
||||
|
@ -55,12 +55,12 @@ impl Stream<File> {
|
|||
/// Write metric values to a file.
|
||||
#[deprecated(since = "0.8.0", note = "Use write_to_file()")]
|
||||
#[allow(clippy::wrong_self_convention)]
|
||||
pub fn to_file<P: AsRef<Path>>(file: P) -> error::Result<Stream<File>> {
|
||||
pub fn to_file<P: AsRef<Path>>(file: P) -> io::Result<Stream<File>> {
|
||||
Self::write_to_file(file)
|
||||
}
|
||||
|
||||
/// Write metric values to a file.
|
||||
pub fn write_to_file<P: AsRef<Path>>(file: P) -> error::Result<Stream<File>> {
|
||||
pub fn write_to_file<P: AsRef<Path>>(file: P) -> io::Result<Stream<File>> {
|
||||
let file = OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
|
@ -75,7 +75,7 @@ impl Stream<File> {
|
|||
/// existing file, if false, the attempt will result in an error.
|
||||
#[deprecated(since = "0.8.0", note = "Use write_to_new_file()")]
|
||||
#[allow(clippy::wrong_self_convention)]
|
||||
pub fn to_new_file<P: AsRef<Path>>(file: P, clobber: bool) -> error::Result<Stream<File>> {
|
||||
pub fn to_new_file<P: AsRef<Path>>(file: P, clobber: bool) -> io::Result<Stream<File>> {
|
||||
Self::write_to_new_file(file, clobber)
|
||||
}
|
||||
|
||||
|
@ -83,10 +83,7 @@ impl Stream<File> {
|
|||
///
|
||||
/// Creates a new file to dump data into. If `clobber` is set to true, it allows overwriting
|
||||
/// existing file, if false, the attempt will result in an error.
|
||||
pub fn write_to_new_file<P: AsRef<Path>>(
|
||||
file: P,
|
||||
clobber: bool,
|
||||
) -> error::Result<Stream<File>> {
|
||||
pub fn write_to_new_file<P: AsRef<Path>>(file: P, clobber: bool) -> io::Result<Stream<File>> {
|
||||
let file = OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
|
@ -223,7 +220,7 @@ impl<W: Write + Send + Sync + 'static> InputScope for TextScope<W> {
|
|||
}
|
||||
|
||||
impl<W: Write + Send + Sync + 'static> Flush for TextScope<W> {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
let mut entries = write_lock!(self.entries);
|
||||
if !entries.is_empty() {
|
||||
|
|
|
@ -3,7 +3,7 @@ use crate::Flush;
|
|||
|
||||
use crate::attributes::MetricId;
|
||||
use crate::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use std::error::Error;
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
|
||||
lazy_static! {
|
||||
|
@ -50,7 +50,7 @@ impl InputScope for VoidInput {
|
|||
}
|
||||
|
||||
impl Flush for VoidInput {
|
||||
fn flush(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,15 +1,14 @@
|
|||
//! Decouple metric definition from configuration with trait objects.
|
||||
|
||||
use crate::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use crate::error;
|
||||
use crate::input::{InputKind, InputMetric, InputScope};
|
||||
use crate::name::{MetricName, NameParts};
|
||||
use crate::void::VOID_INPUT;
|
||||
use crate::output::void::VOID_INPUT;
|
||||
use crate::Flush;
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::{fmt, io};
|
||||
|
||||
#[cfg(not(feature = "parking_lot"))]
|
||||
use std::sync::RwLock;
|
||||
|
@ -167,7 +166,7 @@ impl InnerProxy {
|
|||
}
|
||||
}
|
||||
|
||||
fn flush(&self, namespace: &NameParts) -> error::Result<()> {
|
||||
fn flush(&self, namespace: &NameParts) -> io::Result<()> {
|
||||
if let Some((target, _nslen)) = self.get_effective_target(namespace) {
|
||||
target.flush()
|
||||
} else {
|
||||
|
@ -265,7 +264,7 @@ impl InputScope for Proxy {
|
|||
}
|
||||
|
||||
impl Flush for Proxy {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
write_lock!(self.inner).flush(self.get_prefixes())
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@
|
|||
//! If queue size is exceeded, calling code reverts to blocking.
|
||||
|
||||
use crate::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
|
||||
use crate::error;
|
||||
use crate::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
|
||||
use crate::label::Labels;
|
||||
use crate::metrics;
|
||||
|
@ -14,7 +13,7 @@ use crate::{Flush, MetricValue};
|
|||
#[cfg(not(feature = "crossbeam-channel"))]
|
||||
use std::sync::mpsc;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::{io, thread};
|
||||
|
||||
#[cfg(feature = "crossbeam-channel")]
|
||||
use crossbeam_channel as crossbeam;
|
||||
|
@ -199,12 +198,12 @@ impl InputScope for InputQueueScope {
|
|||
}
|
||||
|
||||
impl Flush for InputQueueScope {
|
||||
fn flush(&self) -> error::Result<()> {
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
self.notify_flush_listeners();
|
||||
if let Err(e) = self.sender.send(InputQueueCmd::Flush(self.target.clone())) {
|
||||
metrics::SEND_FAILED.mark();
|
||||
debug!("Failed to flush async metrics: {}", e);
|
||||
Err(e.into())
|
||||
Err(io::Error::new(io::ErrorKind::Other, e))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue