Use prometheus-rust crate rather than reinventing it

Francis Lalonde 2018-11-13 09:09:09 -05:00
parent 0b66ffdbfd
commit ce3273bafa
11 changed files with 194 additions and 2627 deletions


@ -27,23 +27,20 @@ num = { version = "0.2", default-features = false }
# FIXME required only for random seed for sampling
time = "0.1"
# optional dep for prometheus binary format
protobuf = { version = "2", optional = true }
prometheus = { version = "0.4" }
# optional dep for standalone http pull metrics
tiny_http = { version = "0.6", optional = true }
[build-dependencies]
skeptic = { version = "0.13", optional = true }
protoc-rust = { version = "2", optional = true }
[features]
default-features = [ "prometheus", "self_metrics", "proto", "tiny_http" ]
default-features = [ "self_metrics", "tiny_http" ]
bench = []
self_metrics = []
proto = [ "protoc-rust", "protobuf" ]
prometheus = []
tokio = []
#prometheus = []
[package.metadata.release]
#sign-commit = true


@ -46,7 +46,7 @@ Usable either through the time! macro, the closure form or explicit calls to sta
While timers' internal precision is in nanoseconds, their accuracy depends on the platform's OS and hardware.
Timers' default output unit is milliseconds, but this can be scaled up or down.
```rust,skt-run
```$rust,skt-run
let app_metrics = metric_scope(to_stdout());
let timer = app_metrics.timer("my_timer");
time!(timer, {/* slow code here */} );
@ -74,7 +74,7 @@ Aggregated statistics may also append identifiers to the metric's name.
Names should exclude characters that can interfere with namespaces, separators and output protocols.
A good convention is to stick with lowercase alphanumeric identifiers of fewer than 12 characters.
```rust,skt-run
```$rust,skt-run
let app_metrics = metric_scope(to_stdout());
let db_metrics = app_metrics.add_prefix("database");
let _db_timer = db_metrics.timer("db_timer");
@ -102,7 +102,7 @@ Notes about labels:
Metric inputs are usually set up statically upon application startup.
```rust,skt-plain
```$rust,skt-run
#[macro_use]
extern crate dipstick;
The static metric definition macro is just a `lazy_static!` wrapper.
If necessary, metrics can also be defined "dynamically", with a possibly new name for every value.
This is more flexible but has a higher runtime cost, which may be alleviated with caching.
```rust,skt-run
```$rust,skt-run
let user_name = "john_day";
let app_metrics = to_log().with_cache(512);
app_metrics.gauge(format!("gauge_for_user_{}", user_name)).value(44);
@ -180,7 +180,7 @@ If enabled, buffering is usually a best-effort affair, to safely limit the amoun
Some outputs such as statsd also have the ability to sample metrics.
If enabled, sampling is done using pcg32, a fast random algorithm with reasonable entropy.
```rust,skt-fail
```$rust,skt-fail
let _app_metrics = to_statsd("server:8125")?.with_sampling_rate(0.01);
```
@ -196,6 +196,7 @@ which might happen after the static initialization phase in which metrics are de
To get around this catch-22, Dipstick provides a Proxy which acts as an intermediate output,
allowing redirection to the effective output after it has been set up.
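For illustration, here is a minimal sketch of that redirection. It is not taken from this commit: `Proxy::default()` and `target()` are assumed names, and the exact type expected as the target is also an assumption.

```rust
// Hypothetical sketch only; method names are assumptions, not this commit's API.
// The proxy is declared early (typically from a static context), before any
// concrete output exists.
let metrics = Proxy::default();
let counter = metrics.counter("requests");

// Once the effective output is known, redirect the proxy to it.
metrics.target(Stream::stdout().input());

// Values recorded from here on are forwarded to stdout.
counter.count(1);
```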
### Bucket
Another intermediate output is the Bucket, which can be used to aggregate metric values.
@ -204,15 +205,17 @@ Bucket-aggregated values can be used to infer statistics which will be flushed o
Bucket aggregation is performed locklessly and is very fast.
Count, Sum, Min, Max and Mean are tracked where they make sense, depending on the metric type.
#### Preset bucket statistics
Published statistics can be selected with presets such as `all_stats` (see previous example),
`summary` or `average`.
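As a sketch only, a preset could be selected by passing it where the custom statistics function of the next section goes; this assumes the exported presets (`stats_all`, `stats_summary`, `stats_average`) are drop-in substitutes for such a closure, which this commit does not show.

```rust
// Sketch: assumes the preset has the signature expected by set_default_aggregate_fn.
metrics(aggregate());
set_default_aggregate_fn(stats_summary);
```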
#### Custom bucket statistics
For more control over published statistics, provide your own strategy:
```rust,skt-run
```$rust,skt-run
metrics(aggregate());
set_default_aggregate_fn(|_kind, name, score|
match score {
@ -222,11 +225,12 @@ set_default_aggregate_fn(|_kind, name, score|
});
```
#### Scheduled publication
Aggregate metrics and schedule them to be published periodically in the background:
```rust,skt-run
```$rust,skt-run
use std::time::Duration;
let app_metrics = metric_scope(aggregate());
@ -240,7 +244,7 @@ app_metrics.flush_every(Duration::from_secs(3));
Like Constructicons, multiple metrics outputs can assemble, creating a unified facade that transparently dispatches
input metrics to each constituent output.
```rust,skt-fail,no_run
```$rust,skt-fail,no_run
let _app_metrics = metric_scope((
to_stdout(),
to_statsd("localhost:8125")?.with_namespace(&["my", "app"])
@ -250,9 +254,11 @@ let _app_metrics = metric_scope((
### Queue
Metrics can be recorded asynchronously:
```rust,skt-run
```$rust,skt-run
let _app_metrics = metric_scope(to_stdout().queue(64));
```
The async queue uses a Rust channel and a standalone thread.
If the queue ever fills up under heavy load, the behavior reverts to blocking (rather than dropping metrics).


@ -33,7 +33,7 @@ For convenience, dipstick builds on stable Rust with minimal, feature-gated depe
### Non-goals
Dipstick's focus is on metrics collection (input) and forwarding (output).
Although it will happily track aggregated statistics, for the sake of simplicity and performance Dipstick will not
Although it will happily aggregate base statistics, for the sake of simplicity and performance Dipstick will not
- plot graphs
- send alerts
- track histograms
@ -45,7 +45,7 @@ These are all best done by downstream timeseries visualization and monitoring to
Here's a basic aggregating & auto-publishing counter metric:
```$rust,skt-run
let bucket = Bucket::new();
let bucket = AtomicBucket::new();
bucket.set_target(Stream::stdout());
bucket.flush_every(Duration::from_secs(3));
let counter = bucket.counter("counter_a");
@ -67,7 +67,7 @@ fn main() {
```
For sample applications see the [examples](https://github.com/fralalonde/dipstick/tree/master/examples).
For documentation see the [handbook](https://github.com/fralalonde/dipstick/tree/master/handbook).
For documentation see the [handbook](https://github.com/fralalonde/dipstick/tree/master/HANDBOOK.md).
To use Dipstick in your project, add the following line to your `Cargo.toml`
in the `[dependencies]` section:


@ -1,29 +1,11 @@
#[cfg(feature="skeptic")]
extern crate skeptic;
#[cfg(feature="proto")]
extern crate protoc_rust;
#[cfg(feature="proto")]
use protoc_rust as protoc;
fn main() {
// generates doc tests for `README.md` and `HANDBOOK.md`.
#[cfg(feature="skeptic")]
skeptic::generate_doc_tests(&["README.md", "HANDBOOK.md"]);
#[cfg(feature="proto, prometheus")]
protoc::run(protoc::Args {
out_dir: "src",
// "prometheus_proto.rs" is excluded from git
// FIXME generate to target/gen_src instead
input: &["schema/prometheus_proto.proto"],
includes: &[".", "schema"],
customize: protoc::Customize {
..Default::default()
},
}).expect("protoc");
}


@ -1,6 +1,5 @@
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
//! A sample application sending ad-hoc metrics to graphite.
//extern crate badlog;
extern crate dipstick;
use dipstick::*;

examples/prometheus.rs (new file, 20 lines added)

@ -0,0 +1,20 @@
//! A sample application sending ad-hoc metrics to prometheus.
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
fn main() {
    let metrics = Prometheus::send_json_to("localhost:2003")
        .expect("Prometheus Socket")
        .add_prefix("my_app")
        .input();

    loop {
        metrics.counter("counter_a").count(123);
        metrics.timer("timer_a").interval_us(2000000);
        std::thread::sleep(Duration::from_millis(40));
    }
}


@ -7,6 +7,7 @@ use std::result;
use std::sync::mpsc;
use queue::queue_in;
use queue::queue_out;
use prometheus;
use self::Error::*;
@ -16,17 +17,20 @@ pub enum Error {
/// A generic I/O error.
IO(io::Error),
/// An error from the async metric queue.
Async(mpsc::SendError<queue_in::InputQueueCmd>),
InputQueue(mpsc::SendError<queue_in::InputQueueCmd>),
/// An error from the async metric queue.
RawAsync(mpsc::SendError<queue_out::OutputQueueCmd>)
OutputQueue(mpsc::SendError<queue_out::OutputQueueCmd>),
/// An error from the underlying prometheus crate.
Prometheus(prometheus::Error),
}
impl Display for Error {
fn fmt(&self, formatter: &mut Formatter) -> result::Result<(), fmt::Error> {
match *self {
IO(ref err) => err.fmt(formatter),
Async(ref err) => err.fmt(formatter),
RawAsync(ref err) => err.fmt(formatter),
InputQueue(ref err) => err.fmt(formatter),
OutputQueue(ref err) => err.fmt(formatter),
Prometheus(ref err) => err.fmt(formatter),
}
}
}
@ -35,16 +39,18 @@ impl error::Error for Error {
fn description(&self) -> &str {
match *self {
IO(ref err) => err.description(),
Async(ref err) => err.description(),
RawAsync(ref err) => err.description(),
InputQueue(ref err) => err.description(),
OutputQueue(ref err) => err.description(),
Prometheus(ref err) => err.description(),
}
}
fn cause(&self) -> Option<&error::Error> {
match *self {
IO(ref err) => Some(err),
Async(ref err) => Some(err),
RawAsync(ref err) => Some(err),
InputQueue(ref err) => Some(err),
OutputQueue(ref err) => Some(err),
Prometheus(ref err) => Some(err),
}
}
}
@ -60,12 +66,18 @@ impl From<io::Error> for Error {
impl From<mpsc::SendError<queue_in::InputQueueCmd>> for Error {
fn from(err: mpsc::SendError<queue_in::InputQueueCmd>) -> Self {
Async(err)
InputQueue(err)
}
}
impl From<mpsc::SendError<queue_out::OutputQueueCmd>> for Error {
fn from(err: mpsc::SendError<queue_out::OutputQueueCmd>) -> Self {
RawAsync(err)
OutputQueue(err)
}
}
impl From<prometheus::Error> for Error {
fn from(err: prometheus::Error) -> Self {
Prometheus(err)
}
}
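With this conversion in place, `prometheus` failures propagate through dipstick's `error::Result` via the `?` operator. A minimal sketch (the `encode_gathered` helper is hypothetical, not part of this commit):

```rust
use core::error;
use prometheus::{Encoder, Registry, TextEncoder};

// Hypothetical helper: any `prometheus::Error` returned by `encode` is
// converted to `Error::Prometheus` by the `From` impl above, so `?` suffices.
fn encode_gathered(registry: &Registry, buffer: &mut Vec<u8>) -> error::Result<()> {
    let encoder = TextEncoder::new();
    encoder.encode(&registry.gather(), buffer)?;
    Ok(())
}
```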


@ -16,12 +16,12 @@ extern crate lazy_static;
extern crate atomic_refcell;
extern crate num;
#[cfg(feature="protobuf")]
extern crate protobuf;
// FIXME required only for pcg32 seed (for sampling)
extern crate time;
//#[cfg(feature="prometheus")]
extern crate prometheus;
#[macro_use]
mod macros;
pub use macros::*;
@ -51,6 +51,9 @@ pub use output::statsd::{Statsd, StatsdScope, StatsdMetric};
pub use output::map::{StatsMap};
pub use output::log::{Log, LogScope};
//#[cfg(feature="prometheus")]
pub use output::prometheus::{Prometheus, PrometheusScope};
mod bucket;
pub use bucket::{ScoreType, stats_all, stats_average, stats_summary};
pub use bucket::atomic::{AtomicBucket};


@ -12,9 +12,5 @@ pub mod graphite;
pub mod statsd;
#[cfg(feature="prometheus")]
//#[cfg(feature="prometheus")]
pub mod prometheus;
#[cfg(feature="prometheus, proto")]
pub mod prometheus_proto;


@ -1,63 +1,70 @@
//! Prometheus-related functionality.
//! Both push and pull are supported.
//! Both protobuf and text format are supported.
//! - Send metrics to a prometheus aggregator agent.
//! - Serve metrics with basic HTTP server
//! - Print metrics to a buffer provided by an HTTP framework.
use core::{Flush, MetricValue};
use core::input::{InputKind, Input, InputScope, InputMetric};
use core::{Flush};
use core::input::{InputKind};
use core::attributes::{Attributes, WithAttributes, Buffered, Buffering, Prefixed};
use core::name::MetricName;
use core::output::{Output, OutputMetric, OutputScope};
use core::error;
use output::socket::RetrySocket;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::fmt::Debug;
use std::io::Write;
#[cfg(feature="proto")]
use prometheus_proto as proto;
use prometheus::{Opts, Registry, IntGauge, IntCounter, Encoder, ProtobufEncoder, TextEncoder,};
metrics!{
}
#[derive(Clone, Debug)]
enum PrometheusEncoding {
JSON,
PROTOBUF,
}
/// Prometheus push shared client
/// Holds a shared socket to a Prometheus host.
#[derive(Clone, Debug)]
pub struct Prometheus {
attributes: Attributes,
socket: Arc<RwLock<RetrySocket>>,
encoding: PrometheusEncoding,
}
impl Prometheus {
/// Send metrics to a prometheus server at the address and port provided.
pub fn send_json_to<A: ToSocketAddrs + Debug + Clone>(address: A) -> error::Result<Prometheus> {
Ok(Prometheus {
attributes: Attributes::default(),
socket: Arc::new(RwLock::new(RetrySocket::new(address.clone())?)),
encoding: PrometheusEncoding::JSON,
})
}
/// Send metrics to a prometheus server at the address and port provided.
pub fn send_protobuf_to<A: ToSocketAddrs + Debug + Clone>(address: A) -> error::Result<Prometheus> {
Ok(Prometheus {
attributes: Attributes::default(),
socket: Arc::new(RwLock::new(RetrySocket::new(address.clone())?)),
encoding: PrometheusEncoding::PROTOBUF,
})
}
}
impl Output for Prometheus {
type SCOPE = PrometheusScope;
fn output(&self) -> Arc<Input + Send + Sync + 'static> {
PrometheusScope {}
}
}
/// Prometheus push client scope
pub struct PrometheusScope {
}
impl PrometheusScope {
/// Send metrics to a prometheus server at the address and port provided.
pub fn output<ADDR: ToSocketAddrs>(_address: ADDR) -> error::Result<Prometheus> {
Ok(Prometheus {})
}
}
impl OutputScope for PrometheusScope {
/// Define a metric of the specified type.
fn new_metric(&self, name: MetricName, _kind: InputKind) -> OutputMetric {
let mut _prefix = self.prefix_prepend(name).join(".");
OutputMetric::new(|_value, _labels| {})
}
}
impl Flush for PrometheusScope {
/// Flush does nothing by default.
fn flush(&self) -> error::Result<()> {
Ok(())
fn output(&self) -> Self::SCOPE {
PrometheusScope {
attributes: self.attributes.clone(),
registry: Registry::new(),
socket: self.socket.clone(),
encoding: self.encoding.clone(),
}
}
}
@ -65,3 +72,81 @@ impl WithAttributes for Prometheus {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
/// Prometheus push client scope
#[derive(Clone)]
pub struct PrometheusScope {
    attributes: Attributes,
    registry: Registry,
    socket: Arc<RwLock<RetrySocket>>,
    encoding: PrometheusEncoding,
}

impl OutputScope for PrometheusScope {
    /// Define a metric of the specified type.
    fn new_metric(&self, name: MetricName, kind: InputKind) -> OutputMetric {
        let name = self.prefix_prepend(name).join(".");
        match kind {
            // Counters and markers both map to a prometheus IntCounter.
            InputKind::Counter => {
                let opts = Opts::new(name, "".to_string());
                let counter = IntCounter::with_opts(opts).expect("Prometheus Counter");
                self.registry.register(Box::new(counter.clone())).expect("Registered Prometheus Counter");
                OutputMetric::new(move |value, _labels|
                    counter.inc_by(value as i64)
                )
            },
            InputKind::Marker => {
                let opts = Opts::new(name, "".to_string());
                let marker = IntCounter::with_opts(opts).expect("Prometheus Counter");
                self.registry.register(Box::new(marker.clone())).expect("Registered Prometheus Marker");
                OutputMetric::new(move |_value, _labels|
                    marker.inc()
                )
            },
            // Timers accumulate elapsed time into an IntCounter.
            InputKind::Timer => {
                let opts = Opts::new(name, "".to_string());
                let timer = IntCounter::with_opts(opts).expect("Prometheus Timer");
                self.registry.register(Box::new(timer.clone())).expect("Registered Prometheus Timer");
                OutputMetric::new(move |value, _labels|
                    timer.inc_by(value as i64)
                )
            },
            InputKind::Gauge => {
                let opts = Opts::new(name, "".to_string());
                let gauge = IntGauge::with_opts(opts).expect("Prometheus Gauge");
                self.registry.register(Box::new(gauge.clone())).expect("Registered Prometheus Gauge");
                OutputMetric::new(move |value, _labels|
                    gauge.add(value as i64)
                )
            },
        }
    }
}

impl Flush for PrometheusScope {
    fn flush(&self) -> error::Result<()> {
        let metric_families = self.registry.gather();
        let mut buffer = vec![];
        match self.encoding {
            PrometheusEncoding::JSON => {
                let encoder = TextEncoder::new();
                encoder.encode(&metric_families, &mut buffer)?
            },
            PrometheusEncoding::PROTOBUF => {
                let encoder = ProtobufEncoder::new();
                encoder.encode(&metric_families, &mut buffer)?
            },
        }
        let mut socket = self.socket.write().expect("Lock Prometheus Socket");
        Ok(socket.write_all(&mut buffer)?)
    }
}

impl WithAttributes for PrometheusScope {
    fn get_attributes(&self) -> &Attributes { &self.attributes }
    fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}

File diff suppressed because it is too large