Remove a magic number of the channel size between consumers and the producer

This commit is contained in:
R Tyler Croy 2020-08-02 14:44:01 -07:00
parent cec1e3c19f
commit 5162358e7f
3 changed files with 27 additions and 2 deletions

View File

@ -62,3 +62,11 @@ topics:
path: 'other.yml'
routing:
valid: '$name.valid'
# All internal settings are optional, and should only be changed by
# power-users, i.e. those who likely have read the source code to slipstream :)
internal:
# Size of the internal buffer for queueing messages to the librdkafka
# producer. This is _not_ the same as the librdkafka internal buffer and only
# serves to pass processed messages between worker tasks.
sendbuffer: 1024

View File

@ -82,8 +82,7 @@ async fn main() {
// Load schemas from directory
let schemas = load_schemas_from(settings.schemas.clone()).expect("Failed to load schemas.d");
// XXX: fix this magic number and make the channel size configurable
let (sender, mut receiver) = channel::<DispatchMessage>(1024);
let (sender, mut receiver) = channel::<DispatchMessage>(settings.internal.sendbuffer);
// Creating an Arc to pass into the consumers
let schemas = Arc::new(schemas);
let mut kafka_config: ClientConfig = ClientConfig::new();

View File

@ -23,10 +23,21 @@ pub struct Settings {
* ClientConfig
*/
pub kafka: HashMap<String, String>,
/**
* Internal settings that most users shouldn't ever need to tweak
*/
pub internal: Internal,
pub schemas: PathBuf,
pub topics: Vec<Topic>,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Internal {
#[serde(default = "default_buffer_size")]
pub sendbuffer: usize,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Topic {
@ -58,6 +69,13 @@ pub struct RoutingInfo {
pub error: Option<String>,
}
/**
* Returns the default buffer size for internal messaging within slipstream
*/
fn default_buffer_size() -> usize {
1024
}
#[cfg(test)]
mod tests {
use super::*;