Adding protobuf prometheus

This commit is contained in:
Francis Lalonde 2018-06-24 00:31:34 -04:00
parent 6cae3b5d4c
commit d1f2b02761
19 changed files with 241 additions and 70 deletions

1
.gitignore vendored Normal file → Executable file
View File

@ -1,3 +1,4 @@
target/
**/*.rs.bk
Cargo.lock
src/prometheus_proto.rs

View File

@ -26,14 +26,17 @@ num = { version = "0.1", default-features = false }
# FIXME required only for random seed for sampling
time = "0.1"
protobuf = { version = "2", features = ["with-bytes"], optional = true }
[build-dependencies]
skeptic = { version = "0.13", optional = true }
protoc-rust = { version = "2", optional = true }
[features]
default-features = []
default-features = [ "prometheus", "self_metrics" ]
bench = []
self_metrics = []
prometheus = [ "protobuf", "protoc-rust" ]
[package.metadata.release]
#sign-commit = true

21
build.rs Normal file → Executable file
View File

@ -1,8 +1,29 @@
#[cfg(feature="skeptic")]
extern crate skeptic;
#[cfg(feature="protoc-rust")]
extern crate protoc_rust;
#[cfg(feature="protoc-rust")]
use protoc_rust as protoc;
fn main() {
// generates doc tests for `README.md`.
#[cfg(feature="skeptic")]
skeptic::generate_doc_tests(&["README.md"]);
#[cfg(feature="protobuf")]
protoc::run(protoc::Args {
out_dir: "src",
input: &["schema/prometheus_proto.proto"],
includes: &[".", "schema"],
customize: protoc::Customize {
..Default::default()
},
}).expect("protoc");
println!("cargo:rustc-env=RUST_GEN_SRC=../target/gen_src")
}

81
schema/prometheus_proto.proto Executable file
View File

@ -0,0 +1,81 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package io.prometheus.client;
option java_package = "io.prometheus.client";
message LabelPair {
optional string name = 1;
optional string value = 2;
}
enum MetricType {
COUNTER = 0;
GAUGE = 1;
SUMMARY = 2;
UNTYPED = 3;
HISTOGRAM = 4;
}
message Gauge {
optional double value = 1;
}
message Counter {
optional double value = 1;
}
message Quantile {
optional double quantile = 1;
optional double value = 2;
}
message Summary {
optional uint64 sample_count = 1;
optional double sample_sum = 2;
repeated Quantile quantile = 3;
}
message Untyped {
optional double value = 1;
}
message Histogram {
optional uint64 sample_count = 1;
optional double sample_sum = 2;
repeated Bucket bucket = 3; // Ordered in increasing order of upper_bound, +Inf bucket is optional.
}
message Bucket {
optional uint64 cumulative_count = 1; // Cumulative in increasing order.
optional double upper_bound = 2; // Inclusive.
}
message Metric {
repeated LabelPair label = 1;
optional Gauge gauge = 2;
optional Counter counter = 3;
optional Summary summary = 4;
optional Untyped untyped = 5;
optional Histogram histogram = 7;
optional int64 timestamp_ms = 6;
}
message MetricFamily {
optional string name = 1;
optional string help = 2;
optional MetricType type = 3;
repeated Metric metric = 4;
}

View File

@ -1,7 +1,7 @@
//! Maintain aggregated metrics for deferred reporting,
//!
use core::{Kind, Value, Name, WithName, output_none, Input, Metric, WithAttributes, Attributes,
RawInput, RawMetric, RawOutputDyn};
RawInput, RawMetric, RawOutputDyn, Flush};
use clock::TimeHandle;
use core::Kind::*;
use error;
@ -89,7 +89,7 @@ impl InnerBucket {
.for_each(|k| {purged.remove(k);});
self.metrics = purged;
pub_scope.flush_raw()
pub_scope.flush()
}
@ -220,7 +220,9 @@ impl Input for Bucket {
.clone();
Metric::new(move |value| scoreb.update(value))
}
}
impl Flush for Bucket {
/// Collect and reset aggregated data.
/// Publish statistics
fn flush(&self) -> error::Result<()> {

View File

@ -68,6 +68,9 @@ impl Input for CacheInput {
new_metric
})
}
}
impl Flush for CacheInput {
fn flush(&self) -> error::Result<()> {
self.target.flush()

View File

@ -304,15 +304,10 @@ impl<T: Output + Send + Sync + 'static> OutputDyn for T {
}
/// Define metrics, write values and flush them.
pub trait Input {
pub trait Input: Flush {
/// Define a metric of the specified type.
fn new_metric(&self, name: Name, kind: Kind) -> Metric;
/// Flush does nothing by default.
fn flush(&self) -> error::Result<()> {
Ok(())
}
/// Define a counter.
fn counter(&self, name: &str) -> Counter {
self.new_metric(name.into(), Kind::Counter).into()
@ -335,6 +330,15 @@ pub trait Input {
}
pub trait Flush {
/// Flush does nothing by default.
fn flush(&self) -> error::Result<()> {
Ok(())
}
}
/// A metric is actually a function that knows to write a metric value to a metric output.
#[derive(Clone)]
pub struct Metric {
@ -415,8 +419,11 @@ impl Input for LockingInputBox {
} )
}
}
impl Flush for LockingInputBox {
fn flush(&self) -> error::Result<()> {
self.inner.lock().expect("RawInput Lock").flush_raw()
self.inner.lock().expect("RawInput Lock").flush()
}
}
@ -433,14 +440,11 @@ impl<T: RawOutput + Send + Sync + 'static> Output for T {
}
/// Define metrics, write values and flush them.
pub trait RawInput {
/// Define a metric of the specified type.
pub trait RawInput: Flush {
/// Define a raw metric of the specified type.
fn new_metric_raw(&self, name: Name, kind: Kind) -> RawMetric;
/// Flush does nothing by default.
fn flush_raw(&self) -> error::Result<()> {
Ok(())
}
}
/// Blanket impl that provides RawOutputs their dynamic flavor.
@ -634,6 +638,9 @@ impl RawInput for VoidOutput {
}
}
impl Flush for VoidOutput {
}
/// Discard all metric values sent to it.
pub fn output_none() -> VoidOutput {
VoidOutput {}

View File

@ -91,7 +91,7 @@ impl RawInput for Graphite {
})
} else {
RawMetric::new(move |value| {
if let Err(err) = cloned.buf_write(&metric, value).and_then(|_| cloned.flush_raw()) {
if let Err(err) = cloned.buf_write(&metric, value).and_then(|_| cloned.flush()) {
debug!("Graphite buffer write failed: {}", err);
metrics::GRAPHITE_SEND_ERR.mark();
}
@ -99,8 +99,11 @@ impl RawInput for Graphite {
})
}
}
}
fn flush_raw(&self) -> error::Result<()> {
impl Flush for Graphite {
fn flush(&self) -> error::Result<()> {
let buf = self.buffer.borrow_mut();
self.flush_inner(buf)
}
@ -176,7 +179,7 @@ pub struct GraphiteMetric {
/// Any remaining buffered data is flushed on Drop.
impl Drop for Graphite {
fn drop(&mut self) {
if let Err(err) = self.flush_raw() {
if let Err(err) = self.flush() {
warn!("Could not flush graphite metrics upon Drop: {}", err)
}
}

View File

@ -16,6 +16,9 @@ extern crate lazy_static;
extern crate atomic_refcell;
extern crate num;
#[cfg(feature="protobuf")]
extern crate protobuf;
// FIXME required only for random seed for sampling
extern crate time;
@ -39,10 +42,10 @@ mod bucket;
pub use bucket::{Bucket, input_bucket, stats_summary, stats_all, stats_average};
mod text;
pub use text::{output_stdout, output_stderr, TextOutput, TextInput};
pub use text::{output_stdout, output_stderr, TextOutput, Text};
mod logging;
pub use logging::{LogOutput, LogInput, output_log};
pub use logging::{LogOutput, Log, output_log};
mod pcg32;
@ -55,8 +58,12 @@ pub use statsd::{StatsdOutput, Statsd, output_statsd};
mod graphite;
pub use graphite::{GraphiteOutput, Graphite, output_graphite};
//mod prometheus;
//pub use prometheus::{Prometheus, to_prometheus};
#[cfg(feature="prometheus")]
mod prometheus;
#[cfg(feature="prometheus")]
mod prometheus_proto;
#[cfg(feature="prometheus")]
pub use prometheus::{Prometheus, PrometheusOutput};
mod map;
pub use map::{StatsMap, output_map};

View File

@ -1,4 +1,5 @@
use core::{Name, WithName, Value, Metric, Kind, Output, Input, WithAttributes, Attributes, WithBuffering};
use core::{Name, WithName, Value, Metric, Kind, Output, Input, WithAttributes, Attributes,
WithBuffering, Flush};
use error;
use std::sync::{RwLock, Arc};
use text;
@ -25,10 +26,10 @@ pub struct LogOutput {
}
impl Output for LogOutput {
type INPUT = LogInput;
type INPUT = Log;
fn new_input(&self) -> Self::INPUT {
LogInput {
Log {
attributes: self.attributes.clone(),
entries: Arc::new(RwLock::new(Vec::new())),
output: self.clone(),
@ -45,20 +46,20 @@ impl WithBuffering for LogOutput {}
/// The scope-local input for buffered log metrics output.
#[derive(Clone)]
pub struct LogInput {
pub struct Log {
attributes: Attributes,
entries: Arc<RwLock<Vec<Vec<u8>>>>,
output: LogOutput,
}
impl WithAttributes for LogInput {
impl WithAttributes for Log {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl WithBuffering for LogInput {}
impl WithBuffering for Log {}
impl Input for LogInput {
impl Input for Log {
fn new_metric(&self, name: Name, kind: Kind) -> Metric {
let name = self.qualified_name(name);
let template = (self.output.format_fn)(&name, kind);
@ -87,6 +88,9 @@ impl Input for LogInput {
})
}
}
}
impl Flush for Log {
fn flush(&self) -> error::Result<()> {
let mut entries = self.entries.write().expect("Metrics TextBuffer");
@ -101,7 +105,7 @@ impl Input for LogInput {
}
}
impl Drop for LogInput {
impl Drop for Log {
fn drop(&mut self) {
if let Err(e) = self.flush() {
warn!("Could not flush log metrics on Drop. {}", e)

View File

@ -123,12 +123,6 @@ mod test {
use core::*;
use proxy::Proxy;
// DECL = WITH_CLAUSE => { ELEM; ELEM.. }
// | ELEM
//
// ELEM =
// DECL
metrics!{TEST: Proxy = "test_prefix" => {
M1: Marker = "failed";
C1: Counter = "failed";

View File

@ -1,4 +1,4 @@
use core::{Value, RawMetric, Kind, Name, RawInput};
use core::{Value, RawMetric, Kind, Name, RawInput, Flush};
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::BTreeMap;
@ -32,6 +32,11 @@ impl RawInput for StatsMap {
}
}
impl Flush for StatsMap {
}
impl From<StatsMap> for BTreeMap<String, Value> {
fn from(map: StatsMap) -> Self {
// FIXME this is is possibly a full map copy, for nothing.

View File

@ -1,6 +1,6 @@
//! Dispatch metrics to multiple sinks.
use core::{Output, Input, Name, WithName, OutputDyn, Kind, Metric, WithAttributes, Attributes};
use core::{Output, Input, Name, WithName, OutputDyn, Kind, Metric, WithAttributes, Attributes, Flush};
use error;
use std::sync::Arc;
@ -89,7 +89,9 @@ impl Input for MultiInput {
metric.write(value)
})
}
}
impl Flush for MultiInput {
fn flush(&self) -> error::Result<()> {
for w in &self.inputs {
w.flush()?;

50
src/prometheus.rs Normal file → Executable file
View File

@ -2,31 +2,53 @@
// TODO impl this
use core::*;
use output::*;
use error;
use self_metrics::*;
use std::net::ToSocketAddrs;
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use std::io::Write;
use std::fmt::Debug;
use socket::RetrySocket;
use prometheus_proto as proto;
metrics!{
<Aggregate> DIPSTICK_METRICS.add_prefix("prometheus") => {
Marker SEND_ERR: "send_failed";
Marker TRESHOLD_EXCEEDED: "bufsize_exceeded";
Counter SENT_BYTES: "sent_bytes";
}
pub struct PrometheusOutput {
}
impl RawOutput for PrometheusOutput {
type INPUT = Prometheus;
fn new_raw_input(&self) -> Self::INPUT {
Prometheus {}
}
}
pub struct Prometheus {
}
impl RawInput for Prometheus {
/// Define a metric of the specified type.
fn new_metric_raw(&self, name: Name, kind: Kind) -> RawMetric {
RawMetric::new(|_value| {})
}
}
impl Flush for Prometheus {
/// Flush does nothing by default.
fn flush(&self) -> error::Result<()> {
Ok(())
}
}
/// Send metrics to a prometheus server at the address and port provided.
pub fn output_prometheus<ADDR>(address: ADDR) -> error::Result<MetricOutput<Prometheus>>
where
ADDR: ToSocketAddrs + Debug + Clone,
pub fn output_prometheus<ADDR>(address: ADDR) -> error::Result<PrometheusOutput>
{
Ok(PrometheusOutput{})
}
//mod shit {
// include! {concat!{env!{"RUST_GEN_SRC"}, "/prometheus.rs"}}
//}

View File

@ -1,6 +1,6 @@
//! Decouple metric definition from configuration with trait objects.
use core::{Name, WithName, Kind, Input, Metric, NO_METRIC_OUTPUT, WithAttributes, Attributes};
use core::{Name, WithName, Kind, Input, Metric, NO_METRIC_OUTPUT, WithAttributes, Attributes, Flush};
use error;
use std::collections::{HashMap, BTreeMap};
@ -220,6 +220,9 @@ impl Input for Proxy {
});
Metric::new(move |value| proxy.target.borrow().0.write(value))
}
}
impl Flush for Proxy {
fn flush(&self) -> error::Result<()> {
self.inner.write().expect("Dispatch Lock").flush(self.get_namespace())

View File

@ -2,7 +2,7 @@
//! Metrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
use core::{Input, Value, Metric, Name, Kind, WithName, OutputDyn, Output,
WithAttributes, Attributes, Cache};
WithAttributes, Attributes, Cache, Flush};
use error;
use metrics;
@ -104,6 +104,9 @@ impl Input for Queue {
}
})
}
}
impl Flush for Queue {
fn flush(&self) -> error::Result<()> {
if let Err(e) = self.sender.send(QueueCmd::Flush(self.target.clone())) {

View File

@ -2,7 +2,7 @@
//! RawMetrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.
use core::{Value, RawMetric, Name, Kind, WithName, RawOutputDyn,
WithAttributes, Attributes, Input, Output, Metric, UnsafeInput};
WithAttributes, Attributes, Input, Output, Metric, UnsafeInput, Flush};
use error;
use metrics;
@ -17,7 +17,7 @@ fn new_async_channel(length: usize) -> Arc<mpsc::SyncSender<RawQueueCmd>> {
while !done {
match receiver.recv() {
Ok(RawQueueCmd::Write(wfn, value)) => wfn.write(value),
Ok(RawQueueCmd::Flush(input)) => if let Err(e) = input.flush_raw() {
Ok(RawQueueCmd::Flush(input)) => if let Err(e) = input.flush() {
debug!("Could not asynchronously flush metrics: {}", e);
},
Err(e) => {
@ -104,6 +104,9 @@ impl Input for RawQueue {
}
})
}
}
impl Flush for RawQueue {
fn flush(&self) -> error::Result<()> {
if let Err(e) = self.sender.send(RawQueueCmd::Flush(self.target.clone())) {

View File

@ -1,7 +1,7 @@
//! Send metrics to a statsd server.
use core::{Input, Output, Value, Metric, Attributes, WithAttributes, Kind,
Name, WithSamplingRate, WithName, WithBuffering, Sampling, Cache, Async};
Name, WithSamplingRate, WithName, WithBuffering, Sampling, Cache, Async, Flush};
use pcg32;
use error;
use metrics;
@ -104,6 +104,9 @@ impl Input for Statsd {
})
}
}
}
impl Flush for Statsd {
fn flush(&self) -> error::Result<()> {
let mut buffer = self.buffer.write().expect("InputBuffer");

View File

@ -1,7 +1,8 @@
//! Standard stateless metric outputs.
// TODO parameterize templates
use core::{Name, WithName, Value, Kind, RawInput, WithAttributes, Attributes, WithBuffering, RawMetric, RawOutput, Cache, RawAsync};
use core::{Name, WithName, Value, Kind, RawInput, WithAttributes, Attributes,
WithBuffering, RawMetric, RawOutput, Cache, RawAsync, Flush};
use error;
use std::sync::{RwLock, Arc};
use std::io::{Write, self};
@ -74,10 +75,10 @@ impl<W: Write + Send + Sync + 'static> WithBuffering for TextOutput<W> {}
impl<W: Write + Send + Sync + 'static> RawOutput for TextOutput<W> {
type INPUT = TextInput<W>;
type INPUT = Text<W>;
fn new_raw_input(&self) -> Self::INPUT {
TextInput {
Text {
attributes: self.attributes.clone(),
entries: Rc::new(RefCell::new(Vec::new())),
output: self.clone(),
@ -86,15 +87,15 @@ impl<W: Write + Send + Sync + 'static> RawOutput for TextOutput<W> {
}
/// The scope-local input for buffered text metrics output.
pub struct TextInput<W: Write + Send + Sync + 'static> {
pub struct Text<W: Write + Send + Sync + 'static> {
attributes: Attributes,
entries: Rc<RefCell<Vec<Vec<u8>>>>,
output: TextOutput<W>,
}
impl<W: Write + Send + Sync + 'static> Clone for TextInput<W> {
impl<W: Write + Send + Sync + 'static> Clone for Text<W> {
fn clone(&self) -> Self {
TextInput {
Text {
attributes: self.attributes.clone(),
entries: self.entries.clone(),
output: self.output.clone(),
@ -102,14 +103,14 @@ impl<W: Write + Send + Sync + 'static> Clone for TextInput<W> {
}
}
impl<W: Write + Send + Sync + 'static> WithAttributes for TextInput<W> {
impl<W: Write + Send + Sync + 'static> WithAttributes for Text<W> {
fn get_attributes(&self) -> &Attributes { &self.attributes }
fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}
impl<W: Write + Send + Sync + 'static> WithBuffering for TextInput<W> {}
impl<W: Write + Send + Sync + 'static> WithBuffering for Text<W> {}
impl<W: Write + Send + Sync + 'static> RawInput for TextInput<W> {
impl<W: Write + Send + Sync + 'static> RawInput for Text<W> {
fn new_metric_raw(&self, name: Name, kind: Kind) -> RawMetric {
let name = self.qualified_name(name);
let template = (self.output.format_fn)(&name, kind);
@ -144,8 +145,11 @@ impl<W: Write + Send + Sync + 'static> RawInput for TextInput<W> {
})
}
}
}
fn flush_raw(&self) -> error::Result<()> {
impl<W: Write + Send + Sync + 'static> Flush for Text<W> {
fn flush(&self) -> error::Result<()> {
let mut entries = self.entries.borrow_mut();
if !entries.is_empty() {
let mut output = self.output.inner.write().expect("TextOutput");
@ -158,9 +162,9 @@ impl<W: Write + Send + Sync + 'static> RawInput for TextInput<W> {
}
}
impl<W: Write + Send + Sync + 'static> Drop for TextInput<W> {
impl<W: Write + Send + Sync + 'static> Drop for Text<W> {
fn drop(&mut self) {
if let Err(e) = self.flush_raw() {
if let Err(e) = self.flush() {
warn!("Could not flush text metrics on Drop. {}", e)
}
}