From 4af8a4864ba32249ba77ba1bbf23ee3fdeab5f96 Mon Sep 17 00:00:00 2001 From: Yiyu Lin Date: Tue, 12 May 2020 04:47:25 +0800 Subject: [PATCH] add name for children (#205) add rustfmt.toml fix two warnings Co-authored-by: Mahmut Bulut --- rustfmt.toml | 0 src/bastion-executor/src/load_balancer.rs | 2 +- src/bastion-executor/src/sleepers.rs | 2 +- src/bastion/examples/broadcast_message.rs | 14 +++++++----- src/bastion/src/child_ref.rs | 20 ++++++++++++++-- src/bastion/src/children.rs | 28 +++++++++++++++++++---- src/bastion/src/dispatcher.rs | 21 +++++++++++------ 7 files changed, 66 insertions(+), 21 deletions(-) create mode 100644 rustfmt.toml diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..e69de29 diff --git a/src/bastion-executor/src/load_balancer.rs b/src/bastion-executor/src/load_balancer.rs index 7d10bba..7af06cd 100644 --- a/src/bastion-executor/src/load_balancer.rs +++ b/src/bastion-executor/src/load_balancer.rs @@ -164,7 +164,7 @@ pub fn stats() -> &'static Stats { #[inline] pub fn core_retrieval() -> &'static usize { lazy_static! { - static ref CORE_COUNT: usize = { placement::get_core_ids().unwrap().len() }; + static ref CORE_COUNT: usize = placement::get_core_ids().unwrap().len(); } &*CORE_COUNT diff --git a/src/bastion-executor/src/sleepers.rs b/src/bastion-executor/src/sleepers.rs index 2416987..e5c6fab 100644 --- a/src/bastion-executor/src/sleepers.rs +++ b/src/bastion-executor/src/sleepers.rs @@ -37,7 +37,7 @@ impl Sleepers { if !self.notified.swap(false, Ordering::SeqCst) { *sleep += 1; - self.wake.wait(sleep).unwrap(); + let _ = self.wake.wait(sleep).unwrap(); } } diff --git a/src/bastion/examples/broadcast_message.rs b/src/bastion/examples/broadcast_message.rs index f7fa787..5897fdd 100644 --- a/src/bastion/examples/broadcast_message.rs +++ b/src/bastion/examples/broadcast_message.rs @@ -47,9 +47,8 @@ fn response_supervisor(supervisor: Supervisor) -> Supervisor { } fn input_group(children: Children) -> Children { - children - .with_redundancy(1) - .with_exec(move |ctx: BastionContext| async move { + children.with_name("input").with_redundancy(1).with_exec( + move |ctx: BastionContext| async move { println!("[Input] Worker started!"); let data = vec!["A B C", "A C C", "B C C"]; @@ -61,11 +60,13 @@ fn input_group(children: Children) -> Children { } Ok(()) - }) + }, + ) } fn process_group(children: Children) -> Children { children + .with_name("process") .with_redundancy(3) .with_dispatcher( // Declare a dispatcher to use. All instantiated actors will be registered in @@ -94,7 +95,7 @@ fn process_group(children: Children) -> Children { *value += 1; } - println!("[Processing] Worker #{:?} processed data. Result: `{:?}`", ctx.current().id(), counter); + println!("[Processing] Worker {} #{:?} processed data. Result: `{:?}`", ctx.current().name(), ctx.current().id(), counter); // Push hashmap with data to the next actor group let group_name = "Response".to_string(); @@ -113,6 +114,7 @@ fn process_group(children: Children) -> Children { fn response_group(children: Children) -> Children { children + .with_name("response") .with_redundancy(1) .with_dispatcher( // We will re-use the dispatcher to make the example easier to understand @@ -137,7 +139,7 @@ fn response_group(children: Children) -> Children { let message = Arc::try_unwrap(raw_message).unwrap(); msg! { message, ref data: HashMap<&str, u32> => { - println!("[Response] Worker received `{:?}`", data); + println!("[Response] Worker {} received `{:?}`", ctx.current().name(), data); for (key, value) in data.iter() { let current_value = counter.entry(key).or_insert(0); diff --git a/src/bastion/src/child_ref.rs b/src/bastion/src/child_ref.rs index 13336ef..eeef281 100644 --- a/src/bastion/src/child_ref.rs +++ b/src/bastion/src/child_ref.rs @@ -16,12 +16,23 @@ use std::sync::Arc; pub struct ChildRef { id: BastionId, sender: Sender, + name: String, path: Arc, } impl ChildRef { - pub(crate) fn new(id: BastionId, sender: Sender, path: Arc) -> ChildRef { - ChildRef { id, sender, path } + pub(crate) fn new( + id: BastionId, + sender: Sender, + name: String, + path: Arc, + ) -> ChildRef { + ChildRef { + id, + sender, + name, + path, + } } /// Returns the identifier of the children group element this @@ -284,6 +295,11 @@ impl ChildRef { pub fn path(&self) -> &Arc { &self.path } + + /// Return the [`name`] of the child + pub fn name(&self) -> &str { + &self.name + } } impl PartialEq for ChildRef { diff --git a/src/bastion/src/children.rs b/src/bastion/src/children.rs index ded8c33..7c3916c 100644 --- a/src/bastion/src/children.rs +++ b/src/bastion/src/children.rs @@ -89,6 +89,8 @@ pub struct Children { started: bool, // List of dispatchers attached to each actor in the group. dispatchers: Vec>>, + // The name of children + name: Option, } impl Children { @@ -101,6 +103,7 @@ impl Children { let pre_start_msgs = Vec::new(); let started = false; let dispatchers = Vec::new(); + let name = None; Children { bcast, @@ -111,6 +114,7 @@ impl Children { pre_start_msgs, started, dispatchers, + name, } } @@ -156,6 +160,14 @@ impl Children { &self.callbacks } + pub(crate) fn name(&self) -> String { + if let Some(name) = &self.name { + name.clone() + } else { + "__Anonymous__".into() + } + } + pub(crate) fn as_ref(&self) -> ChildrenRef { trace!( "Children({}): Creating new ChildrenRef({}).", @@ -171,7 +183,7 @@ impl Children { for (id, (sender, _)) in &self.launched { trace!("Children({}): Creating new ChildRef({}).", self.id(), id); // TODO: clone or ref? - let child = ChildRef::new(id.clone(), sender.clone(), path.clone()); + let child = ChildRef::new(id.clone(), sender.clone(), self.name(), path.clone()); children.push(child); } @@ -184,6 +196,12 @@ impl Children { ChildrenRef::new(id, sender, path, children, dispatchers) } + /// Sets the name of this children group. + pub fn with_name(mut self, name: impl Into) -> Self { + self.name = Some(name.into()); + self + } + /// Sets the closure taking a [`BastionContext`] and returning a /// [`Future`] that will be used by every element of this children /// group. @@ -237,7 +255,7 @@ impl Children { self } - /// Sets the number of number of elements this children group will + /// Sets the number of elements this children group will /// contain. Each element will call the closure passed in /// [`with_exec`] and run the returned future until it stops, /// panics or another element in the group stops or panics. @@ -456,7 +474,7 @@ impl Children { let id = bcast.id().clone(); let sender = bcast.sender().clone(); let path = bcast.path().clone(); - let child_ref = ChildRef::new(id.clone(), sender.clone(), path); + let child_ref = ChildRef::new(id.clone(), sender.clone(), self.name(), path); let children = self.as_ref(); let supervisor = self.bcast.parent().clone().into_supervisor(); @@ -670,6 +688,8 @@ impl Children { pub(crate) fn launch_elems(&mut self) { debug!("Children({}): Launching elements.", self.id()); + + let name = self.name(); for _ in 0..self.redundancy { let parent = Parent::children(self.as_ref()); let bcast = Broadcast::new(parent, BastionPathElement::Child(BastionId::new())); @@ -678,7 +698,7 @@ impl Children { let id = bcast.id().clone(); let sender = bcast.sender().clone(); let path = bcast.path().clone(); - let child_ref = ChildRef::new(id.clone(), sender.clone(), path); + let child_ref = ChildRef::new(id.clone(), sender.clone(), name.clone(), path); let children = self.as_ref(); let supervisor = self.bcast.parent().clone().into_supervisor(); diff --git a/src/bastion/src/dispatcher.rs b/src/bastion/src/dispatcher.rs index 678866e..83a3e8e 100644 --- a/src/bastion/src/dispatcher.rs +++ b/src/bastion/src/dispatcher.rs @@ -443,7 +443,8 @@ mod tests { let bastion_id = BastionId::new(); let (sender, _) = mpsc::unbounded(); let path = Arc::new(BastionPath::root()); - let child_ref = ChildRef::new(bastion_id, sender, path); + let name = "test_name".to_string(); + let child_ref = ChildRef::new(bastion_id, sender, name, path); assert_eq!(instance.actors.contains_key(&child_ref), false); @@ -457,7 +458,8 @@ mod tests { let bastion_id = BastionId::new(); let (sender, _) = mpsc::unbounded(); let path = Arc::new(BastionPath::root()); - let child_ref = ChildRef::new(bastion_id, sender, path); + let name = "test_name".to_string(); + let child_ref = ChildRef::new(bastion_id, sender, name, path); instance.register(&child_ref, "my::test::module".to_string()); assert_eq!(instance.actors.contains_key(&child_ref), true); @@ -473,7 +475,8 @@ mod tests { let bastion_id = BastionId::new(); let (sender, _) = mpsc::unbounded(); let path = Arc::new(BastionPath::root()); - let child_ref = ChildRef::new(bastion_id, sender, path); + let name = "test_name".to_string(); + let child_ref = ChildRef::new(bastion_id, sender, name, path); instance.notify(&child_ref, NotificationType::Register); let handler_was_called = handler.was_called(); @@ -540,7 +543,8 @@ mod tests { let bastion_id = BastionId::new(); let (sender, _) = mpsc::unbounded(); let path = Arc::new(BastionPath::root()); - let child_ref = ChildRef::new(bastion_id, sender, path); + let name = "test_name".to_string(); + let child_ref = ChildRef::new(bastion_id, sender, name, path); let dispatcher_type = DispatcherType::Named("test".to_string()); let local_dispatcher = Arc::new(Box::new(Dispatcher::with_type(dispatcher_type.clone()))); @@ -561,7 +565,8 @@ mod tests { let bastion_id = BastionId::new(); let (sender, _) = mpsc::unbounded(); let path = Arc::new(BastionPath::root()); - let child_ref = ChildRef::new(bastion_id, sender, path); + let name = "test_name".to_string(); + let child_ref = ChildRef::new(bastion_id, sender, name, path); let dispatcher_type = DispatcherType::Named("test".to_string()); let local_dispatcher = Arc::new(Box::new(Dispatcher::with_type(dispatcher_type.clone()))); @@ -583,7 +588,8 @@ mod tests { let bastion_id = BastionId::new(); let (sender, _) = mpsc::unbounded(); let path = Arc::new(BastionPath::root()); - let child_ref = ChildRef::new(bastion_id, sender, path); + let name = "test_name".to_string(); + let child_ref = ChildRef::new(bastion_id, sender, name, path); let dispatcher_type = DispatcherType::Named("test".to_string()); let handler = Box::new(CustomHandler::new(false)); @@ -607,7 +613,8 @@ mod tests { let bastion_id = BastionId::new(); let (sender, _) = mpsc::unbounded(); let path = Arc::new(BastionPath::root()); - let child_ref = ChildRef::new(bastion_id, sender, path); + let name = "test_name".to_string(); + let child_ref = ChildRef::new(bastion_id, sender, name, path); let dispatcher_type = DispatcherType::Named("test".to_string()); let handler = Box::new(CustomHandler::new(false));