Fix for BorrowMut Panic #78

This commit is contained in:
Francis Lalonde 2020-04-17 07:52:46 -04:00
parent 9774ea7343
commit 8025b9da5c
3 changed files with 23 additions and 20 deletions

View File

@ -37,7 +37,7 @@ struct ProxyMetric {
// the metric trait object to proxy metric values to
// the second part can be up to namespace.len() + 1 if this metric was individually targeted
// 0 if no target assigned
target: (AtomicRefCell<(InputMetric, usize)>),
target: AtomicRefCell<(InputMetric, usize)>,
// a reference to the the parent proxy to remove the metric from when it is dropped
proxy: Arc<RwLock<InnerProxy>>,

View File

@ -17,16 +17,14 @@ use std::fmt::Debug;
use std::io::Write;
use std::time::{SystemTime, UNIX_EPOCH};
use std::cell::{RefCell, RefMut};
use std::rc::Rc;
use std::sync::Arc;
#[cfg(not(feature = "parking_lot"))]
use std::sync::RwLock;
use std::sync::{RwLock, RwLockWriteGuard};
#[cfg(feature = "parking_lot")]
use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockWriteGuard};
/// Graphite output holds a socket to a graphite server.
/// The socket is shared between scopes opened from the output.
@ -42,7 +40,7 @@ impl Output for Graphite {
fn new_scope(&self) -> Self::SCOPE {
GraphiteScope {
attributes: self.attributes.clone(),
buffer: Rc::new(RefCell::new(String::new())),
buffer: Arc::new(RwLock::new(String::new())),
socket: self.socket.clone(),
}
}
@ -76,7 +74,7 @@ impl Buffered for Graphite {}
#[derive(Debug, Clone)]
pub struct GraphiteScope {
attributes: Attributes,
buffer: Rc<RefCell<String>>,
buffer: Arc<RwLock<String>>,
socket: Arc<RwLock<RetrySocket>>,
}
@ -104,7 +102,7 @@ impl OutputScope for GraphiteScope {
impl Flush for GraphiteScope {
fn flush(&self) -> error::Result<()> {
self.notify_flush_listeners();
let buf = self.buffer.borrow_mut();
let buf = self.buffer.write();
self.flush_inner(buf)
}
}
@ -116,7 +114,7 @@ impl GraphiteScope {
let start = SystemTime::now();
let mut buffer = self.buffer.borrow_mut();
let mut buffer = write_lock!(self.buffer);
match start.duration_since(UNIX_EPOCH) {
Ok(timestamp) => {
buffer.push_str(&metric.prefix);
@ -129,7 +127,7 @@ impl GraphiteScope {
metrics::GRAPHITE_OVERFLOW.mark();
warn!("Graphite Buffer Size Exceeded: {}", BUFFER_FLUSH_THRESHOLD);
let _ = self.flush_inner(buffer);
buffer = self.buffer.borrow_mut();
buffer = write_lock!(self.buffer);
}
}
Err(e) => {
@ -144,7 +142,7 @@ impl GraphiteScope {
}
}
fn flush_inner(&self, mut buf: RefMut<String>) -> error::Result<()> {
fn flush_inner(&self, mut buf: RwLockWriteGuard<String>) -> error::Result<()> {
if buf.is_empty() {
return Ok(());
}

View File

@ -13,12 +13,17 @@ use crate::core::pcg32;
use crate::core::{Flush, MetricValue};
use crate::queue::queue_out;
use std::cell::{RefCell, RefMut};
use std::net::ToSocketAddrs;
use std::net::UdpSocket;
use std::rc::Rc;
use std::sync::Arc;
#[cfg(not(feature = "parking_lot"))]
use std::sync::{RwLock, RwLockWriteGuard};
#[cfg(feature = "parking_lot")]
use parking_lot::{RwLock, RwLockWriteGuard};
/// Use a safe maximum size for UDP to prevent fragmentation.
// TODO make configurable?
const MAX_UDP_PAYLOAD: usize = 576;
@ -57,7 +62,7 @@ impl Output for Statsd {
fn new_scope(&self) -> Self::SCOPE {
StatsdScope {
attributes: self.attributes.clone(),
buffer: Rc::new(RefCell::new(String::with_capacity(MAX_UDP_PAYLOAD))),
buffer: Arc::new(RwLock::new(String::with_capacity(MAX_UDP_PAYLOAD))),
socket: self.socket.clone(),
}
}
@ -76,7 +81,7 @@ impl WithAttributes for Statsd {
#[derive(Debug, Clone)]
pub struct StatsdScope {
attributes: Attributes,
buffer: Rc<RefCell<String>>,
buffer: Arc<RwLock<String>>,
socket: Arc<UdpSocket>,
}
@ -133,7 +138,7 @@ impl OutputScope for StatsdScope {
impl Flush for StatsdScope {
fn flush(&self) -> error::Result<()> {
self.notify_flush_listeners();
let buf = self.buffer.borrow_mut();
let buf = write_lock!(self.buffer);
self.flush_inner(buf)
}
}
@ -144,7 +149,7 @@ impl StatsdScope {
let value_str = scaled_value.to_string();
let entry_len = metric.prefix.len() + value_str.len() + metric.suffix.len();
let mut buffer = self.buffer.borrow_mut();
let mut buffer = write_lock!(self.buffer);
if entry_len > buffer.capacity() {
// TODO report entry too big to fit in buffer (!?)
return;
@ -154,7 +159,7 @@ impl StatsdScope {
if entry_len + 1 > remaining {
// buffer is nearly full, make room
let _ = self.flush_inner(buffer);
buffer = self.buffer.borrow_mut();
buffer = write_lock!(self.buffer);
} else {
if !buffer.is_empty() {
// separate from previous entry
@ -172,7 +177,7 @@ impl StatsdScope {
}
}
fn flush_inner(&self, mut buffer: RefMut<String>) -> error::Result<()> {
fn flush_inner(&self, mut buffer: RwLockWriteGuard<String>) -> error::Result<()> {
if !buffer.is_empty() {
match self.socket.send(buffer.as_bytes()) {
Ok(size) => {