Compare commits

...

2 Commits

Author SHA1 Message Date
Valeryi Savich d79afe82c0 Fixed tests with async calls 2021-07-16 23:26:56 +02:00
Valeryi Savich 2eac8214c3 Fixed issuew with reading a message from envelopes 2021-07-16 22:57:31 +02:00
5 changed files with 44 additions and 53 deletions

View File

@ -47,6 +47,7 @@ rustdoc-args = ["--cfg", "feature=\"docs\""]
[dependencies]
async-channel = "1.6.1"
async-trait = "0.1.50"
async-mutex = "1.4.0"
bastion-executor = "0.4.1"
crossbeam = "0.8.1"
thiserror = "1.0.25"
@ -57,3 +58,4 @@ regex = "1.5.4"
uuid = { version = "0.8.2", features = ["v4"] }
[dev-dependencies]
tokio-test = "0.4.2"

View File

@ -20,7 +20,7 @@ pub struct Context {
/// Path to the actor in the system
path: Arc<ActorPath>,
/// Mailbox of the actor
mailbox: Mailbox<Box<dyn Message>>,
mailbox: Mailbox,
/// Local storage for actor's data
local_state: LocalState,
/// Current execution state of the actor
@ -28,7 +28,7 @@ pub struct Context {
}
impl Context {
pub(crate) fn new(path: ActorPath) -> (Self, Sender<Envelope<impl Message>>) {
pub(crate) fn new(path: ActorPath) -> (Self, Sender<Envelope>) {
let (system_tx, system_rx) = unbounded();
let path = Arc::new(path);

View File

@ -5,6 +5,9 @@ use crate::error::Result;
#[async_trait]
pub trait Actor: Sync {
fn new() -> Self
where
Self: Sized;
async fn on_init(&self, _ctx: &mut Context) {}
async fn on_sync(&self, _ctx: &mut Context) {}
async fn on_stopped(&self, _ctx: &mut Context) {}

View File

@ -1,33 +1,33 @@
use std::cell::RefCell;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use async_mutex::Mutex;
use crate::actor::actor_ref::ActorRef;
use crate::mailbox::message::{Message, MessageType};
/// Struct that represents an incoming message in the actor's mailbox.
#[derive(Clone)]
pub struct Envelope<T>
where
T: Message,
{
pub struct Envelope {
/// The sending side of a channel. In actor's world
/// represented is a message sender. Can be used
/// for acking message when it possible.
sender: Option<ActorRef>,
/// An actual data sent by the channel
message: RefCell<Option<T>>,
message: Arc<Mutex<Option<Box<dyn Message>>>>,
/// Message type that helps to figure out how to deliver message
/// and how to ack it after the processing.
message_type: MessageType,
}
impl<T> Envelope<T>
where
T: Message,
{
impl Envelope {
/// Create a message with the given sender and inner data.
pub fn new(sender: Option<ActorRef>, data: T, message_type: MessageType) -> Self {
let message = RefCell::new(Some(data));
pub fn new(
sender: Option<ActorRef>,
data: Box<dyn Message>,
message_type: MessageType,
) -> Self {
let message = Arc::new(Mutex::new(Some(data)));
Envelope {
sender,
@ -44,8 +44,9 @@ where
/// Extracts the message data and returns it to the caller. Each further
/// method call will return `None`.
pub fn read(&self) -> Option<T> {
self.message.replace(None)
pub async fn read(&self) -> Option<Box<dyn Message>> {
let mut guard = self.message.lock().await;
guard.take()
}
// TODO: Return a boolean flag once operation has finished?
@ -59,10 +60,7 @@ where
}
}
impl<T> Debug for Envelope<T>
where
T: Message,
{
impl Debug for Envelope {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("Message")
.field("message", &self.message)
@ -87,19 +85,19 @@ mod envelope_tests {
#[test]
fn test_message_read() {
let message_data = FakeMessage::new();
let message_data = Box::new(FakeMessage::new());
let instance = Envelope::new(None, message_data, MessageType::Tell);
let expected_data = instance.read();
let expected_data = tokio_test::block_on(instance.read());
assert_eq!(expected_data.is_some(), true);
let another_read_attempt_data = instance.read();
let another_read_attempt_data = tokio_test::block_on(instance.read());
assert_eq!(another_read_attempt_data.is_none(), true);
}
#[test]
fn test_match_against_ask_message_type() {
let message_data = FakeMessage::new();
let message_data = Box::new(FakeMessage::new());
let instance = Envelope::new(None, message_data, MessageType::Ask);
assert_eq!(instance.message_type, MessageType::Ask);
@ -107,7 +105,7 @@ mod envelope_tests {
#[test]
fn test_match_against_broadcast_message_type() {
let message_data = FakeMessage::new();
let message_data = Box::new(FakeMessage::new());
let instance = Envelope::new(None, message_data, MessageType::Broadcast);
assert_eq!(instance.message_type, MessageType::Broadcast);
@ -115,7 +113,7 @@ mod envelope_tests {
#[test]
fn test_match_against_tell_message_type() {
let message_data = FakeMessage::new();
let message_data = Box::new(FakeMessage::new());
let instance = Envelope::new(None, message_data, MessageType::Tell);
assert_eq!(instance.message_type, MessageType::Tell);

View File

@ -10,30 +10,24 @@ use crate::mailbox::state::MailboxState;
/// Struct that represents a message sender.
#[derive(Clone)]
pub struct MailboxTx<T>
where
T: Message,
{
pub struct MailboxTx {
/// Indicated the transmitter part of the actor's channel
/// which is using for passing messages.
tx: Sender<Envelope<T>>,
tx: Sender<Envelope>,
/// A field for checks that the message has been delivered to
/// the specific actor.
scheduled: Arc<AtomicBool>,
}
impl<T> MailboxTx<T>
where
T: Message,
{
impl MailboxTx {
/// Return a new instance of MailboxTx that indicates sender.
pub(crate) fn new(tx: Sender<Envelope<T>>) -> Self {
pub(crate) fn new(tx: Sender<Envelope>) -> Self {
let scheduled = Arc::new(AtomicBool::new(false));
MailboxTx { tx, scheduled }
}
/// Send the message to the actor by the channel.
pub fn try_send(&self, msg: Envelope<T>) -> Result<()> {
pub fn try_send(&self, msg: Envelope) -> Result<()> {
self.tx
.try_send(msg)
.map_err(|e| BastionError::ChanSend(e.to_string()))
@ -49,27 +43,21 @@ where
/// by a user, to guarantee that the message won't be lost if something
/// happens wrong.
#[derive(Clone)]
pub struct Mailbox<T>
where
T: Message,
{
pub struct Mailbox {
/// Actor guardian sender
actor_tx: MailboxTx<T>,
actor_tx: MailboxTx,
/// Actor guardian receiver
actor_rx: Receiver<Envelope<T>>,
actor_rx: Receiver<Envelope>,
/// System guardian receiver
system_rx: Receiver<Envelope<T>>,
system_rx: Receiver<Envelope>,
/// Mailbox state machine
state: Arc<MailboxState>,
}
// TODO: Add calls with recv with timeout
impl<T> Mailbox<T>
where
T: Message,
{
impl Mailbox {
/// Creates a new mailbox for the actor.
pub(crate) fn new(system_rx: Receiver<Envelope<T>>) -> Self {
pub(crate) fn new(system_rx: Receiver<Envelope>) -> Self {
let (tx, actor_rx) = unbounded();
let actor_tx = MailboxTx::new(tx);
let state = Arc::new(MailboxState::new());
@ -83,7 +71,7 @@ where
}
/// Forced receive message from the actor's queue.
pub async fn recv(&mut self) -> Envelope<T> {
pub async fn recv(&mut self) -> Envelope {
self.actor_rx
.recv()
.await
@ -92,14 +80,14 @@ where
}
/// Try receiving message from the actor's queue.
pub async fn try_recv(&mut self) -> Result<Envelope<T>> {
pub async fn try_recv(&mut self) -> Result<Envelope> {
self.actor_rx
.try_recv()
.map_err(|e| BastionError::ChanRecv(e.to_string()))
}
/// Forced receive message from the internal system queue.
pub async fn sys_recv(&mut self) -> Envelope<T> {
pub async fn sys_recv(&mut self) -> Envelope {
self.system_rx
.recv()
.await
@ -108,7 +96,7 @@ where
}
/// Try receiving message from the internal system queue.
pub async fn try_sys_recv(&mut self) -> Result<Envelope<T>> {
pub async fn try_sys_recv(&mut self) -> Result<Envelope> {
self.system_rx
.try_recv()
.map_err(|e| BastionError::ChanRecv(e.to_string()))