mirror of https://github.com/bastion-rs/bastion
Compare commits
3 Commits
8dadf99c2b
...
fc43221edf
Author | SHA1 | Date |
---|---|---|
Valeryi Savich | fc43221edf | |
Valeryi Savich | bf06b6d88a | |
Valeryi Savich | 97f8214086 |
|
@ -45,11 +45,11 @@ features = ["docs"]
|
|||
rustdoc-args = ["--cfg", "feature=\"docs\""]
|
||||
|
||||
[dependencies]
|
||||
async-channel = "1.6.1"
|
||||
async-trait = "0.1.50"
|
||||
async-mutex = "1.4.0"
|
||||
bastion-executor = "0.4.1"
|
||||
crossbeam = "0.8.1"
|
||||
flume = "0.10.7"
|
||||
thiserror = "1.0.25"
|
||||
once_cell = "1.8.0"
|
||||
lever = "0.1.1"
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use async_channel::{unbounded, Sender};
|
||||
use flume::{unbounded, Sender};
|
||||
|
||||
use crate::actor::local_state::LocalState;
|
||||
use crate::actor::state::ActorState;
|
||||
use crate::mailbox::envelope::Envelope;
|
||||
use crate::mailbox::mailbox::{Mailbox, MailboxSize};
|
||||
use crate::mailbox::message::Message;
|
||||
use crate::mailbox::Mailbox;
|
||||
use crate::routing::path::ActorPath;
|
||||
|
||||
/// A structure that defines actor's state, mailbox with
|
||||
|
@ -28,11 +28,11 @@ pub struct Context {
|
|||
}
|
||||
|
||||
impl Context {
|
||||
pub(crate) fn new(path: ActorPath) -> (Self, Sender<Envelope>) {
|
||||
pub(crate) fn new(path: ActorPath, mailbox_size: MailboxSize) -> (Self, Sender<Envelope>) {
|
||||
let (system_tx, system_rx) = unbounded();
|
||||
|
||||
let path = Arc::new(path);
|
||||
let mailbox = Mailbox::new(system_rx);
|
||||
let mailbox = Mailbox::new(mailbox_size, system_rx);
|
||||
let local_state = LocalState::new();
|
||||
let internal_state = ActorState::new();
|
||||
|
||||
|
|
|
@ -10,4 +10,6 @@ pub enum BastionError {
|
|||
ChanSend(String),
|
||||
#[error("The message cannot be received from the channel. Reason: {0}")]
|
||||
ChanRecv(String),
|
||||
#[error("The actors channel is empty.")]
|
||||
EmptyChannel,
|
||||
}
|
||||
|
|
|
@ -36,6 +36,11 @@ impl Envelope {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the sender (if was declared).
|
||||
pub fn sender(&self) -> Option<ActorRef> {
|
||||
self.sender.clone()
|
||||
}
|
||||
|
||||
/// Returns a message type. Can be use for pattern matching and filtering
|
||||
/// incoming message from other actors.
|
||||
pub fn message_type(&self) -> MessageType {
|
||||
|
|
|
@ -1,13 +1,23 @@
|
|||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_channel::{unbounded, Receiver, Sender};
|
||||
use flume::{bounded, unbounded, Receiver, Sender, TryRecvError};
|
||||
|
||||
use crate::error::{BastionError, Result};
|
||||
use crate::mailbox::envelope::Envelope;
|
||||
use crate::mailbox::message::Message;
|
||||
use crate::mailbox::state::MailboxState;
|
||||
|
||||
/// An enum that defines the mailbox boundaries.
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
pub enum MailboxSize {
|
||||
/// Store only max N messages. Any other incoming messages
|
||||
/// that can't be put in the mailbox will be ignored.
|
||||
Limited(usize),
|
||||
/// No limits for messages. Default value.
|
||||
Unlimited,
|
||||
}
|
||||
|
||||
/// Struct that represents a message sender.
|
||||
#[derive(Clone)]
|
||||
pub struct MailboxTx {
|
||||
|
@ -57,8 +67,11 @@ pub struct Mailbox {
|
|||
// TODO: Add calls with recv with timeout
|
||||
impl Mailbox {
|
||||
/// Creates a new mailbox for the actor.
|
||||
pub(crate) fn new(system_rx: Receiver<Envelope>) -> Self {
|
||||
let (tx, actor_rx) = unbounded();
|
||||
pub(crate) fn new(mailbox_size: MailboxSize, system_rx: Receiver<Envelope>) -> Self {
|
||||
let (tx, actor_rx) = match mailbox_size {
|
||||
MailboxSize::Limited(limit) => bounded(limit),
|
||||
MailboxSize::Unlimited => unbounded(),
|
||||
};
|
||||
let actor_tx = MailboxTx::new(tx);
|
||||
let state = Arc::new(MailboxState::new());
|
||||
|
||||
|
@ -70,10 +83,15 @@ impl Mailbox {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns an actor sender to the caller.
|
||||
pub(crate) fn get_actor_rx(&self) -> Sender<Envelope> {
|
||||
self.actor_tx.tx.clone()
|
||||
}
|
||||
|
||||
/// Forced receive message from the actor's queue.
|
||||
pub async fn recv(&mut self) -> Envelope {
|
||||
self.actor_rx
|
||||
.recv()
|
||||
.recv_async()
|
||||
.await
|
||||
.map_err(|e| BastionError::ChanRecv(e.to_string()))
|
||||
.unwrap()
|
||||
|
@ -81,15 +99,16 @@ impl Mailbox {
|
|||
|
||||
/// Try receiving message from the actor's queue.
|
||||
pub async fn try_recv(&mut self) -> Result<Envelope> {
|
||||
self.actor_rx
|
||||
.try_recv()
|
||||
.map_err(|e| BastionError::ChanRecv(e.to_string()))
|
||||
self.actor_rx.try_recv().map_err(|e| match e {
|
||||
TryRecvError::Empty => BastionError::EmptyChannel,
|
||||
_ => BastionError::ChanRecv(e.to_string()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Forced receive message from the internal system queue.
|
||||
pub async fn sys_recv(&mut self) -> Envelope {
|
||||
self.system_rx
|
||||
.recv()
|
||||
.recv_async()
|
||||
.await
|
||||
.map_err(|e| BastionError::ChanRecv(e.to_string()))
|
||||
.unwrap()
|
||||
|
@ -97,8 +116,106 @@ impl Mailbox {
|
|||
|
||||
/// Try receiving message from the internal system queue.
|
||||
pub async fn try_sys_recv(&mut self) -> Result<Envelope> {
|
||||
self.system_rx
|
||||
.try_recv()
|
||||
.map_err(|e| BastionError::ChanRecv(e.to_string()))
|
||||
self.system_rx.try_recv().map_err(|e| match e {
|
||||
TryRecvError::Empty => BastionError::EmptyChannel,
|
||||
_ => BastionError::ChanRecv(e.to_string()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod mailbox_tests {
|
||||
use flume::unbounded;
|
||||
use tokio_test::block_on;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::mailbox::envelope::Envelope;
|
||||
use crate::mailbox::mailbox::{Mailbox, MailboxSize};
|
||||
use crate::mailbox::message::{Message, MessageType};
|
||||
|
||||
#[test]
|
||||
fn test_recv() {
|
||||
let (system_tx, system_rx) = unbounded();
|
||||
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
|
||||
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
|
||||
|
||||
tokio_test::block_on(instance.actor_tx.tx.send_async(envelope));
|
||||
let incoming_env = tokio_test::block_on(instance.recv());
|
||||
let actor_ref = incoming_env.sender();
|
||||
let message = tokio_test::block_on(incoming_env.read());
|
||||
let message_type = incoming_env.message_type();
|
||||
|
||||
assert_eq!(actor_ref.is_none(), true);
|
||||
assert_eq!(message.is_some(), true);
|
||||
assert_eq!(message_type, MessageType::Tell);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_recv() {
|
||||
let (system_tx, system_rx) = unbounded();
|
||||
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
|
||||
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
|
||||
|
||||
tokio_test::block_on(instance.actor_tx.tx.send_async(envelope));
|
||||
let incoming_env = tokio_test::block_on(instance.try_recv()).unwrap();
|
||||
let actor_ref = incoming_env.sender();
|
||||
let message = tokio_test::block_on(incoming_env.read());
|
||||
let message_type = incoming_env.message_type();
|
||||
|
||||
assert_eq!(actor_ref.is_none(), true);
|
||||
assert_eq!(message.is_some(), true);
|
||||
assert_eq!(message_type, MessageType::Tell);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_recv_returns_error_on_empty_channel() {
|
||||
let (system_tx, system_rx) = unbounded();
|
||||
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
|
||||
|
||||
let result = tokio_test::block_on(instance.try_sys_recv()).ok();
|
||||
assert_eq!(result.is_none(), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sys_recv() {
|
||||
let (system_tx, system_rx) = unbounded();
|
||||
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
|
||||
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
|
||||
|
||||
tokio_test::block_on(system_tx.send_async(envelope));
|
||||
let incoming_env = tokio_test::block_on(instance.sys_recv());
|
||||
let actor_ref = incoming_env.sender();
|
||||
let message = tokio_test::block_on(incoming_env.read());
|
||||
let message_type = incoming_env.message_type();
|
||||
|
||||
assert_eq!(actor_ref.is_none(), true);
|
||||
assert_eq!(message.is_some(), true);
|
||||
assert_eq!(message_type, MessageType::Tell);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_sys_recv() {
|
||||
let (system_tx, system_rx) = unbounded();
|
||||
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
|
||||
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
|
||||
|
||||
tokio_test::block_on(system_tx.send_async(envelope));
|
||||
let incoming_env = tokio_test::block_on(instance.try_sys_recv()).unwrap();
|
||||
let actor_ref = incoming_env.sender();
|
||||
let message = tokio_test::block_on(incoming_env.read());
|
||||
let message_type = incoming_env.message_type();
|
||||
|
||||
assert_eq!(actor_ref.is_none(), true);
|
||||
assert_eq!(message.is_some(), true);
|
||||
assert_eq!(message_type, MessageType::Tell);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_sys_recv_returns_error_on_empty_channel() {
|
||||
let (system_tx, system_rx) = unbounded();
|
||||
let mut instance = Mailbox::new(MailboxSize::Unlimited, system_rx);
|
||||
|
||||
let result = tokio_test::block_on(instance.try_sys_recv()).ok();
|
||||
assert_eq!(result.is_none(), true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,5 +4,5 @@ pub mod message;
|
|||
pub(crate) mod mailbox;
|
||||
pub(crate) mod state;
|
||||
|
||||
pub use crate::mailbox::mailbox::Mailbox;
|
||||
pub use crate::mailbox::mailbox::MailboxSize;
|
||||
pub use crate::mailbox::message::{Message, MessageType};
|
||||
|
|
|
@ -5,6 +5,7 @@ use std::sync::Arc;
|
|||
use uuid::Uuid;
|
||||
|
||||
use crate::actor::traits::Actor;
|
||||
use crate::mailbox::MailboxSize;
|
||||
use crate::routing::path::{ActorPath, Scope};
|
||||
|
||||
type CustomActorNameFn = dyn Fn() -> String + Send + 'static;
|
||||
|
@ -21,6 +22,8 @@ pub struct Definition {
|
|||
actor_name_fn: Option<Arc<CustomActorNameFn>>,
|
||||
/// Defines how much actors must be instantiated in the beginning.
|
||||
redundancy: usize,
|
||||
/// Defines the boundaries of the actor's mailbox.
|
||||
mailbox_size: MailboxSize,
|
||||
}
|
||||
|
||||
impl Definition {
|
||||
|
@ -31,6 +34,7 @@ impl Definition {
|
|||
let actor_name_fn = None;
|
||||
let redundancy = 1;
|
||||
let actor = None;
|
||||
let mailbox_size = MailboxSize::Unlimited;
|
||||
|
||||
Definition {
|
||||
name,
|
||||
|
@ -38,6 +42,7 @@ impl Definition {
|
|||
scope,
|
||||
actor_name_fn,
|
||||
redundancy,
|
||||
mailbox_size,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,6 +86,12 @@ impl Definition {
|
|||
self
|
||||
}
|
||||
|
||||
/// Overrides a default channel size for the actor's mailbox.
|
||||
pub fn mailbox_size(mut self, custom_mailbox_size: MailboxSize) -> Self {
|
||||
self.mailbox_size = custom_mailbox_size;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn generate_actor_path(&self) -> ActorPath {
|
||||
match &self.actor_name_fn {
|
||||
Some(func) => {
|
||||
|
@ -120,6 +131,7 @@ mod tests {
|
|||
use crate::actor::context::Context;
|
||||
use crate::actor::traits::Actor;
|
||||
use crate::error::Result;
|
||||
use crate::mailbox::MailboxSize;
|
||||
use crate::routing::path::Scope;
|
||||
use crate::system::definition::Definition;
|
||||
|
||||
|
@ -149,6 +161,7 @@ mod tests {
|
|||
assert_eq!(instance.scope, Scope::User);
|
||||
assert_eq!(instance.actor_name_fn.is_none(), true);
|
||||
assert_eq!(instance.redundancy, 1);
|
||||
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -159,6 +172,7 @@ mod tests {
|
|||
assert_eq!(instance.scope, Scope::User);
|
||||
assert_eq!(instance.actor_name_fn.is_none(), true);
|
||||
assert_eq!(instance.redundancy, 1);
|
||||
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -169,6 +183,7 @@ mod tests {
|
|||
assert_eq!(instance.scope, Scope::User);
|
||||
assert_eq!(instance.actor_name_fn.is_some(), false);
|
||||
assert_eq!(instance.redundancy, 1);
|
||||
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -178,6 +193,7 @@ mod tests {
|
|||
assert_eq!(instance.scope, Scope::User);
|
||||
assert_eq!(instance.actor_name_fn.is_some(), true);
|
||||
assert_eq!(instance.redundancy, 1);
|
||||
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
|
||||
|
||||
let actor_path = instance.generate_actor_path();
|
||||
assert_eq!(actor_path.to_string(), "bastion://node/user/Actor_1");
|
||||
|
@ -195,6 +211,7 @@ mod tests {
|
|||
assert_eq!(instance.scope, Scope::User);
|
||||
assert_eq!(instance.actor_name_fn.is_some(), true);
|
||||
assert_eq!(instance.redundancy, 1);
|
||||
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
|
||||
|
||||
let actor_path = instance.generate_actor_path();
|
||||
assert_eq!(actor_path.to_string(), "bastion://node/user/Actor_1");
|
||||
|
@ -215,9 +232,30 @@ mod tests {
|
|||
assert_eq!(instance.scope, Scope::Temporary);
|
||||
assert_eq!(instance.actor_name_fn.is_some(), true);
|
||||
assert_eq!(instance.redundancy, 1);
|
||||
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
|
||||
|
||||
let actor_path = instance.generate_actor_path();
|
||||
assert_eq!(actor_path.to_string(), "bastion://node/temporary/Actor_1");
|
||||
assert_eq!(actor_path.is_temporary_scope(), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_definition_with_custom_redundancy() {
|
||||
let instance = Definition::new().redundancy(5);
|
||||
|
||||
assert_eq!(instance.scope, Scope::User);
|
||||
assert_eq!(instance.actor_name_fn.is_none(), true);
|
||||
assert_eq!(instance.redundancy, 5);
|
||||
assert_eq!(instance.mailbox_size, MailboxSize::Unlimited);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_definition_with_custom_mailbox_size() {
|
||||
let instance = Definition::new().mailbox_size(MailboxSize::Limited(5));
|
||||
|
||||
assert_eq!(instance.scope, Scope::User);
|
||||
assert_eq!(instance.actor_name_fn.is_none(), true);
|
||||
assert_eq!(instance.redundancy, 1);
|
||||
assert_eq!(instance.mailbox_size, MailboxSize::Limited(5));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue