Replaced async-channel onto flume

This commit is contained in:
Valeryi Savich 2021-07-18 12:47:52 +02:00
parent 8dadf99c2b
commit 97f8214086
5 changed files with 122 additions and 11 deletions

View File

@ -45,11 +45,11 @@ features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]
[dependencies]
async-channel = "1.6.1"
async-trait = "0.1.50"
async-mutex = "1.4.0"
bastion-executor = "0.4.1"
crossbeam = "0.8.1"
flume = "0.10.7"
thiserror = "1.0.25"
once_cell = "1.8.0"
lever = "0.1.1"

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use async_channel::{unbounded, Sender};
use flume::{unbounded, Sender};
use crate::actor::local_state::LocalState;
use crate::actor::state::ActorState;

View File

@ -10,4 +10,6 @@ pub enum BastionError {
ChanSend(String),
#[error("The message cannot be received from the channel. Reason: {0}")]
ChanRecv(String),
#[error("The actors channel is empty.")]
EmptyChannel,
}

View File

@ -36,6 +36,11 @@ impl Envelope {
}
}
/// Returns a reference to the sender (if was declared).
pub fn sender(&self) -> Option<ActorRef> {
self.sender.clone()
}
/// Returns a message type. Can be use for pattern matching and filtering
/// incoming message from other actors.
pub fn message_type(&self) -> MessageType {

View File

@ -1,7 +1,7 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use async_channel::{unbounded, Receiver, Sender};
use flume::{unbounded, Receiver, Sender, TryRecvError};
use crate::error::{BastionError, Result};
use crate::mailbox::envelope::Envelope;
@ -70,10 +70,15 @@ impl Mailbox {
}
}
/// Returns an actor sender to the caller.
pub(crate) fn get_actor_rx(&self) -> Sender<Envelope> {
self.actor_tx.tx.clone()
}
/// Forced receive message from the actor's queue.
pub async fn recv(&mut self) -> Envelope {
self.actor_rx
.recv()
.recv_async()
.await
.map_err(|e| BastionError::ChanRecv(e.to_string()))
.unwrap()
@ -81,15 +86,16 @@ impl Mailbox {
/// Try receiving message from the actor's queue.
pub async fn try_recv(&mut self) -> Result<Envelope> {
self.actor_rx
.try_recv()
.map_err(|e| BastionError::ChanRecv(e.to_string()))
self.actor_rx.try_recv().map_err(|e| match e {
TryRecvError::Empty => BastionError::EmptyChannel,
_ => BastionError::ChanRecv(e.to_string()),
})
}
/// Forced receive message from the internal system queue.
pub async fn sys_recv(&mut self) -> Envelope {
self.system_rx
.recv()
.recv_async()
.await
.map_err(|e| BastionError::ChanRecv(e.to_string()))
.unwrap()
@ -97,8 +103,106 @@ impl Mailbox {
/// Try receiving message from the internal system queue.
pub async fn try_sys_recv(&mut self) -> Result<Envelope> {
self.system_rx
.try_recv()
.map_err(|e| BastionError::ChanRecv(e.to_string()))
self.system_rx.try_recv().map_err(|e| match e {
TryRecvError::Empty => BastionError::EmptyChannel,
_ => BastionError::ChanRecv(e.to_string()),
})
}
}
#[cfg(test)]
mod envelope_tests {
use flume::unbounded;
use tokio_test::block_on;
use crate::error::Result;
use crate::mailbox::envelope::Envelope;
use crate::mailbox::message::{Message, MessageType};
use crate::mailbox::Mailbox;
#[test]
fn test_recv() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(system_rx);
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
tokio_test::block_on(instance.actor_tx.tx.send_async(envelope));
let incoming_env = tokio_test::block_on(instance.recv());
let actor_ref = incoming_env.sender();
let message = tokio_test::block_on(incoming_env.read());
let message_type = incoming_env.message_type();
assert_eq!(actor_ref.is_none(), true);
assert_eq!(message.is_some(), true);
assert_eq!(message_type, MessageType::Tell);
}
#[test]
fn test_try_recv() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(system_rx);
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
tokio_test::block_on(instance.actor_tx.tx.send_async(envelope));
let incoming_env = tokio_test::block_on(instance.try_recv()).unwrap();
let actor_ref = incoming_env.sender();
let message = tokio_test::block_on(incoming_env.read());
let message_type = incoming_env.message_type();
assert_eq!(actor_ref.is_none(), true);
assert_eq!(message.is_some(), true);
assert_eq!(message_type, MessageType::Tell);
}
#[test]
fn test_try_recv_returns_error_on_empty_channel() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(system_rx);
let result = tokio_test::block_on(instance.try_sys_recv()).ok();
assert_eq!(result.is_none(), true);
}
#[test]
fn test_sys_recv() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(system_rx);
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
tokio_test::block_on(system_tx.send_async(envelope));
let incoming_env = tokio_test::block_on(instance.sys_recv());
let actor_ref = incoming_env.sender();
let message = tokio_test::block_on(incoming_env.read());
let message_type = incoming_env.message_type();
assert_eq!(actor_ref.is_none(), true);
assert_eq!(message.is_some(), true);
assert_eq!(message_type, MessageType::Tell);
}
#[test]
fn test_try_sys_recv() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(system_rx);
let envelope = Envelope::new(None, Box::new(1), MessageType::Tell);
tokio_test::block_on(system_tx.send_async(envelope));
let incoming_env = tokio_test::block_on(instance.try_sys_recv()).unwrap();
let actor_ref = incoming_env.sender();
let message = tokio_test::block_on(incoming_env.read());
let message_type = incoming_env.message_type();
assert_eq!(actor_ref.is_none(), true);
assert_eq!(message.is_some(), true);
assert_eq!(message_type, MessageType::Tell);
}
#[test]
fn test_try_sys_recv_returns_error_on_empty_channel() {
let (system_tx, system_rx) = unbounded();
let mut instance = Mailbox::new(system_rx);
let result = tokio_test::block_on(instance.try_sys_recv()).ok();
assert_eq!(result.is_none(), true);
}
}