proposal: distributor request fn (#321)

* proposal: distributor request fn

* go async by default, add a sync variant backed by mpsc::channel

* let the example runwith or without the tokio-runtime feature

* fix: sometimes the children weren't attached to the dispatcher

* use RwLock for now

* add an after_start callback to know exactly when the children have spawned and the distributor is ready to receive and dispatch messages to everyone

* wip

* merge tests

* split the tokio and the regular test runner so that both pass
This commit is contained in:
Jeremy Lempereur 2021-04-13 08:31:20 +02:00 committed by GitHub
parent 1c4ffb3e3f
commit 90b8a0918d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 713 additions and 155 deletions

View File

@ -80,9 +80,19 @@ struct ConferenceSchedule {
misc: String,
}
/// cargo r --features=tokio-runtime distributor
/// cargo r --features=tokio-runtime --example distributor
#[cfg(feature = "tokio-runtime")]
#[tokio::main]
async fn main() -> AnyResult<()> {
run()
}
#[cfg(not(feature = "tokio-runtime"))]
fn main() -> AnyResult<()> {
run()
}
fn run() -> AnyResult<()> {
let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::INFO)
.finish();
@ -119,23 +129,22 @@ async fn main() -> AnyResult<()> {
Bastion::start();
// Wait a bit until everyone is ready
// std::thread::sleep(std::time::Duration::from_secs(1));
sleep(std::time::Duration::from_secs(5));
let staff = Distributor::named("staff");
let enthusiasts = Distributor::named("enthusiasts");
let attendees = Distributor::named("attendees");
// Enthusiast -> Ask one of the staff members "when is the conference going to happen ?"
let answer = staff.ask_one("when is the next conference going to happen?")?;
MessageHandler::new(
answer
let reply: Result<String, SendError> = run!(async {
staff
.request("when is the next conference going to happen?")
.await
.expect("coulnd't find out when the next conference is going to happen :("),
)
.on_tell(|reply: String, _sender_addr| {
tracing::info!("received a reply to my message:\n{}", reply);
.expect("couldn't receive reply")
});
tracing::error!("{:?}", reply); // Ok("Next month!")
// "hey conference <awesomeconference> is going to happen. will you be there?"
// Broadcast / Question -> if people reply with YES => fill the 3rd group
let answers = enthusiasts
@ -143,23 +152,25 @@ async fn main() -> AnyResult<()> {
.expect("couldn't ask everyone");
for answer in answers.into_iter() {
MessageHandler::new(answer.await.expect("couldn't receive reply"))
.on_tell(|rsvp: RSVP, _| {
if rsvp.attends {
tracing::info!("{:?} will be there! :)", rsvp.child_ref.id());
attendees
.subscribe(rsvp.child_ref)
.expect("couldn't subscribe attendee");
} else {
tracing::error!("{:?} won't make it :(", rsvp.child_ref.id());
}
})
.on_fallback(|unknown, _sender_addr| {
tracing::error!(
"distributor_test: uh oh, I received a message I didn't understand\n {:?}",
unknown
);
});
run!(async move {
MessageHandler::new(answer.await.expect("couldn't receive reply"))
.on_tell(|rsvp: RSVP, _| {
if rsvp.attends {
tracing::info!("{:?} will be there! :)", rsvp.child_ref.id());
attendees
.subscribe(rsvp.child_ref)
.expect("couldn't subscribe attendee");
} else {
tracing::error!("{:?} won't make it :(", rsvp.child_ref.id());
}
})
.on_fallback(|unknown, _sender_addr| {
tracing::error!(
"distributor_test: uh oh, I received a message I didn't understand\n {:?}",
unknown
);
});
});
}
// Ok now that attendees have subscribed, let's send information around!
@ -176,14 +187,15 @@ async fn main() -> AnyResult<()> {
tracing::error!("total number of attendees: {}", total_sent.len());
tracing::info!("the conference is running!");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
// Let's wait until the conference is over 8D
sleep(std::time::Duration::from_secs(5));
// An attendee sends a thank you note to one staff member (and not bother everyone)
staff
.tell_one("the conference was amazing thank you so much!")
.context("couldn't thank the staff members :(")?;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// And we're done!
Bastion::stop();
@ -198,9 +210,7 @@ async fn organize_the_event(ctx: BastionContext) -> Result<(), ()> {
MessageHandler::new(ctx.recv().await?)
.on_question(|message: &str, sender| {
tracing::info!("received a question: \n{}", message);
sender
.reply("uh i think it will be next month!".to_string())
.unwrap();
sender.reply("Next month!".to_string()).unwrap();
})
.on_tell(|message: &str, _| {
tracing::info!("received a message: \n{}", message);
@ -243,3 +253,15 @@ async fn be_interested_in_the_conference(ctx: BastionContext) -> Result<(), ()>
});
}
}
#[cfg(feature = "tokio-runtime")]
fn sleep(duration: std::time::Duration) {
run!(async {
tokio::time::sleep(duration).await;
});
}
#[cfg(not(feature = "tokio-runtime"))]
fn sleep(duration: std::time::Duration) {
std::thread::sleep(duration);
}

View File

@ -8,6 +8,7 @@ pub(crate) enum CallbackType {
AfterStop,
BeforeRestart,
BeforeStart,
AfterStart,
}
#[derive(Default, Clone)]
@ -22,12 +23,12 @@ pub(crate) enum CallbackType {
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # fn run() {
@ -60,6 +61,7 @@ pub(crate) enum CallbackType {
/// [`Children`]: crate::children::Children
pub struct Callbacks {
before_start: Option<Arc<dyn Fn() + Send + Sync>>,
after_start: Option<Arc<dyn Fn() + Send + Sync>>,
before_restart: Option<Arc<dyn Fn() + Send + Sync>>,
after_restart: Option<Arc<dyn Fn() + Send + Sync>>,
after_stop: Option<Arc<dyn Fn() + Send + Sync>>,
@ -77,12 +79,12 @@ impl Callbacks {
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # fn run() {
@ -136,12 +138,12 @@ impl Callbacks {
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # fn run() {
@ -191,6 +193,74 @@ impl Callbacks {
self
}
/// Sets the method that will get called right after the [`Supervisor`]
/// or [`Children`] is launched.
/// This method will be called after the child has subscribed to its distributors and dispatchers.
///
/// Once the callback has run, the child has caught up it's message backlog,
/// and is waiting for new messages to process.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # }
/// #
/// # fn run() {
/// # Bastion::init();
/// #
/// # Bastion::supervisor(|supervisor| {
/// supervisor.children(|children| {
/// let callbacks = Callbacks::new()
/// .with_after_start(|| println!("Children group ready to process messages."));
///
/// children
/// .with_exec(|ctx| {
/// // -- Children group started.
/// // with_after_start called
/// async move {
/// // ...
///
/// // This will stop the children group...
/// Ok(())
/// // Note that because the children group stopped by itself,
/// // if its supervisor restarts it, its `before_start` callback
/// // will get called and not `after_restart`.
/// }
/// // -- Children group stopped.
/// })
/// .with_callbacks(callbacks)
/// })
/// # }).unwrap();
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
///
/// [`Supervisor`]: crate::supervisor::Supervisor
/// [`Children`]: crate::children::Children
/// [`with_after_restart`]: Self::with_after_restart
pub fn with_after_start<C>(mut self, after_start: C) -> Self
where
C: Fn() + Send + Sync + 'static,
{
let after_start = Arc::new(after_start);
self.after_start = Some(after_start);
self
}
/// Sets the method that will get called before the [`Supervisor`]
/// or [`Children`] is reset if:
/// - the supervisor of the supervised element using this callback
@ -208,12 +278,12 @@ impl Callbacks {
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # fn run() {
@ -282,12 +352,12 @@ impl Callbacks {
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # fn run() {
@ -360,12 +430,12 @@ impl Callbacks {
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # run();
/// # }
/// #
/// # fn run() {
@ -493,6 +563,12 @@ impl Callbacks {
}
}
pub(crate) fn after_start(&self) {
if let Some(after_start) = &self.after_start {
after_start()
}
}
pub(crate) fn before_restart(&self) {
if let Some(before_restart) = &self.before_restart {
before_restart()

View File

@ -283,6 +283,7 @@ impl Child {
CallbackType::BeforeRestart => self.callbacks.before_restart(),
CallbackType::AfterRestart => self.callbacks.after_restart(),
CallbackType::AfterStop => self.callbacks.after_stop(),
CallbackType::AfterStart => self.callbacks.after_start(),
}
}
@ -312,6 +313,8 @@ impl Child {
return;
};
self.callbacks.after_start();
loop {
#[cfg(feature = "scaling")]
self.update_stats().await;
@ -416,7 +419,7 @@ impl Child {
distributors
.iter()
.map(|&distributor| {
global_dispatcher.register_recipient(distributor, child_ref.clone())
global_dispatcher.register_recipient(&distributor, child_ref.clone())
})
.collect::<AnyResult<Vec<_>>>()?;
}

View File

@ -2,63 +2,15 @@
//! Allows users to communicate with Child through the mailboxes.
use crate::context::BastionId;
use crate::envelope::{Envelope, RefAddr};
use crate::{broadcast::Sender, message::Msg};
use crate::{
distributor::Distributor,
message::{Answer, BastionMessage, Message},
};
use crate::{path::BastionPath, system::STRING_INTERNER};
use futures::channel::mpsc::TrySendError;
use crate::message::{Answer, BastionMessage, Message};
use crate::path::BastionPath;
use crate::{broadcast::Sender, prelude::SendError};
use std::cmp::{Eq, PartialEq};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use thiserror::Error;
use tracing::{debug, trace};
#[derive(Error, Debug)]
/// `SendError`s occur when a message couldn't be dispatched through a distributor
pub enum SendError {
#[error("couldn't send message. Channel Disconnected.")]
/// Channel has been closed before we could send a message
Disconnected(Msg),
#[error("couldn't send message. Channel is Full.")]
/// Channel is full, can't send a message
Full(Msg),
#[error("couldn't send a message I should have not sent. {0}")]
/// This error is returned when we try to send a message
/// that is not a BastionMessage::Message variant
Other(anyhow::Error),
#[error("No available Distributor matching {0}")]
/// The distributor we're trying to dispatch messages to is not registered in the system
NoDistributor(String),
#[error("Distributor has 0 Recipients")]
/// The distributor we're trying to dispatch messages to has no recipients
EmptyRecipient,
}
impl From<TrySendError<Envelope>> for SendError {
fn from(tse: TrySendError<Envelope>) -> Self {
let is_disconnected = tse.is_disconnected();
match tse.into_inner().msg {
BastionMessage::Message(msg) => {
if is_disconnected {
Self::Disconnected(msg)
} else {
Self::Full(msg)
}
}
other => Self::Other(anyhow::anyhow!("{:?}", other)),
}
}
}
impl From<Distributor> for SendError {
fn from(distributor: Distributor) -> Self {
Self::NoDistributor(STRING_INTERNER.resolve(distributor.interned()).to_string())
}
}
#[derive(Debug, Clone)]
/// A "reference" to an element of a children group, allowing to
/// communicate with it.

View File

@ -468,6 +468,8 @@ impl Children {
/// ```
/// [`RecipientHandler`]: crate::dispatcher::RecipientHandler
pub fn with_distributor(mut self, distributor: Distributor) -> Self {
// Try to register the distributor as soon as we're aware of it
let _ = SYSTEM.dispatcher().register_distributor(&distributor);
self.distributors.push(distributor);
self
}
@ -1159,6 +1161,16 @@ impl Children {
Ok(())
}
/// Registers all declared local distributors in the global dispatcher.
pub(crate) fn register_distributors(&self) -> AnyResult<()> {
let global_dispatcher = SYSTEM.dispatcher();
for distributor in self.distributors.iter() {
global_dispatcher.register_distributor(distributor)?;
}
Ok(())
}
/// Removes all declared local distributors from the global dispatcher.
pub(crate) fn remove_distributors(&self) -> AnyResult<()> {
let global_dispatcher = SYSTEM.dispatcher();

View File

@ -851,14 +851,11 @@ mod context_tests {
Bastion::init();
Bastion::start();
run_test(test_recv);
run_test(test_try_recv);
run_test(test_try_recv_fail);
run_test(test_try_recv_timeout);
run_test(test_try_recv_timeout_fail);
Bastion::stop();
Bastion::block_until_stopped();
test_recv();
test_try_recv();
test_try_recv_fail();
test_try_recv_timeout();
test_try_recv_timeout_fail();
}
fn test_recv() {
@ -917,7 +914,7 @@ mod context_tests {
let children =
Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
msg! { ctx.try_recv_timeout(std::time::Duration::from_millis(1)).await.expect("recv_timeout failed"),
msg! { ctx.try_recv_timeout(std::time::Duration::from_millis(5)).await.expect("recv_timeout failed"),
ref msg: &'static str => {
assert_eq!(msg, &"test recv timeout");
};
@ -949,15 +946,6 @@ mod context_tests {
run!(async { Delay::new(std::time::Duration::from_millis(2)).await });
// The child panicked, but we should still be able to send things to it
assert!(children.broadcast("test recv timeout").is_ok());
}
fn run_test<T>(test: T) -> ()
where
T: FnOnce() -> () + panic::UnwindSafe,
{
let result = panic::catch_unwind(|| test());
assert!(result.is_ok())
children.broadcast("test recv timeout").unwrap();
}
}

View File

@ -3,18 +3,23 @@
//! group of actors through the dispatchers that holds information about
//! actors grouped together.
use crate::{
child_ref::{ChildRef, SendError},
child_ref::ChildRef,
message::{Answer, Message},
prelude::SendError,
};
use crate::{distributor::Distributor, envelope::SignedMessage};
use anyhow::Result as AnyResult;
use lever::prelude::*;
use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::sync::RwLock;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::{
collections::HashMap,
fmt::{self, Debug},
};
use tracing::{debug, trace};
/// Type alias for the concurrency hashmap. Each key-value pair stores
@ -345,7 +350,8 @@ impl Into<DispatcherType> for String {
pub(crate) struct GlobalDispatcher {
/// Storage for all registered group of actors.
pub dispatchers: LOTable<DispatcherType, Arc<Box<Dispatcher>>>,
pub distributors: LOTable<Distributor, Arc<Box<(dyn RecipientHandler)>>>,
// TODO: switch to LOTable once lever implements write optimized granularity
pub distributors: Arc<RwLock<HashMap<Distributor, Box<(dyn RecipientHandler)>>>>,
}
impl GlobalDispatcher {
@ -353,7 +359,12 @@ impl GlobalDispatcher {
pub(crate) fn new() -> Self {
GlobalDispatcher {
dispatchers: LOTable::new(),
distributors: LOTable::new(),
distributors: Arc::new(RwLock::new(HashMap::new()))
// TODO: switch to LOTable once lever implements write optimized granularity
// distributors: LOTableBuilder::new()
//.with_concurrency(TransactionConcurrency::Optimistic)
//.with_isolation(TransactionIsolation::Serializable)
//.build(),
}
}
@ -495,6 +506,13 @@ impl GlobalDispatcher {
fn next(&self, distributor: Distributor) -> Result<Option<ChildRef>, SendError> {
self.distributors
.read()
.map_err(|error| {
SendError::Other(anyhow::anyhow!(
"couldn't get read lock on distributors {:?}",
error
))
})?
.get(&distributor)
.map(|recipient| recipient.next())
.ok_or_else(|| SendError::from(distributor))
@ -502,6 +520,13 @@ impl GlobalDispatcher {
fn all(&self, distributor: Distributor) -> Result<Vec<ChildRef>, SendError> {
self.distributors
.read()
.map_err(|error| {
SendError::Other(anyhow::anyhow!(
"couldn't get read lock on distributors {:?}",
error
))
})?
.get(&distributor)
.map(|recipient| recipient.all())
.ok_or_else(|| SendError::from(distributor))
@ -534,17 +559,22 @@ impl GlobalDispatcher {
/// Appends the information about actor to the recipients.
pub(crate) fn register_recipient(
&self,
distributor: Distributor,
distributor: &Distributor,
child_ref: ChildRef,
) -> AnyResult<()> {
if let Some(recipient) = self.distributors.get(&distributor) {
recipient.register(child_ref);
let mut distributors = self.distributors.write().map_err(|error| {
anyhow::anyhow!("couldn't get read lock on distributors {:?}", error)
})?;
if let Some(recipients) = distributors.get(&distributor) {
recipients.register(child_ref);
} else {
let actors = DefaultRecipientHandler::default();
actors.register(child_ref);
self.distributors
.insert(distributor, Arc::new(Box::new(actors)))?;
}
let recipients = DefaultRecipientHandler::default();
recipients.register(child_ref);
distributors.insert(
distributor.clone(),
Box::new(recipients) as Box<(dyn RecipientHandler)>,
);
};
Ok(())
}
@ -553,17 +583,42 @@ impl GlobalDispatcher {
distributor_list: &[Distributor],
child_ref: ChildRef,
) -> AnyResult<()> {
let distributors = self.distributors.write().map_err(|error| {
anyhow::anyhow!("couldn't get read lock on distributors {:?}", error)
})?;
distributor_list.iter().for_each(|distributor| {
if let Some(recipient) = self.distributors.get(distributor) {
recipient.remove(&child_ref);
}
distributors
.get(&distributor)
.map(|recipients| recipients.remove(&child_ref));
});
Ok(())
}
/// Adds distributor to the global registry.
pub(crate) fn register_distributor(&self, distributor: &Distributor) -> AnyResult<()> {
let mut distributors = self.distributors.write().map_err(|error| {
anyhow::anyhow!("couldn't get read lock on distributors {:?}", error)
})?;
if distributors.contains_key(&distributor) {
debug!(
"The distributor with the '{:?}' name already registered in the cluster.",
distributor
);
} else {
distributors.insert(
distributor.clone(),
Box::new(DefaultRecipientHandler::default()),
);
}
Ok(())
}
/// Removes distributor from the global registry.
pub(crate) fn remove_distributor(&self, distributor: &Distributor) -> AnyResult<()> {
self.distributors.remove(distributor)?;
let mut distributors = self.distributors.write().map_err(|error| {
anyhow::anyhow!("couldn't get read lock on distributors {:?}", error)
})?;
distributors.remove(distributor);
Ok(())
}
}

View File

@ -1,14 +1,17 @@
//! `Distributor` is a mechanism that allows you to send messages to children.
use crate::{
child_ref::SendError,
message::{Answer, Message},
prelude::ChildRef,
message::{Answer, Message, MessageHandler},
prelude::{ChildRef, SendError},
system::{STRING_INTERNER, SYSTEM},
};
use anyhow::Result as AnyResult;
use futures::channel::oneshot;
use lasso::Spur;
use std::fmt::Debug;
use std::{
fmt::Debug,
sync::mpsc::{channel, Receiver},
};
// Copy is fine here because we're working
// with interned strings here
@ -36,11 +39,200 @@ impl Distributor {
Self(STRING_INTERNER.get_or_intern(name.as_ref()))
}
/// Ask a question to a recipient attached to the `Distributor`
/// and wait for a reply.
///
/// This can be achieved manually using a `MessageHandler` and `ask_one`.
/// Ask a question to a recipient attached to the `Distributor`
///
///
/// ```no_run
/// # use bastion::prelude::*;
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # }
/// #
/// # async fn run() {
/// # Bastion::init();
/// # Bastion::start();
///
/// # Bastion::supervisor(|supervisor| {
/// # supervisor.children(|children| {
/// // attach a named distributor to the children
/// children
/// # .with_redundancy(1)
/// .with_distributor(Distributor::named("my distributor"))
/// .with_exec(|ctx: BastionContext| {
/// async move {
/// loop {
/// // The message handler needs an `on_question` section
/// // that matches the `question` you're going to send,
/// // and that will reply with the Type the request expects.
/// // In our example, we ask a `&str` question, and expect a `bool` reply.
/// MessageHandler::new(ctx.recv().await?)
/// .on_question(|message: &str, sender| {
/// if message == "is it raining today?" {
/// sender.reply(true).unwrap();
/// }
/// });
/// }
/// Ok(())
/// }
/// })
/// # })
/// # });
///
/// let distributor = Distributor::named("my distributor");
///
/// let reply: Result<String, SendError> = distributor
/// .request("is it raining today?")
/// .await
/// .expect("couldn't receive reply");
///
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
pub fn request<R: Message>(
&self,
question: impl Message,
) -> oneshot::Receiver<Result<R, SendError>> {
let (sender, receiver) = oneshot::channel();
let s = *self;
spawn!(async move {
match SYSTEM.dispatcher().ask(s, question) {
Ok(response) => match response.await {
Ok(message) => {
let message_to_send = MessageHandler::new(message)
.on_tell(|reply: R, _| Ok(reply))
.on_fallback(|_, _| {
Err(SendError::Other(anyhow::anyhow!(
"received a message with the wrong type"
)))
});
let _ = sender.send(message_to_send);
}
Err(e) => {
let _ = sender.send(Err(SendError::Other(anyhow::anyhow!(
"couldn't receive reply: {:?}",
e
))));
}
},
Err(error) => {
let _ = sender.send(Err(error));
}
};
});
receiver
}
/// Ask a question to a recipient attached to the `Distributor`
/// and wait for a reply.
///
/// this is the sync variant of the `request` function, backed by a futures::channel::oneshot
/// # Example
///
/// ```no_run
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # }
/// #
/// # fn run() {
/// # Bastion::init();
/// # Bastion::start();
/// # Bastion::supervisor(|supervisor| {
/// # supervisor.children(|children| {
/// // attach a named distributor to the children
/// children
/// # .with_redundancy(1)
/// .with_distributor(Distributor::named("my distributor"))
/// .with_exec(|ctx: BastionContext| {
/// async move {
/// loop {
/// // The message handler needs an `on_question` section
/// // that matches the `question` you're going to send,
/// // and that will reply with the Type the request expects.
/// // In our example, we ask a `&str` question, and expect a `bool` reply.
/// MessageHandler::new(ctx.recv().await?)
/// .on_question(|message: &str, sender| {
/// if message == "is it raining today?" {
/// sender.reply(true).unwrap();
/// }
/// });
/// }
/// Ok(())
/// }
/// })
/// # })
/// # });
///
/// let distributor = Distributor::named("my distributor");
///
/// let reply: Result<bool, SendError> = distributor
/// .request_sync("is it raining today?")
/// .recv()
/// .expect("couldn't receive reply"); // Ok(true)
///
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
pub fn request_sync<R: Message>(
&self,
question: impl Message,
) -> Receiver<Result<R, SendError>> {
let (sender, receiver) = channel();
let s = *self;
spawn!(async move {
match SYSTEM.dispatcher().ask(s, question) {
Ok(response) => {
if let Ok(message) = response.await {
let message_to_send = MessageHandler::new(message)
.on_tell(|reply: R, _| Ok(reply))
.on_fallback(|_, _| {
Err(SendError::Other(anyhow::anyhow!(
"received a message with the wrong type"
)))
});
let _ = sender.send(message_to_send);
} else {
let _ = sender.send(Err(SendError::Other(anyhow::anyhow!(
"couldn't receive reply"
))));
}
}
Err(error) => {
let _ = sender.send(Err(error));
}
};
});
receiver
}
/// Ask a question to a recipient attached to the `Distributor`
///
/// # Example
///
/// ```rust
/// ```no_run
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
@ -73,8 +265,6 @@ impl Distributor {
/// # });
/// #
/// # Bastion::start();
/// # // Wait until everyone is up
/// # std::thread::sleep(std::time::Duration::from_secs(1));
///
/// let distributor = Distributor::named("my distributor");
///
@ -93,7 +283,7 @@ impl Distributor {
/// Requires a `Message` that implements `Clone`. (it will be cloned and passed to each recipient)
/// # Example
///
/// ```rust
/// ```no_run
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
@ -126,8 +316,6 @@ impl Distributor {
/// # });
/// #
/// # Bastion::start();
/// # // Wait until everyone is up
/// # std::thread::sleep(std::time::Duration::from_secs(1));
///
/// let distributor = Distributor::named("my distributor");
///
@ -145,7 +333,7 @@ impl Distributor {
///
/// # Example
///
/// ```rust
/// ```no_run
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
@ -178,8 +366,6 @@ impl Distributor {
/// # });
/// #
/// # Bastion::start();
/// # // Wait until everyone is up
/// # std::thread::sleep(std::time::Duration::from_secs(1));
///
/// let distributor = Distributor::named("my distributor");
///
@ -198,7 +384,7 @@ impl Distributor {
/// Requires a `Message` that implements `Clone`. (it will be cloned and passed to each recipient)
/// # Example
///
/// ```rust
/// ```no_run
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
@ -231,8 +417,6 @@ impl Distributor {
/// # });
/// #
/// # Bastion::start();
/// # // Wait until everyone is up
/// # std::thread::sleep(std::time::Duration::from_secs(1));
///
/// let distributor = Distributor::named("my distributor");
///
@ -248,7 +432,7 @@ impl Distributor {
/// subscribe a `ChildRef` to the named `Distributor`
///
/// ```rust
/// ```no_run
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
@ -280,8 +464,6 @@ impl Distributor {
/// # }).unwrap();
/// #
/// # Bastion::start();
/// # // Wait until everyone is up
/// # std::thread::sleep(std::time::Duration::from_secs(1));
/// #
/// let child_ref = children.elems()[0].clone();
///
@ -295,13 +477,12 @@ impl Distributor {
/// # }
/// ```
pub fn subscribe(&self, child_ref: ChildRef) -> AnyResult<()> {
let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.register_recipient(*self, child_ref)
SYSTEM.dispatcher().register_recipient(self, child_ref)
}
/// unsubscribe a `ChildRef` to the named `Distributor`
///
/// ```rust
/// ```no_run
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
@ -333,8 +514,6 @@ impl Distributor {
/// # }).unwrap();
/// #
/// # Bastion::start();
/// # // Wait until everyone is up
/// # std::thread::sleep(std::time::Duration::from_secs(1));
/// #
/// let child_ref = children.elems()[0].clone();
///
@ -356,3 +535,216 @@ impl Distributor {
&self.0
}
}
#[cfg(test)]
mod distributor_tests {
use crate::prelude::*;
use futures::channel::mpsc::channel;
use futures::{SinkExt, StreamExt};
const TEST_DISTRIBUTOR: &str = "test distributor";
const SUBSCRIBE_TEST_DISTRIBUTOR: &str = "subscribe test";
#[cfg(feature = "tokio-runtime")]
#[tokio::test]
async fn test_tokio_distributor() {
blocking!({
run_tests();
});
}
#[cfg(not(feature = "tokio-runtime"))]
#[test]
fn distributor_tests() {
run_tests();
}
fn run_tests() {
setup();
test_tell();
test_ask();
test_request();
test_subscribe();
}
fn test_subscribe() {
let temp_distributor = Distributor::named("temp distributor");
assert!(
temp_distributor.tell_one("hello!").is_err(),
"should not be able to send message to an empty distributor"
);
let one_child: ChildRef = run!(async {
Distributor::named(SUBSCRIBE_TEST_DISTRIBUTOR)
.request(())
.await
.unwrap()
.unwrap()
});
temp_distributor.subscribe(one_child.clone()).unwrap();
temp_distributor
.tell_one("hello!")
.expect("should be able to send message a distributor that has a subscriber");
temp_distributor.unsubscribe(one_child).unwrap();
assert!(
temp_distributor.tell_one("hello!").is_err(),
"should not be able to send message to a distributor who's sole subscriber unsubscribed"
);
}
fn test_tell() {
let test_distributor = Distributor::named(TEST_DISTRIBUTOR);
test_distributor
.tell_one("don't panic and carry a towel")
.unwrap();
let sent = test_distributor
.tell_everyone("so long, and thanks for all the fish")
.unwrap();
assert_eq!(
5,
sent.len(),
"test distributor is supposed to have 5 children"
);
}
fn test_ask() {
let test_distributor = Distributor::named(TEST_DISTRIBUTOR);
let question: String =
"What is the answer to life, the universe and everything?".to_string();
run!(async {
let answer = test_distributor.ask_one(question.clone()).unwrap();
MessageHandler::new(answer.await.unwrap())
.on_tell(|answer: u8, _| {
assert_eq!(42, answer);
})
.on_fallback(|unknown, _sender_addr| {
panic!("unknown message\n {:?}", unknown);
});
});
run!(async {
let answers = test_distributor.ask_everyone(question.clone()).unwrap();
assert_eq!(
5,
answers.len(),
"test distributor is supposed to have 5 children"
);
let meanings = futures::future::join_all(answers.into_iter().map(|answer| async {
MessageHandler::new(answer.await.unwrap())
.on_tell(|answer: u8, _| {
assert_eq!(42, answer);
answer
})
.on_fallback(|unknown, _sender_addr| {
panic!("unknown message\n {:?}", unknown);
})
}))
.await;
assert_eq!(
42 * 5,
meanings.iter().sum::<u8>(),
"5 children returning 42 should sum to 42 * 5"
);
});
}
fn test_request() {
let test_distributor = Distributor::named(TEST_DISTRIBUTOR);
let question: String =
"What is the answer to life, the universe and everything?".to_string();
run!(async {
let answer: u8 = test_distributor
.request(question.clone())
.await
.unwrap()
.unwrap();
assert_eq!(42, answer);
});
let answer_sync: u8 = test_distributor
.request_sync(question)
.recv()
.unwrap()
.unwrap();
assert_eq!(42, answer_sync);
}
fn setup() {
Bastion::init();
Bastion::start();
const NUM_CHILDREN: usize = 5;
// This channel and the use of callbacks will allow us to know when all of the children are spawned.
let (sender, receiver) = channel(NUM_CHILDREN);
Bastion::supervisor(|supervisor| {
let test_ready = sender.clone();
let subscribe_test_ready = sender.clone();
supervisor
.children(|children| {
children
.with_redundancy(NUM_CHILDREN)
.with_distributor(Distributor::named(TEST_DISTRIBUTOR))
.with_callbacks(Callbacks::new().with_after_start(move || {
let mut test_ready = test_ready.clone();
spawn!(async move { test_ready.send(()).await });
}))
.with_exec(|ctx| async move {
loop {
let child_ref = ctx.current().clone();
MessageHandler::new(ctx.recv().await?)
.on_question(|_: String, sender| {
let _ = sender.reply(42_u8);
})
// send your child ref
.on_question(|_: (), sender| {
let _ = sender.reply(child_ref);
});
}
})
// Subscribe / unsubscribe tests
})
.children(|children| {
children
.with_distributor(Distributor::named(SUBSCRIBE_TEST_DISTRIBUTOR))
.with_callbacks(Callbacks::new().with_after_start(move || {
let mut subscribe_test_ready = subscribe_test_ready.clone();
spawn!(async move { subscribe_test_ready.send(()).await });
}))
.with_exec(|ctx| async move {
loop {
let child_ref = ctx.current().clone();
MessageHandler::new(ctx.recv().await?).on_question(
|_: (), sender| {
let _ = sender.reply(child_ref);
},
);
}
})
})
})
.unwrap();
// Wait until the children have spawned
run!(async {
// NUM_CHILDREN for the test distributor group,
// 1 for the subscribe test group
receiver.take(NUM_CHILDREN + 1).collect::<Vec<_>>().await;
});
}
}

View File

@ -4,7 +4,14 @@
//! A ReceiveError may however be raised when calling try_recv() or try_recv_timeout()
//! More errors may happen in the future.
use crate::envelope::Envelope;
use crate::message::Msg;
use crate::system::STRING_INTERNER;
use crate::{distributor::Distributor, message::BastionMessage};
use futures::channel::mpsc::TrySendError;
use std::fmt::Debug;
use std::time::Duration;
use thiserror::Error;
#[derive(Debug)]
/// These errors happen
@ -18,3 +25,46 @@ pub enum ReceiveError {
/// Generic error. Not used yet
Other,
}
#[derive(Error, Debug)]
/// `SendError`s occur when a message couldn't be dispatched through a distributor
pub enum SendError {
#[error("couldn't send message. Channel Disconnected.")]
/// Channel has been closed before we could send a message
Disconnected(Msg),
#[error("couldn't send message. Channel is Full.")]
/// Channel is full, can't send a message
Full(Msg),
#[error("couldn't send a message I should have not sent. {0}")]
/// This error is returned when we try to send a message
/// that is not a BastionMessage::Message variant
Other(anyhow::Error),
#[error("No available Distributor matching {0}")]
/// The distributor we're trying to dispatch messages to is not registered in the system
NoDistributor(String),
#[error("Distributor has 0 Recipients")]
/// The distributor we're trying to dispatch messages to has no recipients
EmptyRecipient,
}
impl From<TrySendError<Envelope>> for SendError {
fn from(tse: TrySendError<Envelope>) -> Self {
let is_disconnected = tse.is_disconnected();
match tse.into_inner().msg {
BastionMessage::Message(msg) => {
if is_disconnected {
Self::Disconnected(msg)
} else {
Self::Full(msg)
}
}
other => Self::Other(anyhow::anyhow!("{:?}", other)),
}
}
}
impl From<Distributor> for SendError {
fn from(distributor: Distributor) -> Self {
Self::NoDistributor(STRING_INTERNER.resolve(distributor.interned()).to_string())
}
}

View File

@ -647,10 +647,18 @@ impl Supervisor {
let children = Children::new(bcast);
let mut children = init(children);
debug!("Children({}): Initialized.", children.id());
// FIXME: children group elems launched without the group itself being launched
if let Err(e) = children.register_dispatchers() {
warn!("couldn't register all dispatchers into the registry: {}", e);
};
if let Err(e) = children.register_distributors() {
warn!(
"couldn't register all distributors into the registry: {}",
e
);
};
children.launch_elems();
debug!(

View File

@ -19,7 +19,7 @@ fn spawn_responders() -> ChildrenRef {
msg! { ctx.recv().await?,
msg: &'static str =!> {
if msg == "Hello" {
assert!(signature!().is_sender_identified(), false);
assert!(signature!().is_sender_identified(), "sender is not identified");
answer!(ctx, "Goodbye").unwrap();
}
};