Compare commits

...

3 Commits

Author SHA1 Message Date
Valeryi Savich fc43221edf Added tests for custom defintions 2021-07-18 13:59:10 +02:00
Valeryi Savich bf06b6d88a Added support for sized mailboxes for actors 2021-07-18 13:52:53 +02:00
Valeryi Savich 97f8214086 Replaced async-channel onto flume 2021-07-18 12:47:52 +02:00
7 changed files with 179 additions and 17 deletions

View File

@ -45,11 +45,11 @@ features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]
[dependencies]
async-channel = "1.6.1"
async-trait = "0.1.50"
async-mutex = "1.4.0"
bastion-executor = "0.4.1"
crossbeam = "0.8.1"
flume = "0.10.7"
thiserror = "1.0.25"
once_cell = "1.8.0"
lever = "0.1.1"

View File

@ -1,12 +1,12 @@
use std::sync::Arc;
use async_channel::{unbounded, Sender};
use flume::{unbounded, Sender};
use crate::actor::local_state::LocalState;
use crate::actor::state::ActorState;
use crate::mailbox::envelope::Envelope;
use crate::mailbox::mailbox::{Mailbox, MailboxSize};
use crate::mailbox::message::Message;
use crate::mailbox::Mailbox;
use crate::routing::path::ActorPath;
/// A structure that defines actor's state, mailbox with
@ -28,11 +28,11 @@ pub struct Context {
}
impl Context {
pub(crate) fn new(path: ActorPath) -> (Self, Sender<Envelope>) {
pub(crate) fn new(path: ActorPath, mailbox_size: MailboxSize) -> (Self, Sender<Envelope>) {
let (system_tx, system_rx) = unbounded();
let path = Arc::new(path);
let mailbox = Mailbox::new(system_rx);
let mailbox = Mailbox::new(mailbox_size, system_rx);
let local_state = LocalState::new();
let internal_state = ActorState::new();

View File

@ -10,4 +10,6 @@ pub enum BastionError {
ChanSend(String),
#[error("The message cannot be received from the channel. Reason: {0}")]
ChanRecv(String),
#[error("The actors channel is empty.")]
EmptyChannel,
}

View File

@ -36,6 +36,11 @@ impl Envelope {
}
}
/// Returns a reference to the sender (if was declared).
pub fn sender(&self) -> Option<ActorRef> {
self.sender.clone()
}
/// Returns a message type. Can be use for pattern matching and filtering
/// incoming message from other actors.
pub fn message_type(&self) -> MessageType {

View File

@ -1,13 +1,23 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use async_channel::{unbounded, Receiver, Sender};
use flume::{bounded, unbounded, Receiver, Sender, TryRecvError};
use crate::error::{BastionError, Result};
use crate::mailbox::envelope::Envelope;
use crate::mailbox::message::Message;
use crate::mailbox::state::MailboxState;
/// An enum that defines the mailbox boundaries.
#[derive(Debug, Eq, PartialEq)]
pub enum MailboxSize {
/// Store only max N messages. Any other incoming messages
/// that can't be put in the mailbox will be ignored.
Limited(usize),
/// No limits for messages. Default value.
Unlimited,
}
/// Struct that represents a message sender.
#[derive(Clone)]
pub struct MailboxTx {
@ -57,8 +67,11 @@ pub struct Mailbox {
// TODO: Add calls with recv with timeout
impl Mailbox {
/// Creates a new mailbox for the actor.
pub(crate) fn new(system_rx: Receiver<Envelope>) -> Self {
let (tx, actor_rx) = unbounded();
pub(crate) fn new(mailbox_size: MailboxSize, system_rx: Receiver<Envelope>) -> Self {
let (tx, actor_rx) = match mailbox_size {
MailboxSize::Limited(limit) => bounded(limit),
MailboxSize::Unlimited => unbounded(),
};
let actor_tx = MailboxTx::new(tx);
let state = Arc::new(MailboxState::new());
@ -70,10 +83,15 @@ impl Mailbox {
}
}
/// Returns an actor sender to the caller.
pub(crate) fn get_actor_rx(&self) -> Sender<Envelope> {
self.actor_tx.tx.clone()
}
/// Forced receive message from the actor's queue.
pub async fn recv(&mut self) -> Envelope {
self.actor_rx
.recv()
.recv_async()
.await
.map_err(|e| BastionError::ChanRecv(e.to_string()))
.unwrap()
@ -81,15 +99,16 @@ impl Mailbox {
/// Try receiving message from the actor's queue.
pub async fn try_recv(&mut self) -> Result<Envelope> {
self.actor_rx
.try_recv()
.map_err(|e| BastionError::ChanRecv(e.to_string()))
self.actor_rx.try_recv().map_err(|e| match e {
TryRecvError::Empty => BastionError::EmptyChannel,
_ => BastionError::ChanRecv(e.to_string()),
})
}
/// Forced receive message from the internal system queue.
pub async fn sys_recv(&mut self) -> Envelope {
self.system_rx
.recv()
.recv_async()
.await
.map_err(|e| BastionError::ChanRecv(e.to_string()))
.unwrap()
@ -97,8 +116,106 @@ impl Mailbox {
/// Try receiving message from the internal system queue.
pub async fn try_sys_recv(&mut self) -> Result<Envelope> {
self.system_rx
.try_recv()
.map_err(|e| BastionError::ChanRecv(e.to_string()))
self.system_rx.try_recv().map_err(|e| match e {
TryRecvError::Empty => BastionError::EmptyChannel,
_ => BastionError::ChanRecv(e.to_string()),
})
}
}
#[cfg(test)]
mod mailbox_tests {
use flume::unbounded;
use tokio_test::block_on;
use crate::error::Result;
use crate::mailbox::envelope::Envelope;
use crate::mailbox::mailbox::{Mailbox, MailboxSize};
use crate::mailbox::message::{Message, MessageType};
#[test]
fn test_recv() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
tokio_test::block_on(instance.actor_tx.tx.send_async(envelope));
let incoming_env = tokio_test::block_on(instance.recv());
let actor_ref = incoming_env.sender();
let message = tokio_test::block_on(incoming_env.read());
let message_type = incoming_env.message_type();
assert_eq!(actor_ref.is_none(), true);
assert_eq!(message.is_some(), true);
assert_eq!(message_type, MessageType::Tell);
}
#[test]
fn test_try_recv() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
tokio_test::block_on(instance.actor_tx.tx.send_async(envelope));
let incoming_env = tokio_test::block_on(instance.try_recv()).unwrap();
let actor_ref = incoming_env.sender();
let message = tokio_test::block_on(incoming_env.read());
let message_type = incoming_env.message_type();
assert_eq!(actor_ref.is_none(), true);
assert_eq!(message.is_some(), true);
assert_eq!(message_type, MessageType::Tell);
}
#[test]
fn test_try_recv_returns_error_on_empty_channel() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
let result = tokio_test::block_on(instance.try_sys_recv()).ok();
assert_eq!(result.is_none(), true);
}
#[test]
fn test_sys_recv() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
tokio_test::block_on(system_tx.send_async(envelope));
let incoming_env = tokio_test::block_on(instance.sys_recv());
let actor_ref = incoming_env.sender();
let message = tokio_test::block_on(incoming_env.read());
let message_type = incoming_env.message_type();
assert_eq!(actor_ref.is_none(), true);
assert_eq!(message.is_some(), true);
assert_eq!(message_type, MessageType::Tell);
}
#[test]
fn test_try_sys_recv() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
tokio_test::block_on(system_tx.send_async(envelope));
let incoming_env = tokio_test::block_on(instance.try_sys_recv()).unwrap();
let actor_ref = incoming_env.sender();
let message = tokio_test::block_on(incoming_env.read());
let message_type = incoming_env.message_type();
assert_eq!(actor_ref.is_none(), true);
assert_eq!(message.is_some(), true);
assert_eq!(message_type, MessageType::Tell);
}
#[test]
fn test_try_sys_recv_returns_error_on_empty_channel() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
let result = tokio_test::block_on(instance.try_sys_recv()).ok();
assert_eq!(result.is_none(), true);
}
}

View File

@ -4,5 +4,5 @@ pub mod message;
pub(crate) mod mailbox;
pub(crate) mod state;
pub use crate::mailbox::mailbox::Mailbox;
pub use crate::mailbox::mailbox::MailboxSize;
pub use crate::mailbox::message::{Message, MessageType};

View File

@ -5,6 +5,7 @@ use std::sync::Arc;
use uuid::Uuid;
use crate::actor::traits::Actor;
use crate::mailbox::MailboxSize;
use crate::routing::path::{ActorPath, Scope};
type CustomActorNameFn = dyn Fn() -> String + Send + 'static;
@ -21,6 +22,8 @@ pub struct Definition {
actor_name_fn: Option<Arc<CustomActorNameFn>>,
/// Defines how much actors must be instantiated in the beginning.
redundancy: usize,
/// Defines the boundaries of the actor's mailbox.
mailbox_size: MailboxSize,
}
impl Definition {
@ -31,6 +34,7 @@ impl Definition {
let actor_name_fn = None;
let redundancy = 1;
let actor = None;
let mailbox_size = MailboxSize::Unlimited;
Definition {
name,
@ -38,6 +42,7 @@ impl Definition {
scope,
actor_name_fn,
redundancy,
mailbox_size,
}
}
@ -81,6 +86,12 @@ impl Definition {
self
}
/// Overrides a default channel size for the actor's mailbox.
pub fn mailbox_size(mut self, custom_mailbox_size: MailboxSize) -> Self {
self.mailbox_size = custom_mailbox_size;
self
}
pub(crate) fn generate_actor_path(&self) -> ActorPath {
match &self.actor_name_fn {
Some(func) => {
@ -120,6 +131,7 @@ mod tests {
use crate::actor::context::Context;
use crate::actor::traits::Actor;
use crate::error::Result;
use crate::mailbox::MailboxSize;
use crate::routing::path::Scope;
use crate::system::definition::Definition;
@ -149,6 +161,7 @@ mod tests {
assert_eq!(instance.scope, Scope::User);
assert_eq!(instance.actor_name_fn.is_none(), true);
assert_eq!(instance.redundancy, 1);
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
}
#[test]
@ -159,6 +172,7 @@ mod tests {
assert_eq!(instance.scope, Scope::User);
assert_eq!(instance.actor_name_fn.is_none(), true);
assert_eq!(instance.redundancy, 1);
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
}
#[test]
@ -169,6 +183,7 @@ mod tests {
assert_eq!(instance.scope, Scope::User);
assert_eq!(instance.actor_name_fn.is_some(), false);
assert_eq!(instance.redundancy, 1);
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
}
#[test]
@ -178,6 +193,7 @@ mod tests {
assert_eq!(instance.scope, Scope::User);
assert_eq!(instance.actor_name_fn.is_some(), true);
assert_eq!(instance.redundancy, 1);
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
let actor_path = instance.generate_actor_path();
assert_eq!(actor_path.to_string(), "bastion://node/user/Actor_1");
@ -195,6 +211,7 @@ mod tests {
assert_eq!(instance.scope, Scope::User);
assert_eq!(instance.actor_name_fn.is_some(), true);
assert_eq!(instance.redundancy, 1);
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
let actor_path = instance.generate_actor_path();
assert_eq!(actor_path.to_string(), "bastion://node/user/Actor_1");
@ -215,9 +232,30 @@ mod tests {
assert_eq!(instance.scope, Scope::Temporary);
assert_eq!(instance.actor_name_fn.is_some(), true);
assert_eq!(instance.redundancy, 1);
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
let actor_path = instance.generate_actor_path();
assert_eq!(actor_path.to_string(), "bastion://node/temporary/Actor_1");
assert_eq!(actor_path.is_temporary_scope(), true);
}
#[test]
fn test_definition_with_custom_redundancy() {
let instance = Definition::new().redundancy(5);
assert_eq!(instance.scope, Scope::User);
assert_eq!(instance.actor_name_fn.is_none(), true);
assert_eq!(instance.redundancy, 5);
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
}
#[test]
fn test_definition_with_custom_mailbox_size() {
let instance = Definition::new().mailbox_size(MailboxSize::Limited(5));
assert_eq!(instance.scope, Scope::User);
assert_eq!(instance.actor_name_fn.is_none(), true);
assert_eq!(instance.redundancy, 1);
assert_eq!(instance.mailbox_size, MailboxSize::Limited(5));
}
}