This commit is contained in:
o0Ignition0o 2020-05-31 18:56:17 +02:00
parent 1ac60239c4
commit 6455af10db
4 changed files with 148 additions and 7 deletions

View File

@ -25,7 +25,7 @@ travis-ci = { repository = "bastion-rs/bastion", branch = "master" }
maintenance = { status = "actively-developed" }
[features]
unstable = ["numanji", "allocator-suite", "jemallocator"]
unstable = ["numanji", "jemallocator"]
[dependencies]
lightproc = { version = "= 0.3.5-alpha.0", path = "../lightproc" }

View File

@ -0,0 +1,115 @@
use bastion::prelude::*;
use std::fmt::{Display, Formatter, Result};
use std::sync::Arc;
use tracing::{error, info};
#[derive(Debug)]
struct Ping {
count: usize,
}
impl Display for Ping {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
write!(f, "Ping ! {}", self.count)
}
}
#[derive(Debug)]
struct Pong {
count: usize,
}
impl Display for Pong {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
write!(f, "Pong ! {}", self.count)
}
}
///
/// Ping pong job example
///
/// Prologue:
/// This example will show two children cooperating by sending pings and pongs until MAX is reached.
fn main() {
env_logger::init();
Bastion::init();
// children that send pongs.
let pong_children = Bastion::children(|children: Children| {
children
.with_redundancy(10)
.with_exec(move |ctx: BastionContext| {
async move {
info!("pong child ready!");
// Start receiving work
loop {
msg! { ctx.recv().await?,
msg: Ping =!> {
let response = Pong {
count: msg.count + 1
};
println!("{}", msg);
let _ = answer!(ctx, response);
};
_: _ => {
println!("Pong child received a message it can't process");
};
}
}
}
})
})
.expect("Couldn't start a new children group.");
let pong_children = Arc::new(pong_children);
dbg!(&pong_children.dispatchers());
// children that send pings.
let ping_children = Bastion::children(|children: Children| {
children
.with_redundancy(10)
.with_exec(move |ctx: BastionContext| {
let pong_children = Arc::clone(&pong_children);
async move {
println!("ping child ready!");
// Start receiving work
loop {
msg! { ctx.recv().await?,
ref msg: &'static str => {
if msg == &"start" {
println!("leggo");
(*pong_children).ask(Ping { count: 0});
}
};
msg: Pong =!> {
let response = Ping {
count: msg.count + 1
};
println!("{}", msg);
let _ = answer!(ctx, response);
};
_: _ => {
println!("Pong child received a message it can't process");
};
}
}
}
})
})
.expect("Couldn't start a new children group.");
Bastion::start();
let handle = std::thread::spawn(move || {
println!("------------------let s do stuff");
std::thread::sleep_ms(5000);
println!("------------------let s start");
ping_children.tell("start");
});
handle.join().unwrap();
Bastion::block_until_stopped();
}

View File

@ -5,7 +5,7 @@ use crate::child_ref::ChildRef;
use crate::context::BastionId;
use crate::dispatcher::DispatcherType;
use crate::envelope::Envelope;
use crate::message::{BastionMessage, Message};
use crate::message::{Answer, BastionMessage, Message};
use crate::path::BastionPath;
use std::cmp::{Eq, PartialEq};
use std::fmt::Debug;
@ -184,6 +184,24 @@ impl ChildrenRef {
self.send(env).map_err(|err| err.into_msg().unwrap())
}
pub fn tell<M: Message>(&self, msg: M) -> Result<(), M> {
debug!("ChildrenRef({}): Telling message: {:?}", self.id(), msg);
let msg = BastionMessage::tell(msg);
let env = Envelope::from_dead_letters(msg);
// FIXME: panics?
self.send(env).map_err(|err| err.into_msg().unwrap())
}
pub fn ask<M: Message>(&self, msg: M) -> Result<Answer, M> {
debug!("ChildrenRef({}): Asking message: {:?}", self.id(), msg);
let (msg, answer) = BastionMessage::ask(msg);
let env = Envelope::from_dead_letters(msg);
// FIXME: panics?
self.send(env)
.and_then(|_| Ok(answer))
.map_err(|err| err.into_msg().unwrap())
}
/// Sends a message to the children group this `ChildrenRef`
/// is referencing to tell it to stop all of its running
/// elements.

View File

@ -77,8 +77,10 @@ impl DispatcherHandler for RoundRobinHandler {
_notification_type: NotificationType,
) {
}
// Each child in turn will receive a message.
fn broadcast_message(&self, entries: &DispatcherMap, message: &Arc<SignedMessage>) {
self.dispatch_message(entries, message)
}
fn dispatch_message(&self, entries: &DispatcherMap, message: &Arc<SignedMessage>) {
let current_index = self.index.load(Ordering::SeqCst) % entries.len() as u64;
let mut skipped = 0;
@ -106,8 +108,10 @@ pub trait DispatcherHandler {
entries: &DispatcherMap,
notification_type: NotificationType,
);
/// Broadcasts the message to actors in according to the implemented behaviour.
#[deprecated(since = "0.3.5", note = "please use dispatch_message instead.")]
fn broadcast_message(&self, entries: &DispatcherMap, message: &Arc<SignedMessage>);
/// Dispatch the message to actors in according to the implemented behaviour
fn dispatch_message(&self, entries: &DispatcherMap, message: &Arc<SignedMessage>);
}
/// A generic implementation of the Bastion dispatcher
@ -196,8 +200,8 @@ impl Dispatcher {
/// Sends the message to the group of actors.
/// The logic of who and how should receive the message relies onto
/// the handler implementation.
pub fn broadcast_message(&self, message: &Arc<SignedMessage>) {
self.handler.broadcast_message(&self.actors, &message);
pub fn dispatch_message(&self, message: &Arc<SignedMessage>) {
self.handler.dispatch_message(&self.actors, &message);
}
}
@ -320,6 +324,10 @@ impl GlobalDispatcher {
/// Broadcasts the given message in according with the specified target.
pub(crate) fn broadcast_message(&self, target: BroadcastTarget, message: &Arc<SignedMessage>) {
self.dispatch_message(target, message)
}
pub(crate) fn dispatch_message(&self, target: BroadcastTarget, message: &Arc<SignedMessage>) {
let mut acked_dispatchers: Vec<DispatcherType> = Vec::new();
match target {
@ -337,7 +345,7 @@ impl GlobalDispatcher {
for dispatcher_type in acked_dispatchers {
match self.dispatchers.get(&dispatcher_type) {
Some(dispatcher) => {
dispatcher.broadcast_message(&message.clone());
dispatcher.dispatch_message(&message.clone());
}
// TODO: Put the message into the dead queue
None => {