Switch the HashMap+Mutex over to Dashmap

Hopefully Dashmap will deliver improved performance for 🌭
R Tyler Croy 2020-06-21 21:10:24 -07:00
parent bf9a534c72
commit 5a6a76990b
5 changed files with 76 additions and 19 deletions
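At its core the commit swaps a single async Mutex guarding a HashMap for a DashMap, which shards the map internally so readers and writers on different keys no longer contend on one lock, and no .lock().await is needed at all. A minimal sketch of the two patterns (not taken from hotdog; bump_locked and bump_sharded are illustrative names):

    use std::collections::HashMap;
    use std::sync::Arc;

    use async_std::sync::Mutex;
    use dashmap::DashMap;

    // Before: every update serializes on one lock around the whole map.
    async fn bump_locked(stats: &Arc<Mutex<HashMap<String, i64>>>, key: &str) {
        let mut map = stats.lock().await;
        let new_count = map.get(key).copied().unwrap_or(0) + 1;
        map.insert(key.to_string(), new_count);
    }

    // After: DashMap shards the keyspace, so updates to different keys can
    // proceed in parallel without awaiting a global lock.
    fn bump_sharded(stats: &Arc<DashMap<String, i64>>, key: &str) {
        *stats.entry(key.to_string()).or_insert(0) += 1;
    }

In the diff below this shows up as every self.values.lock().await call disappearing in favor of calling get() and insert() directly on the shared map.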

Cargo.lock (generated, 46 lines changed)

@@ -55,6 +55,15 @@ dependencies = [
"opaque-debug",
]
+[[package]]
+name = "ahash"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
+dependencies = [
+"const-random",
+]
[[package]]
name = "aho-corasick"
version = "0.7.10"
@@ -135,9 +144,9 @@ dependencies = [
[[package]]
name = "async-std"
-version = "1.6.0"
+version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a45cee2749d880d7066e328a7e161c7470ced883b2fd000ca4643e9f1dd5083a"
+checksum = "b93c583a035d21e6d6f09adf48abfc55277bf48886406df370e5db6babe3ab98"
dependencies = [
"async-attributes",
"async-task",
@@ -356,6 +365,26 @@ dependencies = [
"yaml-rust 0.4.4",
]
+[[package]]
+name = "const-random"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
+dependencies = [
+"const-random-macro",
+"proc-macro-hack",
+]
+[[package]]
+name = "const-random-macro"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
+dependencies = [
+"getrandom",
+"proc-macro-hack",
+]
[[package]]
name = "cookie"
version = "0.14.1"
@@ -439,6 +468,17 @@ dependencies = [
"subtle 1.0.0",
]
+[[package]]
+name = "dashmap"
+version = "3.11.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8cfcd41ae02d60edded204341d2798ba519c336c51a37330aa4b98a1128def32"
+dependencies = [
+"ahash",
+"cfg-if",
+"num_cpus",
+]
[[package]]
name = "derivative"
version = "2.1.1"
@@ -737,11 +777,13 @@ dependencies = [
"chrono",
"clap",
"config",
+"dashmap",
"dipstick",
"futures",
"handlebars",
"jmespath",
"log",
+"parking_lot",
"pretty_env_logger",
"rdkafka",
"regex",

Cargo.toml

@@ -8,7 +8,7 @@ edition = "2018"
[dependencies]
# Base asynchronous runtime
-async-std = { version = "~1.6.0", features = ["attributes"] }
+async-std = { version = "~1.6.1", features = ["attributes"] }
async-trait = "0.1.31"
futures = { version = "~0.3.4", features = ["thread-pool"] }
@@ -22,6 +22,7 @@ clap = { version = "~2.33.0", features = ["yaml"] }
chrono = "~0.4.11"
# Handling configuration overlays
config = { version = "~0.10.1", features = ["yaml"] }
+dashmap = "~3.11.4"
# Needed to report metrics of hotdog's performance
dipstick = "~0.9.0"
# Used for string replacements and other template based transformations
@@ -32,6 +33,8 @@ handlebars = "~3.0.1"
jmespath = { git = "https://github.com/jmespath/jmespath.rs", features = ["sync"] }
# Logging
log = "~0.4.8"
+# Faster locking primitives
+parking_lot = "~0.10.2"
pretty_env_logger = "~0.3.1"
# Needed for forwarding messages along to Kafka

src/kafka.rs

@@ -140,6 +140,9 @@ impl Kafka {
let start_time = Instant::now();
let producer = producer.clone();
+// TODO: What if this is a task::spawn for each message, would that be too much
+// overhead?
let record = FutureRecord::<String, String>::to(&kmsg.topic).payload(&kmsg.msg);
/*
* Intentionally setting the timeout_ms to -1 here so this blocks forever if the

src/main.rs

@@ -43,6 +43,8 @@ use settings::*;
async fn main() -> Result<(), errors::HotdogError> {
pretty_env_logger::init();
+info!("Starting hotdog version {}", env!["CARGO_PKG_VERSION"]);
let matches = App::new("Hotdog")
.version(env!("CARGO_PKG_VERSION"))
.author("R Tyler Croy <rtyler+hotdog@brokenco.de")

src/status.rs

@@ -4,7 +4,8 @@
*
* The status module is also responsible for dispatching _all_ statsd metrics.
*/
-use async_std::sync::{channel, Arc, Mutex, Receiver, Sender};
+use async_std::sync::{channel, Arc, Receiver, Sender};
+use dashmap::DashMap;
use dipstick::{InputScope, StatsdScope};
use log::*;
use serde::{Deserialize, Serialize};
@@ -51,7 +52,7 @@
/**
* Simple type for tracking our statistics as time goes on
*/
-type ThreadsafeStats = Arc<Mutex<HashMap<String, i64>>>;
+type ThreadsafeStats = Arc<DashMap<String, i64>>;
pub type Statistic = (Stats, i64);
pub struct StatsHandler {
@@ -64,7 +65,7 @@
impl StatsHandler {
pub fn new(metrics: Arc<StatsdScope>) -> Self {
let (tx, rx) = channel(1_000_000);
-let values = Arc::new(Mutex::new(HashMap::<String, i64>::new()));
+let values = Arc::new(DashMap::default());
StatsHandler {
values,
@@ -103,12 +104,14 @@
*/
async fn handle_gauge(&self, stat: Stats, count: i64) {
let key = &stat.to_string();
-let mut values = self.values.lock().await;
-let new_count = values.get(key).unwrap_or(&0) + count;
+let mut new_count = 0;
+if let Some(gauge) = self.values.get(key) {
+new_count = *gauge.value();
+}
+new_count += count;
self.metrics.gauge(key).value(new_count);
-values.insert(key.to_string(), new_count);
+self.values.insert(key.to_string(), new_count);
}
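For readers new to DashMap: the get()/insert() pair above is a direct translation of the old locked HashMap code. The same read-modify-write can also be expressed with DashMap's entry API, which resolves the key once and holds the shard guard for the whole update. A sketch against the same Arc<DashMap<String, i64>> shape (record_gauge is an illustrative name, not a function in hotdog):

    use std::sync::Arc;

    use dashmap::DashMap;

    // Same shape as the ThreadsafeStats alias used by the status module.
    type ThreadsafeStats = Arc<DashMap<String, i64>>;

    // Add `count` to the stored value for `key`, creating it if missing,
    // and return the new total.
    fn record_gauge(values: &ThreadsafeStats, key: &str, count: i64) -> i64 {
        let mut entry = values.entry(key.to_string()).or_insert(0);
        *entry += count;
        *entry
    }

Either form works; the entry form just avoids looking the key up twice.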
/**
@@ -116,8 +119,12 @@
*/
async fn handle_counter(&self, stat: Stats, count: i64) {
let key = &stat.to_string();
-let mut values = self.values.lock().await;
-let new_count = values.get(key).unwrap_or(&0) + count;
+let mut new_count = 0;
+if let Some(counter) = self.values.get(key) {
+new_count = *counter.value();
+}
+new_count += count;
let sized_count: usize = count.try_into().expect("Could not convert to usize!");
@@ -128,17 +135,17 @@
Stats::KafkaMsgSubmitted { topic } => {
let subkey = &*format!("{}.{}", key, topic);
self.metrics.counter(subkey).count(sized_count);
-values.insert(subkey.to_string(), new_count);
+self.values.insert(subkey.to_string(), new_count);
}
Stats::KafkaMsgErrored { errcode } => {
let subkey = &*format!("{}.{}", key, errcode);
self.metrics.counter(subkey).count(sized_count);
-values.insert(subkey.to_string(), new_count);
+self.values.insert(subkey.to_string(), new_count);
}
_ => {}
};
-values.insert(key.to_string(), new_count);
+self.values.insert(key.to_string(), new_count);
}
/**
@@ -146,13 +153,13 @@
*/
async fn handle_timer(&self, stat: Stats, duration_us: i64) {
let key = &stat.to_string();
-let mut values = self.values.lock().await;
if let Ok(duration) = duration_us.try_into() {
self.metrics.timer(key).interval_us(duration);
} else {
error!("Failed to report timer to statsd with an i64 that couldn't fit into u64");
}
-values.insert(key.to_string(), duration_us);
+self.values.insert(key.to_string(), duration_us);
}
/**
@@ -162,8 +169,8 @@
async fn healthcheck(&self) -> HealthResponse {
let mut stats = HashMap::new();
-for (key, value) in self.values.lock().await.iter() {
-stats.insert(key.to_string(), *value);
+for entry in self.values.iter() {
+stats.insert(entry.key().clone(), entry.value().clone());
}
HealthResponse {