[Refactoring] Bastion actors and definitions (#273)

* dispatch to available children only. (#268)

* Fix RestartStrategy::timeout < 1s (#265)

* Renamed ActorStateData struct to Context

* Extended available states for Actor

* Added methods for additional states

* Added removed state

* Added definition module

* Added Definition struct

* Removed unsafe Send and Sync implementation for MailboxTx

* Added docstrings for Mailbox<T>

* Added tests for the Definition struct

* Added presaving messages

* Removed excessive imports

* Added methods for getting latest message

* Removed reference for the returned envelope

* Code review changes

Co-authored-by: Jeremy Lempereur <jeremy.lempereur@gmail.com>
Co-authored-by: nicolaiunrein <n.unrein@gmail.com>
This commit is contained in:
Valeryi Savich 2020-10-25 12:39:02 +02:00 committed by GitHub
parent ed7ead49f4
commit ea44f84050
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 592 additions and 224 deletions

View File

@ -1,5 +1,8 @@
use bastion::prelude::*;
use futures_timer::Delay;
use std::sync::Arc;
use std::time::Duration;
use tracing::Level;
///
/// Prologue:
@ -22,7 +25,28 @@ use std::sync::Arc;
/// 3. We want to use a dispatcher on the second group because we don't want to
/// target a particular child in the first to process the message.
///
/// The output looks like:
/// ```
/// Running `target\debug\examples\round_robin_dispatcher.exe`
/// Aug 20 16:52:19.925 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:19.926 WARN round_robin_dispatcher: Received data_1
/// Aug 20 16:52:20.932 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:20.933 WARN round_robin_dispatcher: Received data_2
/// Aug 20 16:52:21.939 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:21.941 WARN round_robin_dispatcher: Received data_3
/// Aug 20 16:52:22.947 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:22.948 WARN round_robin_dispatcher: Received data_4
/// Aug 20 16:52:23.954 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:23.955 WARN round_robin_dispatcher: Received data_5
/// ```
fn main() {
// Initialize tracing logger
// so we get nice output on the console.
let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::WARN)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
// We need bastion to run our program
Bastion::init();
// We create the supervisor and we add both groups on it
@ -57,8 +81,10 @@ fn caller_group(children: Children) -> Children {
let target = BroadcastTarget::Group("Receiver".to_string());
// We iterate on each data
for data in data_to_send {
Delay::new(Duration::from_secs(1)).await;
tracing::warn!("sending message");
// We broadcast the message containing the data to the defined target
ctx.broadcast_message(target.clone(), data)
ctx.broadcast_message(target.clone(), data);
}
// We stop bastion here, because we don't have more data to send
Bastion::stop();
@ -70,8 +96,6 @@ fn caller_group(children: Children) -> Children {
fn receiver_group(children: Children) -> Children {
// We create the second group of children
children
// We want to have 5 children in this group
.with_redundancy(5)
// We want to have a disptacher named `Receiver`
.with_dispatcher(Dispatcher::with_type(DispatcherType::Named(
"Receiver".to_string(),
@ -93,7 +117,7 @@ fn receiver_group(children: Children) -> Children {
// Because it's a broadcasted message we can use directly the ref
ref data: &str => {
// And we print it
println!("Received {}", data);
tracing::warn!("Received {}", data);
};
_: _ => ();
}

View File

@ -0,0 +1,131 @@
use crate::actor::mailbox::Mailbox;
use crate::actor::state_codes::ActorState;
use crate::message::*;
use crate::routing::path::*;
use async_channel::unbounded;
use lever::sync::atomics::AtomicBox;
use std::sync::Arc;
/// A structure that defines actor's state, mailbox with
/// messages and a local storage for user's data.
///
/// Each actor in Bastion has an attached context which
/// helps to understand what is the type of actor has been
/// launched in the system, its path, current execution state
/// and various data that can be attached to it.
pub struct Context<T>
where
T: TypedMessage,
{
/// Path to the actor in the system
path: ActorPath,
/// Mailbox of the actor
mailbox: Mailbox<T>,
/// Current execution state of the actor
state: Arc<AtomicBox<ActorState>>,
}
impl<T> Context<T>
where
T: TypedMessage,
{
// FIXME: Pass the correct system_rx instead of the fake one
pub(crate) fn new(path: ActorPath) -> Self {
let (_system_tx, system_rx) = unbounded();
let mailbox = Mailbox::new(system_rx);
let state = Arc::new(AtomicBox::new(ActorState::Init));
Context {
path,
mailbox,
state,
}
}
// Actor state machine.
//
// For more information about the actor's state machine
// see the actor/state_codes.rs module.
//
pub(crate) fn set_init(&self) {
self.state.replace_with(|_| ActorState::Init);
}
pub(crate) fn is_init(&self) -> bool {
*self.state.get() == ActorState::Init
}
pub(crate) fn set_sync(&self) {
self.state.replace_with(|_| ActorState::Sync);
}
pub(crate) fn is_sync(&self) -> bool {
*self.state.get() == ActorState::Sync
}
pub(crate) fn set_scheduled(&self) {
self.state.replace_with(|_| ActorState::Scheduled);
}
pub(crate) fn is_scheduled(&self) -> bool {
*self.state.get() == ActorState::Scheduled
}
pub(crate) fn set_awaiting(&self) {
self.state.replace_with(|_| ActorState::Awaiting);
}
pub(crate) fn is_awaiting(&self) -> bool {
*self.state.get() == ActorState::Awaiting
}
pub(crate) fn set_stopped(&self) {
self.state.replace_with(|_| ActorState::Stopped);
}
pub(crate) fn is_stopped(&self) -> bool {
*self.state.get() == ActorState::Stopped
}
pub(crate) fn set_terminated(&self) {
self.state.replace_with(|_| ActorState::Terminated);
}
pub(crate) fn is_terminated(&self) -> bool {
*self.state.get() == ActorState::Terminated
}
pub(crate) fn set_failed(&self) {
self.state.replace_with(|_| ActorState::Failed);
}
pub(crate) fn is_failed(&self) -> bool {
*self.state.get() == ActorState::Failed
}
pub(crate) fn set_finished(&self) {
self.state.replace_with(|_| ActorState::Finished);
}
pub(crate) fn is_finished(&self) -> bool {
*self.state.get() == ActorState::Finished
}
pub(crate) fn set_deinit(&self) {
self.state.replace_with(|_| ActorState::Deinit);
}
pub(crate) fn is_deinit(&self) -> bool {
*self.state.get() == ActorState::Deinit
}
pub(crate) fn set_removed(&self) {
self.state.replace_with(|_| ActorState::Removed);
}
pub(crate) fn is_removed(&self) -> bool {
*self.state.get() == ActorState::Removed
}
}

View File

@ -0,0 +1,93 @@
use crate::actor::traits::Actor;
use crate::routing::path::ActorPath;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
/// A structure that holds configuration of the Bastion actor.
#[derive(Clone)]
pub struct Definition {
/// The certain implementation of the Bastion actor that
/// needs to be spawned.
implementation: Arc<Box<dyn Actor>>,
/// Defines actors that must be spawned in the hierarchy
/// in the beginning of the actor's lifetime. The further
/// amount of children may vary in runtime a won't be
/// adjusted to the initial definition.
children: Vec<Definition>,
/// The path to the actor in the node.
path: ActorPath,
}
impl Definition {
/// Returns a new Definition instance.
pub fn new(implementation: impl Actor + 'static) -> Self {
let children = Vec::new();
let path = ActorPath::default();
Definition {
implementation: Arc::new(Box::new(implementation)),
children,
path,
}
}
/// Adds a single definition to the children list.
pub fn with_child(mut self, definition: Definition) -> Self {
self.children.push(definition);
self
}
/// Overrides the path on the user defined.
pub fn with_path(mut self, path: ActorPath) -> Self {
self.path = path;
self
}
}
impl Debug for Definition {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("Definition")
.field("children", &self.children)
.field("path", &self.path)
.finish()
}
}
#[cfg(test)]
mod actor_path_tests {
use crate::actor::definition::Definition;
use crate::actor::traits::Actor;
use crate::message::Deployment::Children;
use crate::routing::path::ActorPath;
struct FakeParentActor;
impl Actor for FakeParentActor {}
struct FakeChildActor;
impl Actor for FakeChildActor {}
#[test]
fn test_default_definition_has_no_children() {
let definition = Definition::new(FakeChildActor);
assert_eq!(definition.children.is_empty(), true);
}
#[test]
fn test_set_custom_actor_path() {
let path = ActorPath::default().name("custom");
let definition = Definition::new(FakeChildActor).with_path(path.clone());
assert_eq!(definition.children.is_empty(), true);
assert_eq!(definition.path, path);
}
#[test]
fn test_add_relation_to_parent() {
let child_definition = Definition::new(FakeChildActor);
let definition = Definition::new(FakeParentActor).with_child(child_definition.clone());
assert_eq!(definition.children.is_empty(), false);
assert_eq!(definition.children.len(), 1);
}
}

View File

@ -2,62 +2,37 @@ use crate::actor::actor_ref::ActorRef;
use crate::actor::state_codes::*;
use crate::errors::*;
use crate::message::TypedMessage;
use async_channel::{Receiver, Sender};
use async_channel::{unbounded, Receiver, Sender};
use lever::sync::atomics::AtomicBox;
use std::fmt::{self, Debug, Formatter};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
pub struct MailboxInner<T>
where
T: TypedMessage,
{
/// User guardian receiver
user_rx: Receiver<Envelope<T>>,
/// System guardian receiver
sys_rx: Receiver<Envelope<T>>,
/// Mailbox state machine
state: Arc<AtomicBox<MailboxState>>,
}
impl<T> MailboxInner<T>
where
T: TypedMessage,
{
/// User messages receiver channel
fn user_rx(&self) -> &Receiver<Envelope<T>> {
&self.user_rx
}
/// System messages receiver channel
fn sys_rx(&self) -> &Receiver<Envelope<T>> {
&self.sys_rx
}
/// Mailbox state
fn state(&self) -> &Arc<AtomicBox<MailboxState>> {
&self.state
}
}
/// Struct that represents a message sender.
#[derive(Clone)]
pub struct MailboxTx<T>
where
T: TypedMessage,
{
/// Indicated the transmitter part of the actor's channel
/// which is using for passing messages.
tx: Sender<Envelope<T>>,
/// A field for checks that the message has been delivered to
/// the specific actor.
scheduled: Arc<AtomicBool>,
}
unsafe impl<T> Send for MailboxTx<T> where T: TypedMessage {}
unsafe impl<T> Sync for MailboxTx<T> where T: TypedMessage {}
impl<T> MailboxTx<T>
where
T: TypedMessage,
{
/// Return a new instance of MailboxTx that indicates sender.
pub(crate) fn new(tx: Sender<Envelope<T>>) -> Self {
let scheduled = Arc::new(AtomicBool::new(false));
MailboxTx { tx, scheduled }
}
/// Send the message to the actor by the channel.
pub fn try_send(&self, msg: Envelope<T>) -> Result<()> {
self.tx
.try_send(msg)
@ -65,85 +40,153 @@ where
}
}
/// A struct that holds everything related to messages that can be
/// retrieved from other actors. Each actor holds two queues: one for
/// messages that come from user-defined actors, and another for
/// internal messaging that must be handled separately.
///
/// For each used queue, mailbox always holds the latest requested message
/// by a user, to guarantee that the message won't be lost if something
/// happens wrong.
#[derive(Clone)]
pub struct Mailbox<T>
where
T: TypedMessage,
{
inner: Arc<MailboxInner<T>>,
/// User guardian sender
user_tx: MailboxTx<T>,
/// User guardian receiver
user_rx: Receiver<Envelope<T>>,
/// System guardian receiver
system_rx: Receiver<Envelope<T>>,
/// The current processing message, received from the
/// latest call to the user's queue
last_user_message: Option<Envelope<T>>,
/// The current processing message, received from the
/// latest call to the system's queue
last_system_message: Option<Envelope<T>>,
/// Mailbox state machine
state: Arc<AtomicBox<MailboxState>>,
}
// TODO: Add calls with recv with timeout
impl<T> Mailbox<T>
where
T: TypedMessage,
{
/// Creates a new mailbox for the actor.
pub fn new(inner: Arc<MailboxInner<T>>) -> Self {
Mailbox { inner }
pub(crate) fn new(system_rx: Receiver<Envelope<T>>) -> Self {
let (tx, user_rx) = unbounded();
let user_tx = MailboxTx::new(tx);
let state = Arc::new(AtomicBox::new(MailboxState::Scheduled));
let last_user_message = None;
let last_system_message = None;
Mailbox {
user_tx,
user_rx,
system_rx,
last_user_message,
last_system_message,
state,
}
}
/// Forced receive message from user queue
pub async fn recv(&self) -> Envelope<T> {
self.inner
.user_rx()
pub async fn recv(&mut self) -> Envelope<T> {
let message = self
.user_rx
.recv()
.await
.map_err(|e| BError::ChanRecv(e.to_string()))
.unwrap()
.unwrap();
self.last_user_message = Some(message);
self.last_user_message.clone().unwrap()
}
/// Try receiving message from user queue
pub async fn try_recv(&self) -> Result<Envelope<T>> {
self.inner
.user_rx()
.try_recv()
.map_err(|e| BError::ChanRecv(e.to_string()))
pub async fn try_recv(&mut self) -> Result<Envelope<T>> {
if self.last_user_message.is_some() {
return Err(BError::UnackedMessage);
}
match self.user_rx.try_recv() {
Ok(message) => {
self.last_user_message = Some(message);
Ok(self.last_user_message.clone().unwrap())
}
Err(e) => Err(BError::ChanRecv(e.to_string())),
}
}
/// Forced receive message from user queue
pub async fn sys_recv(&self) -> Envelope<T> {
self.inner
.sys_rx()
/// Forced receive message from system queue
pub async fn sys_recv(&mut self) -> Envelope<T> {
let message = self
.system_rx
.recv()
.await
.map_err(|e| BError::ChanRecv(e.to_string()))
.unwrap()
.unwrap();
self.last_system_message = Some(message);
self.last_system_message.clone().unwrap()
}
/// Try receiving message from user queue
pub async fn try_sys_recv(&self) -> Result<Envelope<T>> {
self.inner
.sys_rx()
.try_recv()
.map_err(|e| BError::ChanRecv(e.to_string()))
/// Try receiving message from system queue
pub async fn try_sys_recv(&mut self) -> Result<Envelope<T>> {
if self.last_system_message.is_some() {
return Err(BError::UnackedMessage);
}
match self.system_rx.try_recv() {
Ok(message) => {
self.last_system_message = Some(message);
Ok(self.last_system_message.clone().unwrap())
}
Err(e) => Err(BError::ChanRecv(e.to_string())),
}
}
//////////////////////
///// Mailbox state machine
//////////////////////
/// Returns the last retrieved message from the user channel
pub async fn get_last_user_message(&self) -> Option<Envelope<T>> {
self.last_user_message.clone()
}
/// Returns the last retrieved message from the system channel
pub async fn get_last_system_message(&self) -> Option<Envelope<T>> {
self.last_system_message.clone()
}
//
// Mailbox state machine
//
// For more information about the actor's state machine
// see the actor/state_codes.rs module.
//
pub(crate) fn set_scheduled(&self) {
self.inner.state().replace_with(|_| MailboxState::Scheduled);
self.state.replace_with(|_| MailboxState::Scheduled);
}
pub(crate) fn is_scheduled(&self) -> bool {
*self.inner.state().get() == MailboxState::Scheduled
*self.state.get() == MailboxState::Scheduled
}
pub(crate) fn set_sent(&self) {
self.inner.state().replace_with(|_| MailboxState::Sent);
self.state.replace_with(|_| MailboxState::Sent);
}
pub(crate) fn is_sent(&self) -> bool {
*self.inner.state().get() == MailboxState::Sent
*self.state.get() == MailboxState::Sent
}
pub(crate) fn set_awaiting(&self) {
self.inner.state().replace_with(|_| MailboxState::Awaiting);
self.state.replace_with(|_| MailboxState::Awaiting);
}
pub(crate) fn is_awaiting(&self) -> bool {
*self.inner.state().get() == MailboxState::Awaiting
*self.state.get() == MailboxState::Awaiting
}
}
@ -219,3 +262,5 @@ where
.finish()
}
}
// TODO: Add tests

View File

@ -1,5 +1,6 @@
mod actor_ref;
#[allow(unsafe_code)]
mod mailbox;
mod state;
mod state_codes;
pub mod actor_ref;
pub mod context;
pub mod definition;
pub mod mailbox;
pub mod state_codes;
pub mod traits;

View File

@ -1,74 +0,0 @@
use super::mailbox::*;
use crate::actor::state_codes::ActorState;
use crate::message::*;
use crate::routing::path::*;
use lever::sync::atomics::AtomicBox;
use std::sync::Arc;
pub struct ActorCell {}
///
/// State data of the actor
pub struct ActorStateData<T>
where
T: TypedMessage,
{
/// Mailbox of the actor
mailbox: MailboxTx<T>,
/// State of the actor
state: Arc<AtomicBox<ActorState>>,
}
impl<T> ActorStateData<T>
where
T: TypedMessage,
{
pub(crate) fn new(_path: ActorPath) -> Self {
todo!()
}
//////////////////////
///// Actor state machine
//////////////////////
pub(crate) fn set_init(&self) {
self.state.replace_with(|_| ActorState::Init);
}
pub(crate) fn is_init(&self) -> bool {
*self.state.get() == ActorState::Init
}
pub(crate) fn set_sync(&self) {
self.state.replace_with(|_| ActorState::Sync);
}
pub(crate) fn is_sync(&self) -> bool {
*self.state.get() == ActorState::Sync
}
pub(crate) fn set_scheduled(&self) {
self.state.replace_with(|_| ActorState::Scheduled);
}
pub(crate) fn is_scheduled(&self) -> bool {
*self.state.get() == ActorState::Scheduled
}
pub(crate) fn set_awaiting(&self) {
self.state.replace_with(|_| ActorState::Awaiting);
}
pub(crate) fn is_awaiting(&self) -> bool {
*self.state.get() == ActorState::Awaiting
}
pub(crate) fn set_deinit(&self) {
self.state.replace_with(|_| ActorState::Deinit);
}
pub(crate) fn is_deinit(&self) -> bool {
*self.state.get() == ActorState::Deinit
}
}

View File

@ -5,7 +5,7 @@
/// The whole lifecycle of the message can be described by the
/// next schema:
///
/// +------ Message processed -----+
/// +------ Message processed ----+
/// ↓ |
/// Scheduled -> Sent -> Awaiting ---+
/// ↑ |
@ -27,23 +27,52 @@ pub(crate) enum MailboxState {
/// The whole state machine of the actor can be represented by
/// the following schema:
///
/// Init -> Sync -> Scheduled -> Awaiting -> Deinit
/// ↑ |
/// +------------+
/// +---> Stopped ----+
/// | |
/// | |
/// Init -> Sync -> Scheduled -+---> Terminated -+---> Deinit -> Removed
/// ↑ | | |
/// | ↓ | |
/// Awaiting +---> Failed -----+
/// | |
/// | |
/// +---> Finished ---+
///
pub(crate) enum ActorState {
/// This is the first state for the actors,
/// right after the creation, but the actors wasn't started retrieving
/// messages and doing work (e.g. registering themselves in dispatchers)
/// The first state for actors. This state is the initial point
/// after being created or added to the Bastion node in runtime.
/// At this stage, actor isn't doing any useful job and retrieving
/// any messages from other parts of the cluster yet.
/// However, it can do some initialization steps (e.g. register itself
/// in dispatchers or adding the initial data to the local state),
/// before being available to the rest of the cluster.
Init,
/// Remote or local state synchronization, this behaves like a half state
/// Remote or local state synchronization. this behaves like a half state
/// to converging consensus between multiple actor states.
Sync,
/// Currently scheduler is processing this actor's mailbox.
/// The main state in which actor can stay for indefinite amount of time.
/// During this state, actor doing useful work (e.g. processing the incoming
/// message from other actors) that doesn't require any asynchronous calls.
Scheduled,
/// Answer is awaited currently.
/// Special kind of the scheduled state which help to understand that
/// actor is awaiting for other futures or response messages from other
/// actors in the Bastion cluster.
Awaiting,
/// State representing removing the actors from the cluster, unregistering
/// from dispatchers, and started to hit their etc.
/// Actor has been stopped by the system or a user's call.
Stopped,
/// Actor has been terminated by the system or a user's call.
Terminated,
/// Actor has stopped doing any useful work because of a raised panic
/// or user's error during the execution.
Failed,
/// Actor has completed an execution with the success.
Finished,
/// The deinitialization state for the actor. During this stage the actor
/// must unregister itself from the node, used dispatchers and any other
/// parts where was initialized in the beginning. Can contain an additional
/// user logic before being removed from the cluster.
Deinit,
/// The final state of the actor. The actor can be removed
/// gracefully from the node, because is not available anymore.
Removed,
}

View File

@ -0,0 +1 @@
pub trait Actor {}

View File

@ -19,9 +19,28 @@ pub struct ChildRef {
sender: Sender,
name: String,
path: Arc<BastionPath>,
// True if the ChildRef references a child that will receive user defined messages.
// use `ChildRef::new_internal` to set it to false, for internal use children,
// such as the heartbeat children for example
is_public: bool,
}
impl ChildRef {
pub(crate) fn new_internal(
id: BastionId,
sender: Sender,
name: String,
path: Arc<BastionPath>,
) -> ChildRef {
ChildRef {
id,
sender,
name,
path,
is_public: false,
}
}
pub(crate) fn new(
id: BastionId,
sender: Sender,
@ -33,6 +52,7 @@ impl ChildRef {
sender,
name,
path,
is_public: true,
}
}
@ -67,6 +87,38 @@ impl ChildRef {
&self.id
}
/// Returns true if the child this `ChildRef` is referencing is public,
/// Which means it can receive messages. private `ChildRef`s
/// reference bastion internal children, such as the heartbeat child for example.
/// This function comes in handy when implementing your own dispatchers.
///
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx| {
/// async move {
/// if ctx.current().is_public() {
/// // ...
/// }
/// # Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
pub fn is_public(&self) -> bool {
self.is_public
}
/// Sends a message to the child this `ChildRef` is referencing.
/// This message is intended to be used outside of Bastion context when
/// there is no way for receiver to identify message sender

View File

@ -453,26 +453,24 @@ impl Children {
/// # use bastion::prelude::*;
/// # use std::time::Duration;
/// #
/// # fn main() {
/// # Bastion::init();
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children
/// .with_heartbeat_tick(Duration::from_secs(5))
/// .with_exec(|ctx| {
/// // -- Children group started.
/// async move {
/// // ...
/// # Ok(())
/// }
/// // -- Children group stopped.
/// })
/// children
/// .with_heartbeat_tick(Duration::from_secs(5))
/// .with_exec(|ctx| {
/// // -- Children group started.
/// async move {
/// // ...
/// # Ok(())
/// }
/// // -- Children group stopped.
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
/// [`std::time::Duration`]: https://doc.rust-lang.org/nightly/core/time/struct.Duration.html
pub fn with_heartbeat_tick(mut self, interval: Duration) -> Self {
@ -936,7 +934,7 @@ impl Children {
let id = bcast.id().clone();
let sender = bcast.sender().clone();
let path = bcast.path().clone();
let child_ref = ChildRef::new(id.clone(), sender.clone(), name, path);
let child_ref = ChildRef::new_internal(id.clone(), sender.clone(), name, path);
let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();

View File

@ -9,10 +9,10 @@ use lever::prelude::*;
use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicUsize, Ordering},
Arc,
};
use tracing::{trace, warn};
use tracing::{debug, trace, warn};
/// Type alias for the concurrency hashmap. Each key-value pair stores
/// the Bastion identifier as the key and the module name as the value.
@ -66,7 +66,7 @@ pub type DefaultDispatcherHandler = RoundRobinHandler;
/// Dispatcher that will do simple round-robin distribution
#[derive(Default, Debug)]
pub struct RoundRobinHandler {
index: AtomicU64,
index: AtomicUsize,
}
impl DispatcherHandler for RoundRobinHandler {
@ -80,25 +80,27 @@ impl DispatcherHandler for RoundRobinHandler {
}
// Each child in turn will receive a message.
fn broadcast_message(&self, entries: &DispatcherMap, message: &Arc<SignedMessage>) {
if entries.len() == 0 {
let entries = entries
.iter()
.filter(|entry| entry.0.is_public())
.collect::<Vec<_>>();
if entries.is_empty() {
debug!("no public children to broadcast message to");
return;
}
let current_index = self.index.load(Ordering::SeqCst) % entries.len();
let current_index = self.index.load(Ordering::SeqCst) % entries.len() as u64;
let mut skipped = 0;
for pair in entries.iter() {
if skipped != current_index {
skipped += 1;
continue;
}
let entry = pair.0;
entry.tell_anonymously(message.clone()).unwrap();
break;
}
self.index.store(current_index + 1, Ordering::SeqCst);
if let Some(entry) = entries.get(current_index) {
warn!(
"sending message to child {}/{} - {}",
current_index + 1,
entries.len(),
entry.0.path()
);
entry.0.tell_anonymously(message.clone()).unwrap();
self.index.store(current_index + 1, Ordering::SeqCst);
};
}
}
/// Generic trait which any custom dispatcher handler must implement for

View File

@ -1,5 +1,3 @@
use std::result;
use std::time::Duration;
@ -10,6 +8,7 @@ pub enum BError {
Receive(ReceiveError),
ChanSend(String),
ChanRecv(String),
UnackedMessage,
}
#[derive(Debug)]

View File

@ -1,6 +1,6 @@
use crate::routing::ActorPath;
use lazy_static::lazy_static;
use regex::{escape, CaptureMatches, Captures, Regex};
use regex::Regex;
lazy_static! {
static ref WILDCARD_REGEX: Regex = Regex::new(r"(\*/)").unwrap();

View File

@ -189,13 +189,13 @@ pub enum RestartPolicy {
///
/// The default strategy used is `ActorRestartStrategy::Immediate`
/// with the `RestartPolicy::Always` restart policy.
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct RestartStrategy {
restart_policy: RestartPolicy,
strategy: ActorRestartStrategy,
}
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
/// The strategy for restating an actor as far as it
/// returned an failure.
///
@ -219,10 +219,31 @@ pub enum ActorRestartStrategy {
/// An initial delay before the restarting an actor.
timeout: Duration,
/// Defines a multiplier how fast the timeout will be increasing.
multiplier: u64,
multiplier: f64,
},
}
impl ActorRestartStrategy {
/// Calculate the expected restart delay for given strategy after n restarts.
pub fn calculate(&self, restarts_count: usize) -> Option<Duration> {
match *self {
ActorRestartStrategy::LinearBackOff { timeout } => {
let delay = timeout.mul_f64(restarts_count as f64);
Some(timeout + delay)
}
ActorRestartStrategy::ExponentialBackOff {
timeout,
multiplier,
} => {
let factor = multiplier * restarts_count as f64;
let delay = timeout.mul_f64(factor);
Some(timeout + delay)
}
_ => None,
}
}
}
impl Supervisor {
pub(crate) fn new(bcast: Broadcast) -> Self {
debug!("Supervisor({}): Initializing.", bcast.id());
@ -733,7 +754,7 @@ impl Supervisor {
/// .with_actor_restart_strategy(
/// ActorRestartStrategy::ExponentialBackOff {
/// timeout: Duration::from_millis(5000),
/// multiplier: 3,
/// multiplier: 3.0,
/// }
/// )
/// )
@ -1892,21 +1913,9 @@ impl RestartStrategy {
}
pub(crate) async fn apply_strategy(&self, restarts_count: usize) {
match self.strategy {
ActorRestartStrategy::LinearBackOff { timeout } => {
let start_in = timeout.as_secs() + (timeout.as_secs() * restarts_count as u64);
Delay::new(Duration::from_secs(start_in)).await;
}
ActorRestartStrategy::ExponentialBackOff {
timeout,
multiplier,
} => {
let start_in =
timeout.as_secs() + (timeout.as_secs() * multiplier * restarts_count as u64);
Delay::new(Duration::from_secs(start_in)).await;
}
_ => {}
};
if let Some(dur) = self.strategy.calculate(restarts_count) {
Delay::new(dur).await;
}
}
}

View File

@ -43,3 +43,61 @@ fn override_restart_strategy_and_policy() {
assert_eq!(restart_strategy.restart_policy(), policy);
assert_eq!(restart_strategy.strategy(), strategy);
}
#[test]
fn calculate_immediate_strategy() {
let strategy = ActorRestartStrategy::Immediate;
assert_eq!(strategy.calculate(0), None);
assert_eq!(strategy.calculate(1), None);
assert_eq!(strategy.calculate(100), None);
}
#[test]
fn calculate_linear_strategy() {
let strategy = ActorRestartStrategy::LinearBackOff {
timeout: Duration::from_millis(100),
};
assert_eq!(strategy.calculate(0), Some(Duration::from_millis(100)));
assert_eq!(
strategy.calculate(1),
Some(Duration::from_millis(100 + 1 * 100))
);
assert_eq!(
strategy.calculate(99),
Some(Duration::from_millis(100 + 99 * 100))
);
}
#[test]
fn calculate_exp_strategy_with_multiplier_zero() {
let strategy = ActorRestartStrategy::ExponentialBackOff {
timeout: Duration::from_millis(100),
multiplier: 0.0,
};
assert_eq!(strategy.calculate(0), Some(Duration::from_millis(100)));
assert_eq!(strategy.calculate(1), Some(Duration::from_millis(100)));
assert_eq!(strategy.calculate(100), Some(Duration::from_millis(100)));
}
#[test]
fn calculate_exp_strategy_with_multiplier_non_zero() {
let strategy = ActorRestartStrategy::ExponentialBackOff {
timeout: Duration::from_millis(100),
multiplier: 5.0,
};
assert_eq!(strategy.calculate(0), Some(Duration::from_millis(100)));
assert_eq!(
strategy.calculate(1),
Some(Duration::from_millis(100 + 1 * 5 * 100))
);
assert_eq!(
strategy.calculate(99),
Some(Duration::from_millis(100 + 99 * 5 * 100))
);
}