diff --git a/src/core/proxy.rs b/src/core/proxy.rs index 3a14b22..41a3187 100755 --- a/src/core/proxy.rs +++ b/src/core/proxy.rs @@ -37,7 +37,7 @@ struct ProxyMetric { // the metric trait object to proxy metric values to // the second part can be up to namespace.len() + 1 if this metric was individually targeted // 0 if no target assigned - target: (AtomicRefCell<(InputMetric, usize)>), + target: AtomicRefCell<(InputMetric, usize)>, // a reference to the the parent proxy to remove the metric from when it is dropped proxy: Arc>, diff --git a/src/output/graphite.rs b/src/output/graphite.rs index ad0c2f6..a97e835 100755 --- a/src/output/graphite.rs +++ b/src/output/graphite.rs @@ -17,16 +17,14 @@ use std::fmt::Debug; use std::io::Write; use std::time::{SystemTime, UNIX_EPOCH}; -use std::cell::{RefCell, RefMut}; -use std::rc::Rc; - use std::sync::Arc; #[cfg(not(feature = "parking_lot"))] -use std::sync::RwLock; +use std::sync::{RwLock, RwLockWriteGuard}; #[cfg(feature = "parking_lot")] -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockWriteGuard}; + /// Graphite output holds a socket to a graphite server. /// The socket is shared between scopes opened from the output. @@ -42,7 +40,7 @@ impl Output for Graphite { fn new_scope(&self) -> Self::SCOPE { GraphiteScope { attributes: self.attributes.clone(), - buffer: Rc::new(RefCell::new(String::new())), + buffer: Arc::new(RwLock::new(String::new())), socket: self.socket.clone(), } } @@ -76,7 +74,7 @@ impl Buffered for Graphite {} #[derive(Debug, Clone)] pub struct GraphiteScope { attributes: Attributes, - buffer: Rc>, + buffer: Arc>, socket: Arc>, } @@ -104,7 +102,7 @@ impl OutputScope for GraphiteScope { impl Flush for GraphiteScope { fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); - let buf = self.buffer.borrow_mut(); + let buf = self.buffer.write(); self.flush_inner(buf) } } @@ -116,7 +114,7 @@ impl GraphiteScope { let start = SystemTime::now(); - let mut buffer = self.buffer.borrow_mut(); + let mut buffer = write_lock!(self.buffer); match start.duration_since(UNIX_EPOCH) { Ok(timestamp) => { buffer.push_str(&metric.prefix); @@ -129,7 +127,7 @@ impl GraphiteScope { metrics::GRAPHITE_OVERFLOW.mark(); warn!("Graphite Buffer Size Exceeded: {}", BUFFER_FLUSH_THRESHOLD); let _ = self.flush_inner(buffer); - buffer = self.buffer.borrow_mut(); + buffer = write_lock!(self.buffer); } } Err(e) => { @@ -144,7 +142,7 @@ impl GraphiteScope { } } - fn flush_inner(&self, mut buf: RefMut) -> error::Result<()> { + fn flush_inner(&self, mut buf: RwLockWriteGuard) -> error::Result<()> { if buf.is_empty() { return Ok(()); } diff --git a/src/output/statsd.rs b/src/output/statsd.rs index d2924b3..bddc566 100755 --- a/src/output/statsd.rs +++ b/src/output/statsd.rs @@ -13,12 +13,17 @@ use crate::core::pcg32; use crate::core::{Flush, MetricValue}; use crate::queue::queue_out; -use std::cell::{RefCell, RefMut}; use std::net::ToSocketAddrs; use std::net::UdpSocket; -use std::rc::Rc; use std::sync::Arc; +#[cfg(not(feature = "parking_lot"))] +use std::sync::{RwLock, RwLockWriteGuard}; + +#[cfg(feature = "parking_lot")] +use parking_lot::{RwLock, RwLockWriteGuard}; + + /// Use a safe maximum size for UDP to prevent fragmentation. // TODO make configurable? const MAX_UDP_PAYLOAD: usize = 576; @@ -57,7 +62,7 @@ impl Output for Statsd { fn new_scope(&self) -> Self::SCOPE { StatsdScope { attributes: self.attributes.clone(), - buffer: Rc::new(RefCell::new(String::with_capacity(MAX_UDP_PAYLOAD))), + buffer: Arc::new(RwLock::new(String::with_capacity(MAX_UDP_PAYLOAD))), socket: self.socket.clone(), } } @@ -76,7 +81,7 @@ impl WithAttributes for Statsd { #[derive(Debug, Clone)] pub struct StatsdScope { attributes: Attributes, - buffer: Rc>, + buffer: Arc>, socket: Arc, } @@ -133,7 +138,7 @@ impl OutputScope for StatsdScope { impl Flush for StatsdScope { fn flush(&self) -> error::Result<()> { self.notify_flush_listeners(); - let buf = self.buffer.borrow_mut(); + let buf = write_lock!(self.buffer); self.flush_inner(buf) } } @@ -144,7 +149,7 @@ impl StatsdScope { let value_str = scaled_value.to_string(); let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len(); - let mut buffer = self.buffer.borrow_mut(); + let mut buffer = write_lock!(self.buffer); if entry_len > buffer.capacity() { // TODO report entry too big to fit in buffer (!?) return; @@ -154,7 +159,7 @@ impl StatsdScope { if entry_len + 1 > remaining { // buffer is nearly full, make room let _ = self.flush_inner(buffer); - buffer = self.buffer.borrow_mut(); + buffer = write_lock!(self.buffer); } else { if !buffer.is_empty() { // separate from previous entry @@ -172,7 +177,7 @@ impl StatsdScope { } } - fn flush_inner(&self, mut buffer: RefMut) -> error::Result<()> { + fn flush_inner(&self, mut buffer: RwLockWriteGuard) -> error::Result<()> { if !buffer.is_empty() { match self.socket.send(buffer.as_bytes()) { Ok(size) => {