unregister from distributors and dispatchers when a child panics (#336)

This commit is contained in:
Jeremy Lempereur 2021-08-13 09:48:48 +02:00 committed by GitHub
parent 44508cd4e7
commit beff46a61b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 52 deletions

View File

@ -6,6 +6,7 @@ use crate::child_ref::ChildRef;
use crate::context::{BastionContext, BastionId, ContextState}; use crate::context::{BastionContext, BastionId, ContextState};
use crate::envelope::Envelope; use crate::envelope::Envelope;
use crate::message::BastionMessage; use crate::message::BastionMessage;
use crate::prelude::ChildrenRef;
#[cfg(feature = "scaling")] #[cfg(feature = "scaling")]
use crate::resizer::ActorGroupStats; use crate::resizer::ActorGroupStats;
use crate::system::SYSTEM; use crate::system::SYSTEM;
@ -105,9 +106,13 @@ impl Child {
warn!("Child({}): Panicked.", id); warn!("Child({}): Panicked.", id);
if let Some(parent) = &parent_inner { if let Some(parent) = &parent_inner {
let used_dispatchers = parent.dispatchers(); Self::remove_from_dispatchers(parent, &child_ref_inner);
let global_dispatcher = SYSTEM.dispatcher(); let _ = Self::remove_from_distributors(parent, &child_ref_inner).map_err(|_| {
global_dispatcher.remove(used_dispatchers, &child_ref_inner); warn!(
"Child({}): couldn't remove child from parent distributors.",
id
)
});
} }
let id = id.clone(); let id = id.clone();
@ -124,17 +129,20 @@ impl Child {
fn stopped(&mut self) { fn stopped(&mut self) {
debug!("Child({}): Stopped.", self.id()); debug!("Child({}): Stopped.", self.id());
self.remove_from_dispatchers(); let parent = self.bcast.parent().clone().into_children().unwrap();
let _ = self.remove_from_distributors();
Self::remove_from_dispatchers(&parent, &self.child_ref);
let _ = Self::remove_from_distributors(&parent, &self.child_ref);
self.bcast.stopped(); self.bcast.stopped();
} }
fn faulted(&mut self) { fn faulted(&mut self) {
debug!("Child({}): Faulted.", self.id()); debug!("Child({}): Faulted.", self.id());
self.remove_from_dispatchers();
let _ = self.remove_from_distributors();
let parent = self.bcast.parent().clone().into_children().unwrap(); let parent = self.bcast.parent().clone().into_children().unwrap();
Self::remove_from_dispatchers(&parent, &self.child_ref);
let _ = Self::remove_from_distributors(&parent, &self.child_ref);
let path = self.bcast.path().clone(); let path = self.bcast.path().clone();
let sender = self.bcast.sender().clone(); let sender = self.bcast.sender().clone();
@ -304,11 +312,13 @@ impl Child {
async fn run(mut self) { async fn run(mut self) {
debug!("Child({}): Launched.", self.id()); debug!("Child({}): Launched.", self.id());
if let Err(e) = self.register_in_dispatchers() { let parent = self.bcast.parent().clone().into_children().unwrap();
if let Err(e) = Self::register_in_dispatchers(&parent, &self.child_ref) {
error!("couldn't add actor to the registry: {}", e); error!("couldn't add actor to the registry: {}", e);
return; return;
}; };
if let Err(e) = self.register_to_distributors() { if let Err(e) = Self::register_to_distributors(&parent, &self.child_ref) {
error!("couldn't add actor to the distributors: {}", e); error!("couldn't add actor to the distributors: {}", e);
return; return;
}; };
@ -396,57 +406,43 @@ impl Child {
} }
/// Adds the actor into each registry declared in the parent node. /// Adds the actor into each registry declared in the parent node.
fn register_in_dispatchers(&self) -> AnyResult<()> { fn register_in_dispatchers(parent: &ChildrenRef, child_ref: &ChildRef) -> AnyResult<()> {
if let Some(parent) = self.bcast.parent().clone().into_children() { let used_dispatchers = parent.dispatchers();
let child_ref = self.child_ref.clone();
let used_dispatchers = parent.dispatchers();
let global_dispatcher = SYSTEM.dispatcher(); let global_dispatcher = SYSTEM.dispatcher();
// FIXME: Pass the module name explicitly? // FIXME: Pass the module name explicitly?
let module_name = module_path!().to_string(); let module_name = module_path!().to_string();
global_dispatcher.register(used_dispatchers, &child_ref, module_name)?; global_dispatcher.register(used_dispatchers, &child_ref, module_name)
}
Ok(())
} }
/// Adds the actor into each distributor declared in the parent node. /// Adds the actor into each distributor declared in the parent node.
fn register_to_distributors(&self) -> AnyResult<()> { fn register_to_distributors(parent: &ChildrenRef, child_ref: &ChildRef) -> AnyResult<()> {
if let Some(parent) = self.bcast.parent().clone().into_children() { let distributors = parent.distributors();
let child_ref = self.child_ref.clone();
let distributors = parent.distributors(); let global_dispatcher = SYSTEM.dispatcher();
distributors
.iter()
.map(|&distributor| {
global_dispatcher.register_recipient(&distributor, child_ref.clone())
})
.collect::<AnyResult<Vec<_>>>()?;
let global_dispatcher = SYSTEM.dispatcher();
distributors
.iter()
.map(|&distributor| {
global_dispatcher.register_recipient(&distributor, child_ref.clone())
})
.collect::<AnyResult<Vec<_>>>()?;
}
Ok(()) Ok(())
} }
/// Cleanup the actor's record from each declared distributor. /// Cleanup the actor's record from each declared distributor.
fn remove_from_distributors(&self) -> AnyResult<()> { fn remove_from_distributors(parent: &ChildrenRef, child_ref: &ChildRef) -> AnyResult<()> {
if let Some(parent) = self.bcast.parent().clone().into_children() { let distributors = parent.distributors();
let child_ref = self.child_ref.clone(); let global_dispatcher = SYSTEM.dispatcher();
let distributors = parent.distributors(); global_dispatcher.remove_recipient(distributors, &child_ref)?;
let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.remove_recipient(distributors, child_ref)?;
}
Ok(()) Ok(())
} }
/// Cleanup the actor's record from each declared dispatcher. /// Cleanup the actor's record from each declared dispatcher.
fn remove_from_dispatchers(&self) { fn remove_from_dispatchers(parent: &ChildrenRef, child_ref: &ChildRef) {
if let Some(parent) = self.bcast.parent().clone().into_children() { let used_dispatchers = parent.dispatchers();
let child_ref = self.child_ref.clone(); let global_dispatcher = SYSTEM.dispatcher();
let used_dispatchers = parent.dispatchers(); global_dispatcher.remove(used_dispatchers, &child_ref);
let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.remove(used_dispatchers, &child_ref);
}
} }
#[cfg(feature = "scaling")] #[cfg(feature = "scaling")]

View File

@ -581,7 +581,7 @@ impl GlobalDispatcher {
pub(crate) fn remove_recipient( pub(crate) fn remove_recipient(
&self, &self,
distributor_list: &[Distributor], distributor_list: &[Distributor],
child_ref: ChildRef, child_ref: &ChildRef,
) -> AnyResult<()> { ) -> AnyResult<()> {
let distributors = self.distributors.write().map_err(|error| { let distributors = self.distributors.write().map_err(|error| {
anyhow::anyhow!("couldn't get read lock on distributors {:?}", error) anyhow::anyhow!("couldn't get read lock on distributors {:?}", error)
@ -589,7 +589,7 @@ impl GlobalDispatcher {
distributor_list.iter().for_each(|distributor| { distributor_list.iter().for_each(|distributor| {
distributors distributors
.get(&distributor) .get(&distributor)
.map(|recipients| recipients.remove(&child_ref)); .map(|recipients| recipients.remove(child_ref));
}); });
Ok(()) Ok(())
} }
@ -959,7 +959,7 @@ mod tests {
assert!(!global_dispatcher.distributors.read().unwrap().is_empty()); assert!(!global_dispatcher.distributors.read().unwrap().is_empty());
global_dispatcher global_dispatcher
.remove_recipient(&[distributor], child_ref) .remove_recipient(&[distributor], &child_ref)
.unwrap(); .unwrap();
global_dispatcher.remove_distributor(&distributor).unwrap(); global_dispatcher.remove_distributor(&distributor).unwrap();
// Distributor is now removed because it has no remaining recipients. // Distributor is now removed because it has no remaining recipients.

View File

@ -528,7 +528,7 @@ impl Distributor {
/// ``` /// ```
pub fn unsubscribe(&self, child_ref: ChildRef) -> AnyResult<()> { pub fn unsubscribe(&self, child_ref: ChildRef) -> AnyResult<()> {
let global_dispatcher = SYSTEM.dispatcher(); let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.remove_recipient(&vec![*self], child_ref) global_dispatcher.remove_recipient(&vec![*self], &child_ref)
} }
pub(crate) fn interned(&self) -> &Spur { pub(crate) fn interned(&self) -> &Spur {