Add MessageHandler (#309)

* Add initial implementation for MessageHandler

This commit adds very basic implementation of MessageHandler. This structure
stores a SignedMessage, and calls, depending on which kind of message it is, and
on its underlying type, either a specified closure, or a fallback one.

The goal is to provide an API that would be nicer to work with than the msg!
macro.

Current implementation features a state-machine like algorithm and currently
only handles messages that can responded to (aka "questions").

* Make AnswerSender carry its own signature.

This allows us not to trust caller of AnswerSender::reply to provide a
correct signature. As such, the corresponding method can be documented.

This is necessary because such method may be called in the closure that
are passed to MessageHandler::with_question.

Note: this commit renames AnswerSender::send to AnswerSender::respond, and
removes the signature part. This method is public but not documented. As
such, this theorically breaking change should not break any code.

* Add on_* functions

This allows us to match on both regular messages (the ones we can't
respond to) as well as the broadcasts. It follows the same model
established previously.

* Add documentation for MessageHandler API

* Make sender address available for each on_* function

* Allow MessageHandler to return something

Previous implementation of MessageHandler always returned nothing, as it
was not considered important. However, returning something is important
at least in the fibonacci example.

This commit allows the MessageHandler to return some data. It requires
every matcher to return the same data type. This data is stored in the
MessageHandler and returned by the on_fallback function.

* Rewrite the fibonacci example with MessageHanlder

* Remove useless clone by destructuring on fallback.

This allows us to remove additional code.

* Add a proof of concept that we can match over different types using the MessageHandler

Co-authored-by: Jeremy Lempereur <jeremy.lempereur@gmail.com>
This commit is contained in:
Sasha Pourcelot 2021-03-02 18:31:25 +01:00 committed by GitHub
parent 8d6707fdf2
commit 835be0586d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 495 additions and 15 deletions

View File

@ -0,0 +1,147 @@
use bastion::{message::MessageHandler, prelude::*};
use tracing::{error, info};
// This terribly slow implementation
// will allow us to be rough on the cpu
fn fib(n: usize) -> usize {
if n == 0 || n == 1 {
n
} else {
fib(n - 1) + fib(n - 2)
}
}
// This terrible helper is converting `fib 50` into a tuple ("fib", 50)
// we might want to use actual serializable / deserializable structures
// in the real world
fn deserialize_into_fib_command(message: String) -> (String, usize) {
let arguments: Vec<&str> = message.split(' ').collect();
let command = arguments.first().map(|s| s.to_string()).unwrap_or_default();
let number = usize::from_str_radix(arguments.get(1).unwrap_or(&"0"), 10).unwrap_or(0);
(command, number)
}
// This is the heavylifting.
// A child will wait for a message, and try to process it.
async fn fib_child_task(ctx: BastionContext) -> Result<(), ()> {
loop {
MessageHandler::new(ctx.recv().await?)
.on_question(|request: String, sender| {
let (command, number) = deserialize_into_fib_command(request);
if command == "fib" {
sender
.reply(format!("{}", fib(number)))
.expect("couldn't reply :(");
} else {
sender
.reply(format!(
"I'm sorry I didn't understand the task I was supposed to do"
))
.expect("couldn't reply :(");
}
})
.on_broadcast(|broadcast: &String, _sender_addr| {
info!("received broadcast: {:?}", *broadcast);
})
.on_tell(|message: String, _sender_addr| {
info!("someone told me something: {}", message);
})
.on_fallback(|unknown, _sender_addr| {
error!(
"uh oh, I received a message I didn't understand\n {:?}",
unknown
);
});
}
}
// This little helper allows me to send a request, and get a reply.
// The types are `String` for this quick example, but there's a way for us to do better.
// We will see this in other examples.
async fn request(child: &ChildRef, body: String) -> std::io::Result<String> {
let answer = child
.ask_anonymously(body)
.expect("couldn't perform request");
Ok(
MessageHandler::new(answer.await.expect("couldn't receive answer"))
.on_tell(|reply, _sender_addr| reply)
.on_fallback(|unknown, _sender_addr| {
error!(
"uh oh, I received a message I didn't understand: {:?}",
unknown
);
"".to_string()
}),
)
}
// RUST_LOG=info cargo run --example fibonacci_message_handler
fn main() {
// This will allow us to have nice colored logs when we run the program
env_logger::init();
// We need a bastion in order to run everything
Bastion::init();
Bastion::start();
// Spawn 4 children that will execute our fibonacci task
let children =
Bastion::children(|children| children.with_redundancy(4).with_exec(fib_child_task))
.expect("couldn't create children");
// Broadcasting 1 message to the children
// Have a look at the console output
// to see 1 log entry for each child!
children
.broadcast("Hello there :)".to_string())
.expect("Couldn't broadcast to the children.");
let mut fib_to_compute = 35;
for child in children.elems() {
child
.tell_anonymously("shhh here's a message, don't tell anyone.".to_string())
.expect("Couldn't whisper to child.");
let now = std::time::Instant::now();
// by using run!, we are blocking.
// we could have used spawn! instead,
// to run everything in parallel.
let fib_reply = run!(request(child, format!("fib {}", fib_to_compute)))
.expect("send_command_to_child failed");
println!(
"fib({}) = {} - Computed in {}ms",
fib_to_compute,
fib_reply,
now.elapsed().as_millis()
);
// Let's not go too far with the fib sequence
// Otherwise the computer may take a while!
fib_to_compute += 2;
}
Bastion::stop();
Bastion::block_until_stopped();
}
// Compiling bastion v0.3.5-alpha (/home/ignition/Projects/oss/bastion/src/bastion)
// Finished dev [unoptimized + debuginfo] target(s) in 1.07s
// Running `target/debug/examples/fibonacci`
// [2020-05-08T14:00:53Z INFO bastion::system] System: Initializing.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Launched.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Starting.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Launching Supervisor(00000000-0000-0000-0000-000000000000).
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// fib(35) = 9227465 - Computed in 78ms
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(37) = 24157817 - Computed in 196ms
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(39) = 63245986 - Computed in 512ms
// [2020-05-08T14:00:54Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(41) = 165580141 - Computed in 1327ms
// [2020-05-08T14:00:55Z INFO bastion::system] System: Stopping.

View File

@ -0,0 +1,63 @@
use bastion::message::MessageHandler;
use bastion::prelude::*;
use std::fmt::Debug;
use tracing::error;
// This example shows that it is possible to use the MessageHandler to match
// over different types of message.
async fn child_task(ctx: BastionContext) -> Result<(), ()> {
loop {
MessageHandler::new(ctx.recv().await?)
.on_question(|n: i32, sender| {
if n == 42 {
sender.reply(101).expect("Failed to reply to sender");
} else {
error!("Expected number `42`, found `{}`", n);
}
})
.on_question(|s: &str, sender| {
if s == "marco" {
sender.reply("polo").expect("Failed to reply to sender");
} else {
panic!("Expected string `marco`, found `{}`", s);
}
})
.on_fallback(|v, addr| panic!("Wrong message from {:?}: got {:?}", addr, v))
}
}
async fn request<T: 'static + Debug + Send + Sync>(
child: &ChildRef,
body: T,
) -> std::io::Result<()> {
let answer = child
.ask_anonymously(body)
.expect("Couldn't perform request")
.await
.expect("Couldn't receive answer");
MessageHandler::new(answer)
.on_tell(|n: i32, _| assert_eq!(n, 101))
.on_tell(|s: &str, _| assert_eq!(s, "polo"))
.on_fallback(|_, _| panic!("Unknown message"));
Ok(())
}
fn main() {
env_logger::init();
Bastion::init();
Bastion::start();
let children =
Bastion::children(|c| c.with_exec(child_task)).expect("Failed to spawn children");
let child = &children.elems()[0];
run!(request(child, 42)).unwrap();
run!(request(child, "marco")).unwrap();
// run!(request(child, "foo")).unwrap();
}

View File

@ -305,7 +305,7 @@ impl ChildRef {
///
pub fn ask_anonymously<M: Message>(&self, msg: M) -> Result<Answer, M> {
debug!("ChildRef({}): Asking message: {:?}", self.id(), msg);
let (msg, answer) = BastionMessage::ask(msg);
let (msg, answer) = BastionMessage::ask(msg, self.addr());
let env = Envelope::from_dead_letters(msg);
// FIXME: panics?
self.send(env).map_err(|env| env.into_msg().unwrap())?;

View File

@ -730,7 +730,7 @@ impl BastionContext {
msg,
to
);
let (msg, answer) = BastionMessage::ask(msg);
let (msg, answer) = BastionMessage::ask(msg, self.signature());
let env = Envelope::new_with_sign(msg, self.signature());
// FIXME: panics?
to.sender()

View File

@ -33,9 +33,14 @@ use tracing::{debug, trace};
pub trait Message: Any + Send + Sync + Debug {}
impl<T> Message for T where T: Any + Send + Sync + Debug {}
/// Allows to respond to questions.
///
/// This type features the [`respond`] method, that allows to respond to a
/// question.
///
/// [`respond`]: #method.respond
#[derive(Debug)]
#[doc(hidden)]
pub struct AnswerSender(oneshot::Sender<SignedMessage>);
pub struct AnswerSender(oneshot::Sender<SignedMessage>, RefAddr);
#[derive(Debug)]
/// A [`Future`] returned when successfully "asking" a
@ -257,14 +262,17 @@ pub(crate) enum Deployment {
}
impl AnswerSender {
// FIXME: we can't let manipulating Signature in a public API
// but now it's being called only by a macro so we are trusting it
#[doc(hidden)]
pub fn send<M: Message>(self, msg: M, sign: RefAddr) -> Result<(), M> {
/// Sends data back to the original sender.
///
/// Returns `Ok` if the data was sent successfully, otherwise returns the
/// original data.
pub fn reply<M: Message>(self, msg: M) -> Result<(), M> {
debug!("{:?}: Sending answer: {:?}", self, msg);
let msg = Msg::tell(msg);
trace!("{:?}: Sending message: {:?}", self, msg);
self.0
let AnswerSender(sender, sign) = self;
sender
.send(SignedMessage::new(msg, sign))
.map_err(|smsg| smsg.msg.try_unwrap().unwrap())
}
@ -281,10 +289,10 @@ impl Msg {
Msg(inner)
}
pub(crate) fn ask<M: Message>(msg: M) -> (Self, Answer) {
pub(crate) fn ask<M: Message>(msg: M, sign: RefAddr) -> (Self, Answer) {
let msg = Box::new(msg);
let (sender, recver) = oneshot::channel();
let sender = AnswerSender(sender);
let sender = AnswerSender(sender, sign);
let answer = Answer(recver);
let sender = Some(sender);
@ -397,6 +405,16 @@ impl Msg {
}
}
impl AsRef<dyn Any> for Msg {
fn as_ref(&self) -> &dyn Any {
match &self.0 {
MsgInner::Broadcast(msg) => msg.as_ref(),
MsgInner::Tell(msg) => msg.as_ref(),
MsgInner::Ask { msg, .. } => msg.as_ref(),
}
}
}
impl BastionMessage {
pub(crate) fn start() -> Self {
BastionMessage::Start
@ -456,8 +474,8 @@ impl BastionMessage {
BastionMessage::Message(msg)
}
pub(crate) fn ask<M: Message>(msg: M) -> (Self, Answer) {
let (msg, answer) = Msg::ask(msg);
pub(crate) fn ask<M: Message>(msg: M, sign: RefAddr) -> (Self, Answer) {
let (msg, answer) = Msg::ask(msg, sign);
(BastionMessage::Message(msg), answer)
}
@ -763,7 +781,7 @@ macro_rules! msg {
($ctx:expr, $answer:expr) => {
{
let sign = $ctx.signature();
sender.send($answer, sign)
sender.reply($answer)
}
};
}
@ -858,6 +876,258 @@ macro_rules! answer {
($msg:expr, $answer:expr) => {{
let (mut msg, sign) = $msg.extract();
let sender = msg.take_sender().expect("failed to take render");
sender.send($answer, sign)
sender.reply($answer)
}};
}
#[derive(Debug)]
enum MessageHandlerState<O> {
Matched(O),
Unmatched(SignedMessage),
}
impl<O> MessageHandlerState<O> {
fn take_message(self) -> Result<SignedMessage, O> {
match self {
MessageHandlerState::Unmatched(msg) => Ok(msg),
MessageHandlerState::Matched(output) => Err(output),
}
}
fn output_or_else(self, f: impl FnOnce(SignedMessage) -> O) -> O {
match self {
MessageHandlerState::Matched(output) => output,
MessageHandlerState::Unmatched(msg) => f(msg),
}
}
}
/// Matches a [`Msg`] (as returned by [`BastionContext::recv`]
/// or [`BastionContext::try_recv`]) with different types.
///
/// This type may replace the [`msg!`] macro in the future.
///
/// The [`new`] function creates a new [`MessageHandler`], which is then
/// matched on with the `on_*` functions.
///
/// There are different kind of messages:
/// - messages that are broadcasted, which can be matched with the
/// [`on_broadcast`] method,
/// - messages that can be responded to, which are matched with the
/// [`on_question`] method,
/// - messages that can not be responded to, which are matched with
/// [`on_tell`],
/// - fallback case, which matches everything, entitled [`on_fallback`].
///
/// The closure passed to the functions described previously must return the
/// same type. This value is retrieved when [`on_fallback`] is invoked.
///
/// Questions can be responded to by calling [`reply`] on the provided
/// sender.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// # use bastion::message::MessageHandler;
/// #
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # }
/// #
/// # fn run() {
/// # Bastion::init();
/// // The message that will be broadcasted...
/// const BCAST_MSG: &'static str = "A message containing data (broadcast).";
/// // The message that will be "told" to the child...
/// const TELL_MSG: &'static str = "A message containing data (tell).";
/// // The message that will be "asked" to the child...
/// const ASK_MSG: &'static str = "A message containing data (ask).";
///
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// # ctx.tell(&ctx.current().addr(), TELL_MSG).unwrap();
/// # ctx.ask(&ctx.current().addr(), ASK_MSG).unwrap();
/// #
/// loop {
/// MessageHandler::new(ctx.recv().await?)
/// // We match on broadcasts of &str
/// .on_broadcast(|msg: &&str, _sender_addr| {
/// assert_eq!(*msg, BCAST_MSG);
/// // Handle the message...
/// })
/// // We match on messages of &str
/// .on_tell(|msg: &str, _sender_addr| {
/// assert_eq!(msg, TELL_MSG);
/// // Handle the message...
/// })
/// // We match on questions of &str
/// .on_question(|msg: &str, sender| {
/// assert_eq!(msg, ASK_MSG);
/// // Handle the message...
///
/// // ...and eventually answer to it...
/// sender.reply("An answer to the message.");
/// })
/// // We are only broadcasting, "telling" and "asking" a
/// // `&str` in this example, so we know that this won't
/// // happen...
/// .on_fallback(|msg, _sender_addr| ());
/// }
/// }
/// })
/// }).expect("Couldn't start the children group.");
/// #
/// # Bastion::start();
/// # Bastion::broadcast(BCAST_MSG).unwrap();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
///
/// [`BastionContext::recv`]: crate::context::BastionContext::recv
/// [`BastionContext::try_recv`]: crate::context::BastionContext::try_recv
/// [`new`]: Self::new
/// [`on_broadcast`]: Self::on_broadcast
/// [`on_question`]: Self::on_question
/// [`on_tell`]: Self::on_tell
/// [`on_fallback`]: Self::on_fallback
/// [`reply`]: AnswerSender::reply
#[derive(Debug)]
pub struct MessageHandler<O> {
state: MessageHandlerState<O>,
}
impl<O> MessageHandler<O> {
/// Creates a new [`MessageHandler`] with an incoming message.
pub fn new(msg: SignedMessage) -> MessageHandler<O> {
let state = MessageHandlerState::Unmatched(msg);
MessageHandler { state }
}
/// Matches on a question of a specific type.
///
/// This will consume the inner data and call `f` if the contained message
/// can be replied to.
pub fn on_question<T, F>(self, f: F) -> MessageHandler<O>
where
T: 'static,
F: FnOnce(T, AnswerSender) -> O,
{
match self.try_into_question::<T>() {
Ok((arg, sender)) => {
let val = f(arg, sender);
MessageHandler::matched(val)
}
Err(this) => this,
}
}
/// Calls a fallback function if the message has still not matched yet.
///
/// This consumes the [`MessageHandler`], so that no matching can be
/// performed anymore.
pub fn on_fallback<F>(self, f: F) -> O
where
F: FnOnce(&dyn Any, RefAddr) -> O,
{
self.state
.output_or_else(|SignedMessage { msg, sign }| f(msg.as_ref(), sign))
}
/// Calls a function if the incoming message is a broadcast and has a
/// specific type.
pub fn on_broadcast<T, F>(self, f: F) -> MessageHandler<O>
where
T: 'static + Send + Sync,
F: FnOnce(&T, RefAddr) -> O,
{
match self.try_into_broadcast::<T>() {
Ok((arg, addr)) => {
let val = f(arg.as_ref(), addr);
MessageHandler::matched(val)
}
Err(this) => this,
}
}
/// Calls a function if the incoming message can't be replied to and has a
/// specific type.
pub fn on_tell<T, F>(self, f: F) -> MessageHandler<O>
where
T: 'static,
F: FnOnce(T, RefAddr) -> O,
{
match self.try_into_tell::<T>() {
Ok((msg, addr)) => {
let val = f(msg, addr);
MessageHandler::matched(val)
}
Err(this) => this,
}
}
fn matched(output: O) -> MessageHandler<O> {
let state = MessageHandlerState::Matched(output);
MessageHandler { state }
}
fn try_into_question<T: 'static>(self) -> Result<(T, AnswerSender), MessageHandler<O>> {
match self.state.take_message() {
Ok(SignedMessage {
msg:
Msg(MsgInner::Ask {
msg,
sender: Some(sender),
}),
..
}) if msg.is::<T>() => {
let msg: Box<dyn Any> = msg;
Ok((*msg.downcast::<T>().unwrap(), sender))
}
Ok(anything) => Err(MessageHandler::new(anything)),
Err(output) => Err(MessageHandler::matched(output)),
}
}
fn try_into_broadcast<T: Send + Sync + 'static>(
self,
) -> Result<(Arc<T>, RefAddr), MessageHandler<O>> {
match self.state.take_message() {
Ok(SignedMessage {
msg: Msg(MsgInner::Broadcast(msg)),
sign,
}) if msg.is::<T>() => {
let msg: Arc<dyn Any + Send + Sync + 'static> = msg;
Ok((msg.downcast::<T>().unwrap(), sign))
}
Ok(anything) => Err(MessageHandler::new(anything)),
Err(output) => Err(MessageHandler::matched(output)),
}
}
fn try_into_tell<T: 'static>(self) -> Result<(T, RefAddr), MessageHandler<O>> {
match self.state.take_message() {
Ok(SignedMessage {
msg: Msg(MsgInner::Tell(msg)),
sign,
}) if msg.is::<T>() => {
let msg: Box<dyn Any> = msg;
Ok((*msg.downcast::<T>().unwrap(), sign))
}
Ok(anything) => Err(MessageHandler::new(anything)),
Err(output) => Err(MessageHandler::matched(output)),
}
}
}