Don't remove a distributor from the global dispatcher if it has clients (#330)

Because distributors are usable globally by name, it's possible (and
useful) for multiple groups of children to use the same distributor.
However, if one of the groups of children is then stopped as part of the
normal flow of an application, the distributor is removed from the
global dispatcher, and all senders receive an error on trying to send to
that distributor, even if it still has active recipients from other
groups of children.

This commit changes `remove_distributor` in the global dispatcher so
that it checks to see if it still has recipients and only removes the
distributor if there are no remaining recipients.
This commit is contained in:
Colin J. Fuller 2021-04-27 08:35:16 -04:00 committed by GitHub
parent 7fb016ced9
commit 44508cd4e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 6 deletions

View File

@ -61,8 +61,6 @@
///! // We create the function to exec when each children is called
///! .with_exec(move |ctx: BastionContext| async move { /* ... */ })
///! ```
/*
* cargo.toml:
*
@ -74,7 +72,6 @@
* tracing-subscriber = "0.2.17"
*
*/
use anyhow::{anyhow, Context, Result as AnyResult};
use bastion::distributor::*;
use bastion::prelude::*;

View File

@ -613,12 +613,18 @@ impl GlobalDispatcher {
Ok(())
}
/// Removes distributor from the global registry.
/// Removes distributor from the global registry if it has no remaining recipients.
pub(crate) fn remove_distributor(&self, distributor: &Distributor) -> AnyResult<()> {
let mut distributors = self.distributors.write().map_err(|error| {
anyhow::anyhow!("couldn't get read lock on distributors {:?}", error)
anyhow::anyhow!("couldn't get write lock on distributors {:?}", error)
})?;
distributors.remove(distributor);
if distributors
.get(&distributor)
.map(|recipient| recipient.all().is_empty())
.unwrap_or_default()
{
distributors.remove(distributor);
}
Ok(())
}
}
@ -919,4 +925,44 @@ mod tests {
let handler_was_called = handler.was_called();
assert_eq!(handler_was_called, true);
}
#[test]
fn test_global_dispatcher_removes_distributor_with_no_recipients() {
let global_dispatcher = GlobalDispatcher::new();
let distributor = Distributor::named("test-distributor");
global_dispatcher
.register_distributor(&distributor)
.unwrap();
assert!(!global_dispatcher.distributors.read().unwrap().is_empty());
global_dispatcher.remove_distributor(&distributor).unwrap();
assert!(global_dispatcher.distributors.read().unwrap().is_empty());
}
#[test]
fn test_global_dispatcher_keeps_distributor_with_recipients() {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);
let global_dispatcher = GlobalDispatcher::new();
let distributor = Distributor::named("test-distributor");
global_dispatcher
.register_distributor(&distributor)
.unwrap();
global_dispatcher
.register_recipient(&distributor, child_ref.clone())
.unwrap();
global_dispatcher.remove_distributor(&distributor).unwrap();
// Should maintain the dispatcher because it still has a recipient.
assert!(!global_dispatcher.distributors.read().unwrap().is_empty());
global_dispatcher
.remove_recipient(&[distributor], child_ref)
.unwrap();
global_dispatcher.remove_distributor(&distributor).unwrap();
// Distributor is now removed because it has no remaining recipients.
assert!(global_dispatcher.distributors.read().unwrap().is_empty());
}
}