mirror of https://github.com/bastion-rs/bastion
Compare commits
2 Commits
66b2325e8a
...
d79afe82c0
Author | SHA1 | Date |
---|---|---|
Valeryi Savich | d79afe82c0 | |
Valeryi Savich | 2eac8214c3 |
|
@ -47,6 +47,7 @@ rustdoc-args = ["--cfg", "feature=\"docs\""]
|
|||
[dependencies]
|
||||
async-channel = "1.6.1"
|
||||
async-trait = "0.1.50"
|
||||
async-mutex = "1.4.0"
|
||||
bastion-executor = "0.4.1"
|
||||
crossbeam = "0.8.1"
|
||||
thiserror = "1.0.25"
|
||||
|
@ -57,3 +58,4 @@ regex = "1.5.4"
|
|||
uuid = { version = "0.8.2", features = ["v4"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-test = "0.4.2"
|
||||
|
|
|
@ -20,7 +20,7 @@ pub struct Context {
|
|||
/// Path to the actor in the system
|
||||
path: Arc<ActorPath>,
|
||||
/// Mailbox of the actor
|
||||
mailbox: Mailbox<Box<dyn Message>>,
|
||||
mailbox: Mailbox,
|
||||
/// Local storage for actor's data
|
||||
local_state: LocalState,
|
||||
/// Current execution state of the actor
|
||||
|
@ -28,7 +28,7 @@ pub struct Context {
|
|||
}
|
||||
|
||||
impl Context {
|
||||
pub(crate) fn new(path: ActorPath) -> (Self, Sender<Envelope<impl Message>>) {
|
||||
pub(crate) fn new(path: ActorPath) -> (Self, Sender<Envelope>) {
|
||||
let (system_tx, system_rx) = unbounded();
|
||||
|
||||
let path = Arc::new(path);
|
||||
|
|
|
@ -5,6 +5,9 @@ use crate::error::Result;
|
|||
|
||||
#[async_trait]
|
||||
pub trait Actor: Sync {
|
||||
fn new() -> Self
|
||||
where
|
||||
Self: Sized;
|
||||
async fn on_init(&self, _ctx: &mut Context) {}
|
||||
async fn on_sync(&self, _ctx: &mut Context) {}
|
||||
async fn on_stopped(&self, _ctx: &mut Context) {}
|
||||
|
|
|
@ -1,33 +1,33 @@
|
|||
use std::cell::RefCell;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_mutex::Mutex;
|
||||
|
||||
use crate::actor::actor_ref::ActorRef;
|
||||
use crate::mailbox::message::{Message, MessageType};
|
||||
|
||||
/// Struct that represents an incoming message in the actor's mailbox.
|
||||
#[derive(Clone)]
|
||||
pub struct Envelope<T>
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
pub struct Envelope {
|
||||
/// The sending side of a channel. In actor's world
|
||||
/// represented is a message sender. Can be used
|
||||
/// for acking message when it possible.
|
||||
sender: Option<ActorRef>,
|
||||
/// An actual data sent by the channel
|
||||
message: RefCell<Option<T>>,
|
||||
message: Arc<Mutex<Option<Box<dyn Message>>>>,
|
||||
/// Message type that helps to figure out how to deliver message
|
||||
/// and how to ack it after the processing.
|
||||
message_type: MessageType,
|
||||
}
|
||||
|
||||
impl<T> Envelope<T>
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
impl Envelope {
|
||||
/// Create a message with the given sender and inner data.
|
||||
pub fn new(sender: Option<ActorRef>, data: T, message_type: MessageType) -> Self {
|
||||
let message = RefCell::new(Some(data));
|
||||
pub fn new(
|
||||
sender: Option<ActorRef>,
|
||||
data: Box<dyn Message>,
|
||||
message_type: MessageType,
|
||||
) -> Self {
|
||||
let message = Arc::new(Mutex::new(Some(data)));
|
||||
|
||||
Envelope {
|
||||
sender,
|
||||
|
@ -44,8 +44,9 @@ where
|
|||
|
||||
/// Extracts the message data and returns it to the caller. Each further
|
||||
/// method call will return `None`.
|
||||
pub fn read(&self) -> Option<T> {
|
||||
self.message.replace(None)
|
||||
pub async fn read(&self) -> Option<Box<dyn Message>> {
|
||||
let mut guard = self.message.lock().await;
|
||||
guard.take()
|
||||
}
|
||||
|
||||
// TODO: Return a boolean flag once operation has finished?
|
||||
|
@ -59,10 +60,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Debug for Envelope<T>
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
impl Debug for Envelope {
|
||||
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("Message")
|
||||
.field("message", &self.message)
|
||||
|
@ -87,19 +85,19 @@ mod envelope_tests {
|
|||
|
||||
#[test]
|
||||
fn test_message_read() {
|
||||
let message_data = FakeMessage::new();
|
||||
let message_data = Box::new(FakeMessage::new());
|
||||
let instance = Envelope::new(None, message_data, MessageType::Tell);
|
||||
|
||||
let expected_data = instance.read();
|
||||
let expected_data = tokio_test::block_on(instance.read());
|
||||
assert_eq!(expected_data.is_some(), true);
|
||||
|
||||
let another_read_attempt_data = instance.read();
|
||||
let another_read_attempt_data = tokio_test::block_on(instance.read());
|
||||
assert_eq!(another_read_attempt_data.is_none(), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_match_against_ask_message_type() {
|
||||
let message_data = FakeMessage::new();
|
||||
let message_data = Box::new(FakeMessage::new());
|
||||
let instance = Envelope::new(None, message_data, MessageType::Ask);
|
||||
|
||||
assert_eq!(instance.message_type, MessageType::Ask);
|
||||
|
@ -107,7 +105,7 @@ mod envelope_tests {
|
|||
|
||||
#[test]
|
||||
fn test_match_against_broadcast_message_type() {
|
||||
let message_data = FakeMessage::new();
|
||||
let message_data = Box::new(FakeMessage::new());
|
||||
let instance = Envelope::new(None, message_data, MessageType::Broadcast);
|
||||
|
||||
assert_eq!(instance.message_type, MessageType::Broadcast);
|
||||
|
@ -115,7 +113,7 @@ mod envelope_tests {
|
|||
|
||||
#[test]
|
||||
fn test_match_against_tell_message_type() {
|
||||
let message_data = FakeMessage::new();
|
||||
let message_data = Box::new(FakeMessage::new());
|
||||
let instance = Envelope::new(None, message_data, MessageType::Tell);
|
||||
|
||||
assert_eq!(instance.message_type, MessageType::Tell);
|
||||
|
|
|
@ -10,30 +10,24 @@ use crate::mailbox::state::MailboxState;
|
|||
|
||||
/// Struct that represents a message sender.
|
||||
#[derive(Clone)]
|
||||
pub struct MailboxTx<T>
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
pub struct MailboxTx {
|
||||
/// Indicated the transmitter part of the actor's channel
|
||||
/// which is using for passing messages.
|
||||
tx: Sender<Envelope<T>>,
|
||||
tx: Sender<Envelope>,
|
||||
/// A field for checks that the message has been delivered to
|
||||
/// the specific actor.
|
||||
scheduled: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<T> MailboxTx<T>
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
impl MailboxTx {
|
||||
/// Return a new instance of MailboxTx that indicates sender.
|
||||
pub(crate) fn new(tx: Sender<Envelope<T>>) -> Self {
|
||||
pub(crate) fn new(tx: Sender<Envelope>) -> Self {
|
||||
let scheduled = Arc::new(AtomicBool::new(false));
|
||||
MailboxTx { tx, scheduled }
|
||||
}
|
||||
|
||||
/// Send the message to the actor by the channel.
|
||||
pub fn try_send(&self, msg: Envelope<T>) -> Result<()> {
|
||||
pub fn try_send(&self, msg: Envelope) -> Result<()> {
|
||||
self.tx
|
||||
.try_send(msg)
|
||||
.map_err(|e| BastionError::ChanSend(e.to_string()))
|
||||
|
@ -49,27 +43,21 @@ where
|
|||
/// by a user, to guarantee that the message won't be lost if something
|
||||
/// happens wrong.
|
||||
#[derive(Clone)]
|
||||
pub struct Mailbox<T>
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
pub struct Mailbox {
|
||||
/// Actor guardian sender
|
||||
actor_tx: MailboxTx<T>,
|
||||
actor_tx: MailboxTx,
|
||||
/// Actor guardian receiver
|
||||
actor_rx: Receiver<Envelope<T>>,
|
||||
actor_rx: Receiver<Envelope>,
|
||||
/// System guardian receiver
|
||||
system_rx: Receiver<Envelope<T>>,
|
||||
system_rx: Receiver<Envelope>,
|
||||
/// Mailbox state machine
|
||||
state: Arc<MailboxState>,
|
||||
}
|
||||
|
||||
// TODO: Add calls with recv with timeout
|
||||
impl<T> Mailbox<T>
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
impl Mailbox {
|
||||
/// Creates a new mailbox for the actor.
|
||||
pub(crate) fn new(system_rx: Receiver<Envelope<T>>) -> Self {
|
||||
pub(crate) fn new(system_rx: Receiver<Envelope>) -> Self {
|
||||
let (tx, actor_rx) = unbounded();
|
||||
let actor_tx = MailboxTx::new(tx);
|
||||
let state = Arc::new(MailboxState::new());
|
||||
|
@ -83,7 +71,7 @@ where
|
|||
}
|
||||
|
||||
/// Forced receive message from the actor's queue.
|
||||
pub async fn recv(&mut self) -> Envelope<T> {
|
||||
pub async fn recv(&mut self) -> Envelope {
|
||||
self.actor_rx
|
||||
.recv()
|
||||
.await
|
||||
|
@ -92,14 +80,14 @@ where
|
|||
}
|
||||
|
||||
/// Try receiving message from the actor's queue.
|
||||
pub async fn try_recv(&mut self) -> Result<Envelope<T>> {
|
||||
pub async fn try_recv(&mut self) -> Result<Envelope> {
|
||||
self.actor_rx
|
||||
.try_recv()
|
||||
.map_err(|e| BastionError::ChanRecv(e.to_string()))
|
||||
}
|
||||
|
||||
/// Forced receive message from the internal system queue.
|
||||
pub async fn sys_recv(&mut self) -> Envelope<T> {
|
||||
pub async fn sys_recv(&mut self) -> Envelope {
|
||||
self.system_rx
|
||||
.recv()
|
||||
.await
|
||||
|
@ -108,7 +96,7 @@ where
|
|||
}
|
||||
|
||||
/// Try receiving message from the internal system queue.
|
||||
pub async fn try_sys_recv(&mut self) -> Result<Envelope<T>> {
|
||||
pub async fn try_sys_recv(&mut self) -> Result<Envelope> {
|
||||
self.system_rx
|
||||
.try_recv()
|
||||
.map_err(|e| BastionError::ChanRecv(e.to_string()))
|
||||
|
|
Loading…
Reference in New Issue