diff --git a/slipstream.yml b/slipstream.yml index 334b224..e55188b 100644 --- a/slipstream.yml +++ b/slipstream.yml @@ -62,3 +62,11 @@ topics: path: 'other.yml' routing: valid: '$name.valid' + +# All internal settings are optional, and should only be changed by +# power-users, i.e. those who likely have read the source code to slipstream :) +internal: + # Size of the internal buffer for queueing messages to the librdkafka + # producer. This is _not_ the same as the librdkafka internal buffer and only + # serves to pass processed messages between worker tasks. + sendbuffer: 1024 diff --git a/src/main.rs b/src/main.rs index 168ae3c..564e589 100644 --- a/src/main.rs +++ b/src/main.rs @@ -82,8 +82,7 @@ async fn main() { // Load schemas from directory let schemas = load_schemas_from(settings.schemas.clone()).expect("Failed to load schemas.d"); - // XXX: fix this magic number and make the channel size configurable - let (sender, mut receiver) = channel::(1024); + let (sender, mut receiver) = channel::(settings.internal.sendbuffer); // Creating an Arc to pass into the consumers let schemas = Arc::new(schemas); let mut kafka_config: ClientConfig = ClientConfig::new(); diff --git a/src/settings.rs b/src/settings.rs index 889d066..3540610 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -23,10 +23,21 @@ pub struct Settings { * ClientConfig */ pub kafka: HashMap, + /** + * Internal settings that most users shouldn't ever need to tweak + */ + pub internal: Internal, pub schemas: PathBuf, pub topics: Vec, } +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Internal { + #[serde(default = "default_buffer_size")] + pub sendbuffer: usize, +} + #[derive(Clone, Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Topic { @@ -58,6 +69,13 @@ pub struct RoutingInfo { pub error: Option, } +/** + * Returns the default buffer size for internal messaging within slipstream + */ +fn default_buffer_size() -> usize { + 1024 +} + #[cfg(test)] mod tests { use super::*;