Add RetrySocket for graphite

This commit is contained in:
Francis Lalonde 2017-12-07 15:06:14 -05:00
parent 2aa93cb14b
commit b29dade767
7 changed files with 223 additions and 13 deletions

98
Cargo.lock generated
View File

@ -5,6 +5,31 @@ dependencies = [
"dipstick 0.4.14-alpha.0",
]
[[package]]
name = "aho-corasick"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ansi_term"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "badlog"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "counter_timer_gauge"
version = "0.0.0"
@ -45,10 +70,20 @@ name = "either"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "env_logger"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "graphite"
version = "0.0.0"
dependencies = [
"badlog 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dipstick 0.4.14-alpha.0",
]
@ -84,6 +119,14 @@ name = "log"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "memchr"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "multi_print"
version = "0.0.0"
@ -147,6 +190,23 @@ name = "redox_syscall"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "regex"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"regex-syntax 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "regex-syntax"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "static_metrics"
version = "0.0.0"
@ -164,6 +224,15 @@ dependencies = [
"unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "thread_local"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "time"
version = "0.1.38"
@ -180,6 +249,24 @@ name = "unicode-xid"
version = "0.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "unreachable"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "utf8-ranges"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "void"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "winapi"
version = "0.2.8"
@ -191,21 +278,32 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4"
"checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6"
"checksum badlog 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "69ff7ee77081b9de4c7ec6721321c892f02b02e9d29107f469e88b42468740cc"
"checksum derivative 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "67b3d6d0e84e53a5bdc263cc59340541877bb541706a191d762bfac6a481bdde"
"checksum either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "740178ddf48b1a9e878e6d6509a1442a2d42fd2928aae8e7a6f8a36fb01981b3"
"checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b"
"checksum itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4833d6978da405305126af4ac88569b5d71ff758581ce5a987dbfa3755f694fc"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73"
"checksum libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "5ba3df4dcb460b9dfbd070d41c94c19209620c191b0340b929ce748a2bcd42d2"
"checksum log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "880f77541efa6e5cc74e76910c9884d9859683118839d6a1dc3b11e63512565b"
"checksum memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "796fba70e76612589ed2ce7f45282f5af869e0fdd7cc6199fa1aa1f1d591ba9d"
"checksum num 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "a311b77ebdc5dd4cf6449d81e4135d9f0e3b153839ac90e648a8ef538f923525"
"checksum num-integer 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "d1452e8b06e448a07f0e6ebb0bb1d92b8890eea63288c0b627331d53514d0fba"
"checksum num-iter 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)" = "7485fcc84f85b4ecd0ea527b14189281cf27d60e583ae65ebc9c088b13dffe01"
"checksum num-traits 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "99843c856d68d8b4313b03a17e33c4bb42ae8f6610ea81b28abe076ac721b9b0"
"checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"
"checksum redox_syscall 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)" = "ab105df655884ede59d45b7070c8a65002d921461ee813a024558ca16030eea0"
"checksum regex 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ac6ab4e9218ade5b423358bbd2567d1617418403c7a512603630181813316322"
"checksum regex-syntax 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ad890a5eef7953f55427c50575c680c42841653abd2b028b68cd223d157f62db"
"checksum syn 0.10.8 (registry+https://github.com/rust-lang/crates.io-index)" = "58fd09df59565db3399efbba34ba8a2fec1307511ebd245d0061ff9d42691673"
"checksum thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14"
"checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520"
"checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc"
"checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56"
"checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"

View File

@ -5,3 +5,4 @@ workspace = "../../"
[dependencies]
dipstick = { path = '../../' }
badlog = "1.0"

View File

@ -1,10 +1,14 @@
//! A sample application sending ad-hoc counter values both to statsd _and_ to stdout.
extern crate dipstick;
extern crate badlog;
use dipstick::*;
use std::time::Duration;
fn main() {
badlog::init(Some("info"));
let metrics = metrics(
to_graphite("localhost:2003", "myapp.").expect("Could not connect to graphite")
);
@ -12,6 +16,6 @@ fn main() {
loop {
metrics.counter("counter_a").count(123);
metrics.timer("timer_a").interval_us(2000000);
std::thread::sleep_ms(40);
std::thread::sleep(Duration::from_millis(40));
}
}

View File

@ -3,13 +3,13 @@
extern crate dipstick;
use dipstick::*;
use std::time::Duration;
fn main() {
let metrics = metrics(
// Metric caching allows re-use of the counter, skipping cost of redefining it on each use.
cache(12, (
sample(0.01, to_statsd("localhost:8125", "myapp.").expect("Could not connect to statsd")),
// to_graphite("localhost:8125", "myapp."),
to_stdout(),
)),
);
@ -17,6 +17,6 @@ fn main() {
loop {
metrics.counter("counter_a").count(123);
metrics.timer("timer_a").interval_us(2000000);
std::thread::sleep_ms(40);
std::thread::sleep(Duration::from_millis(40));
}
}

View File

@ -4,24 +4,23 @@ use core::*;
use error;
use selfmetrics::*;
use std::net::TcpStream;
use std::net::ToSocketAddrs;
use std::sync::{Arc, RwLock};
pub use std::net::ToSocketAddrs;
use std::time::{SystemTime, UNIX_EPOCH};
use std::io::Write;
use std::fmt::Debug;
use socket::RetrySocket;
/// Send metrics to a graphite server at the address and port provided.
pub fn to_graphite<ADDR>(address: ADDR, prefix: &str) -> error::Result<GraphiteSink>
where
ADDR: ToSocketAddrs + Debug,
ADDR: ToSocketAddrs + Debug + 'static + Send + Sync,
{
debug!("Connecting to graphite {:?}", address);
let socket = TcpStream::connect(address)?;
socket.set_nonblocking(true)?;
Ok(GraphiteSink {
socket: Arc::new(RwLock::new(socket)),
socket: Arc::new(RwLock::new(RetrySocket::new(address)?)),
prefix: String::from(prefix),
})
}
@ -51,7 +50,7 @@ pub struct GraphiteMetric {
#[derive(Debug)]
struct ScopeBuffer {
buffer: Arc<RwLock<String>>,
socket: Arc<RwLock<TcpStream>>,
socket: Arc<RwLock<RetrySocket>>,
auto_flush: bool,
}
@ -102,7 +101,7 @@ impl ScopeBuffer {
// TODO locking is getting out of hand - use some Cell... or make scopes !Sync
let mut buf = self.buffer.write().expect("Could not lock graphite buffer.");
if !buf.is_empty() {
let mut sock = self.socket.write().expect("Could not lock graphite socket.");
let mut sock = self.socket.write().expect("Could not lock socket.");
match sock.write(buf.as_bytes()) {
Ok(size) => {
buf.clear();
@ -111,6 +110,7 @@ impl ScopeBuffer {
}
Err(e) => {
SEND_ERR.mark();
// still just a best effort, do not warn! for every failure
debug!("Failed to send buffer to graphite: {}", e);
}
};
@ -122,7 +122,7 @@ impl ScopeBuffer {
/// Allows sending metrics to a graphite server
#[derive(Debug)]
pub struct GraphiteSink {
socket: Arc<RwLock<TcpStream>>,
socket: Arc<RwLock<RetrySocket>>,
prefix: String,
}

View File

@ -161,6 +161,9 @@ pub use statsd::*;
mod graphite;
pub use graphite::*;
mod socket;
pub use socket::*;
mod cache;
pub use cache::*;

104
src/socket.rs Normal file
View File

@ -0,0 +1,104 @@
use std::net::TcpStream;
use std::net::{ToSocketAddrs, SocketAddr};
use std::io;
use std::time::{Duration, Instant};
use std::fmt::{Debug, Formatter};
use std::fmt;
use std::io::Write;
const MIN_RECONNECT_DELAY_MS: u64 = 50;
const MAX_RECONNECT_DELAY_MS: u64 = 10000;
/// A socket that retries
pub struct RetrySocket {
retries: usize,
next_try: Instant,
addresses: Vec<SocketAddr>,
socket: Option<TcpStream>,
}
impl Debug for RetrySocket {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
self.next_try.fmt(f)?;
self.socket.fmt(f)
}
}
impl RetrySocket {
/// Create a new socket that will retry
pub fn new<A: ToSocketAddrs>(addresses: A) -> io::Result<Self> {
// FIXME instead of collecting addresses early, store ToSocketAddrs as trait object
// FIXME apparently this can not be one because of Associated Types clusterfuck (?!)
let addresses = addresses.to_socket_addrs()?.collect();
let mut socket = RetrySocket {
retries: 0,
next_try: Instant::now() - Duration::from_millis(MIN_RECONNECT_DELAY_MS),
addresses,
socket: None
};
// try early connect
let _ = socket.flush().ok();
Ok(socket)
}
}
impl RetrySocket {
fn try_connect(&mut self) -> io::Result<()> {
if self.socket.is_none() {
let now = Instant::now();
if now > self.next_try {
let addresses: &[SocketAddr] = self.addresses.as_ref();
let socket = TcpStream::connect(addresses)?;
socket.set_nonblocking(true)?;
self.retries = 0;
info!("Connected to {:?}", addresses);
self.socket = Some(socket);
}
}
Ok(())
}
fn backoff(&mut self, e: io::Error) -> io::Error {
self.socket = None;
self.retries += 1;
let delay = MAX_RECONNECT_DELAY_MS.min(MIN_RECONNECT_DELAY_MS << self.retries);
warn!("Could not connect to {:?} after {} trie(s). Backing off reconnection by {}ms. {}",
self.addresses, self.retries, delay, e);
self.next_try = Instant::now() + Duration::from_millis(delay);
e
}
fn with_socket<F, T>(&mut self, operation: F) -> io::Result<T>
where F: FnOnce(&mut TcpStream) -> io::Result<T>
{
if let Err(e) = self.try_connect() {
return Err(self.backoff(e))
}
let opres = if let Some(ref mut socket) = self.socket {
operation(socket)
} else {
// still none, quiescent
return Err(io::Error::from(io::ErrorKind::NotConnected))
};
match opres {
Ok(r) => Ok(r),
Err(e) => Err(self.backoff(e))
}
}
}
impl Write for RetrySocket {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.with_socket(|sock| sock.write(buf))
}
fn flush(&mut self) -> io::Result<()> {
self.with_socket(|sock| sock.flush())
}
}