This commit is contained in:
o0Ignition0o 2020-08-30 18:45:15 +02:00
parent bac1019d7d
commit a3896dc675
4 changed files with 262 additions and 49 deletions

View File

@ -1,8 +1,8 @@
use async_mutex::Mutex;
use bastion::prelude::*;
use futures_timer::Delay;
use lightproc::proc_state::State;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::{sync::Arc, time::Duration};
struct StateWithCounter {
@ -34,8 +34,7 @@ fn main() {
tracing::subscriber::set_global_default(subscriber).unwrap();
Bastion::init();
let state: Arc<Mutex<Box<dyn State>>> =
Arc::new(Mutex::new(Box::new(StateWithCounter::default())));
let state: Arc<Mutex<dyn State>> = Arc::new(Mutex::new(StateWithCounter::default()));
std::thread::sleep(Duration::from_secs(1));
@ -50,36 +49,116 @@ fn main() {
move |ctx: BastionContext| async move {
loop {
msg! { ctx.recv().await?,
msg: usize => {
tracing::error!("YAY");
};
msg: usize => {
tracing::error!("YAY");
};
msg: &usize => {
tracing::error!("&YAY");
};
ref msg: usize => {
tracing::error!("ref YAY");
};
ref msg: &usize => {
tracing::error!("ref &YAY");
};
ref msg: &str => {
match msg {
&"increment" => {
tracing::error!("INCREMENT!!!");
ctx.with_state(|state: &StateWithCounter| state.increment()).await;
},
&"get" => {
let mut counter = 0;
ctx.with_state(|state: &StateWithCounter| {counter = state.get();}).await;
tracing::error!("GET {}", counter);
// answer!(ctx, format!("the counter is {}", counter)).unwrap();
},
_ => {
tracing::error!("NOPE");
}
}
};
msg: &str => {
match msg {
"increment" => {
tracing::error!("INCREMENT!!!");
ctx.with_state(|state: &StateWithCounter| state.increment()).await;
},
"get" => {
let mut counter = 0;
ctx.with_state(|state: &StateWithCounter| {counter = state.get();}).await;
tracing::error!("GET {}", counter);
// answer!(ctx, format!("the counter is {}", counter)).unwrap();
},
_ => {
tracing::error!("NOPE");
}
}
};
// We define the behavior when we receive a new msg
ref raw_message: SignedMessage => {
panic!("aw")
};
// We define the behavior when we receive a new msg
raw_message: Arc<SignedMessage> => {
// We open the message
let message = Arc::try_unwrap(raw_message).unwrap();
msg! { message,
ref msg: &str => {
match msg {
&"increment" => {
let mut state = ctx.state().unwrap().lock().await;
let dyn_state = state.as_any();
let downcasted = dyn_state.downcast_ref::<StateWithCounter>().unwrap();
downcasted.increment();
},
&"get" => {
let mut state = ctx.state().unwrap().lock().await;
let dyn_state = state.as_any();
let downcasted = dyn_state.downcast_ref::<StateWithCounter>().unwrap();
let counter = downcasted.get();
tracing::error!("GET {}", counter);
// answer!(ctx, format!("the counter is {}", counter)).unwrap();
},
_ => {
tracing::error!("NOPE");
}
}
};
_: _ => {panic!(); ()};
}
panic!("no")
// // We open the message
// msg! { Arc::try_unwrap(raw_message).unwrap(),
// msg: usize => {
// tracing::error!("sub YAY");
// };
// msg: &usize => {
// tracing::error!("sub &YAY");
// };
// ref msg: usize => {
// tracing::error!("sub ref YAY");
// };
// ref msg: &usize => {
// tracing::error!("sub ref &YAY");
// };
// ref msg: &str => {
// match msg {
// &"increment" => {
// tracing::error!("INCREMENT!!!");
// ctx.with_state(|state: &StateWithCounter| state.increment()).await;
// },
// &"get" => {
// let mut counter = 0;
// ctx.with_state(|state: &StateWithCounter| {counter = state.get();}).await;
// tracing::error!("GET {}", counter);
// // answer!(ctx, format!("the counter is {}", counter)).unwrap();
// },
// _ => {
// tracing::error!("NOPE");
// }
// }
// };
// msg: &str => {
// tracing::error!("msg!");
// match msg {
// "increment" => {
// tracing::error!("INCREMENT!!!");
// ctx.with_state(|state: &StateWithCounter| state.increment()).await;
// },
// "get" => {
// let mut counter = 0;
// ctx.with_state(|state: &StateWithCounter| {counter = state.get();}).await;
// tracing::error!("GET {}", counter);
// // answer!(ctx, format!("the counter is {}", counter)).unwrap();
// },
// _ => {
// tracing::error!("NOPE");
// }
// }
// };
// _: _ => {panic!("neither"); ()};
// }
};
_: _ => ();
_: _ => {tracing::error!("nope"); ()};
}
}
},
@ -89,7 +168,21 @@ fn main() {
children.with_exec(move |ctx: BastionContext| async move {
let target = BroadcastTarget::Group("counter".to_string());
for _ in 0..1000 {
ctx.broadcast_message(target.clone(), "get");
ctx.tell_one(target.clone(), 1usize);
Delay::new(Duration::from_millis(200)).await;
ctx.tell_all(target.clone(), 1usize);
Delay::new(Duration::from_millis(200)).await;
ctx.tell_one(target.clone(), &1usize);
Delay::new(Duration::from_millis(200)).await;
// ctx.broadcast_message(target.clone(), &1usize);
Delay::new(Duration::from_millis(200)).await;
// ctx.broadcast_message(target.clone(), 1usize);
ctx.tell_one(target.clone(), "get");
// msg! {
// ctx.recv().await?,
// ref answer: &str => {
@ -100,7 +193,7 @@ fn main() {
Delay::new(Duration::from_millis(200)).await;
ctx.broadcast_message(target.clone(), "increment");
ctx.tell_one(target.clone(), "increment");
Delay::new(Duration::from_millis(200)).await;
}

View File

@ -110,7 +110,7 @@ pub struct Children {
helper_actors: FxHashMap<BastionId, (Sender, RecoverableHandle<()>)>,
// Shared state between the children.
// can be retrieved from a bastioncontext.
state: Option<Arc<Mutex<Box<dyn State>>>>,
state: Option<ProcState>,
}
impl Children {
@ -280,7 +280,7 @@ impl Children {
self
}
pub fn with_state(mut self, state: Arc<Mutex<Box<dyn State>>>) -> Self {
pub fn with_state(mut self, state: ProcState) -> Self {
trace!("Children({}): Setting state.", self.id());
self.state = Some(state);
self

View File

@ -16,7 +16,7 @@ use futures::FutureExt;
use futures_timer::Delay;
#[cfg(feature = "scaling")]
use lever::table::lotable::LOTable;
use lightproc::proc_state::{EmptyState, State};
use lightproc::proc_state::{EmptyState, ProcState, State};
use std::fmt::{self, Display, Formatter};
use std::pin::Pin;
#[cfg(feature = "scaling")]
@ -111,7 +111,7 @@ pub struct BastionContext {
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Arc<Pin<Box<ContextState>>>,
shared_state: Option<Arc<Mutex<Box<dyn State>>>>,
shared_state: Option<ProcState>,
}
#[derive(Debug)]
@ -138,7 +138,7 @@ impl BastionContext {
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Arc<Pin<Box<ContextState>>>,
shared_state: Option<Arc<Mutex<Box<dyn State>>>>,
shared_state: Option<ProcState>,
) -> Self {
debug!("BastionContext({}): Creating.", id);
BastionContext {
@ -147,7 +147,7 @@ impl BastionContext {
children,
supervisor,
state,
shared_state: shared_state,
shared_state,
}
}
pub(crate) fn new(
@ -160,10 +160,35 @@ impl BastionContext {
Self::with_shared_state(id, child, children, supervisor, state, None)
}
pub fn state(&self) -> Option<&Arc<Mutex<Box<dyn State>>>> {
self.shared_state.as_ref()
pub async fn with_state<S>(&self, mut f: impl FnMut(&S)) -> Result<(), ()>
where
S: State,
{
let id = &self.id;
if let Some(state) = self.shared_state.as_ref() {
let mut locked = state.lock().unwrap();
let dyn_state = locked.as_any();
let state = dyn_state.downcast_ref::<S>().unwrap();
// .ok_or_else(|| {
// error!(
// "BastionContext({}):'Couldn't downcast state - type missmatch",
// id
// )
// });
f(state);
Ok(())
} else {
error!("BastionContext({}):'No state available", id);
Err(())
}
}
// pub fn state(&self) -> Option<&Arc<Mutex<Box<dyn State>>>> {
// self.shared_state.as_ref()
// }
/// Returns a [`ChildRef`] referencing the children group's
/// element that is linked to this `BastionContext`.
///
@ -667,7 +692,7 @@ impl BastionContext {
}
/// Sends a message (unicast) to a recipient among the target.
pub fn notify_one<M: Message>(&self, target: BroadcastTarget, message: M) {
pub fn tell_one<M: Message>(&self, target: BroadcastTarget, message: M) {
debug!(
"{:?}: notifying one member of {:?} (message type: {})",
self.current().path(),
@ -681,7 +706,25 @@ impl BastionContext {
};
let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.notify_one(target, msg);
global_dispatcher.tell_one(target, msg);
}
/// Sends a message (unicast) to a recipient among the target.
pub fn tell_all<M: Message>(&self, target: BroadcastTarget, message: M) {
debug!(
"{:?}: notifying one member of {:?} (message type: {})",
self.current().path(),
target,
std::any::type_name::<M>(),
);
let msg = Arc::new(SignedMessage {
msg: Msg::tell(message),
sign: self.signature(),
});
let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.tell_all(target, msg);
}
/// Sends a message (unicast) to a recipient among the target.
@ -716,7 +759,7 @@ impl BastionContext {
};
let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.notify_one(target, msg);
global_dispatcher.tell_one(target, msg);
}
pub fn ask_all<M: Message>(&self, target: BroadcastTarget, message: M) -> Result<usize, ()> {

View File

@ -102,7 +102,63 @@ impl DispatcherHandler for RoundRobinHandler {
self.index.store(current_index + 1, Ordering::SeqCst);
};
}
fn tell_one(&self, entries: &DispatcherMap, message: SignedMessage) -> Result<usize, ()> {
let entries = entries
.iter()
.filter(|entry| entry.0.is_public())
.collect::<Vec<_>>();
if entries.is_empty() {
warn!("no public children to broadcast message to");
return Ok(0);
}
let current_index = self.index.load(Ordering::SeqCst) % entries.len();
if let Some(entry) = entries.get(current_index) {
trace!(
"sending message to child {}/{} - {}",
current_index + 1,
entries.len(),
entry.0.path()
);
if let Ok(_) = entry.0.tell_anonymously(message) {
self.index.store(current_index + 1, Ordering::SeqCst);
Ok(1)
} else {
trace!("couldn't send message to child");
Err(())
}
} else {
warn!(
"no public child with index {} to broadcast message to",
current_index
);
Ok(0)
}
}
fn tell_all(&self, entries: &DispatcherMap, message: Arc<SignedMessage>) -> Result<usize, ()> {
let entries = entries
.iter()
.filter(|entry| entry.0.is_public())
.collect::<Vec<_>>();
if entries.is_empty() {
debug!("no public children to broadcast message to");
return Ok(0);
}
trace!("broadcasting message to {} children", entries.len(),);
entries.iter().for_each(|entry| {
entry.0.tell_anonymously(message.clone()).unwrap();
});
Ok(entries.len())
}
}
/// Generic trait which any custom dispatcher handler must implement for
/// the further usage by the `Dispatcher` instances.
pub trait DispatcherHandler {
@ -115,6 +171,10 @@ pub trait DispatcherHandler {
);
/// Broadcasts the message to actors in according to the implemented behaviour.
fn broadcast_message(&self, entries: &DispatcherMap, message: &Arc<SignedMessage>);
/// Sends the message to one actor in a group.
fn tell_one(&self, entries: &DispatcherMap, message: SignedMessage) -> Result<usize, ()>;
/// Sends the message to all actors in a group.
fn tell_all(&self, entries: &DispatcherMap, message: impl SignedMessage + Clone) -> Result<usize, ()>;
}
/// A generic implementation of the Bastion dispatcher
@ -210,14 +270,14 @@ impl Dispatcher {
/// Gives the message to one of the dispatchers
/// according to the specified target.
pub fn notify_one(&self, message: SignedMessage) -> Result<usize, ()> {
todo!();
pub fn tell_one(&self, message: SignedMessage) -> Result<usize, ()> {
self.handler.tell_one(&self.actors, message)
}
/// Gives the message to all of the dispatchers
/// according to the specified target.
pub fn notify_all(&self, message: Arc<SignedMessage>) -> Result<usize, ()> {
todo!();
pub fn tell_all(&self, message: Arc<SignedMessage>) -> Result<usize, ()> {
self.handler.tell_all(&self.actors, message)
}
}
@ -377,7 +437,7 @@ impl GlobalDispatcher {
/// Gives the message to one of the dispatchers
/// according to the specified target.
pub(crate) fn notify_one(
pub(crate) fn tell_one(
&self,
target: BroadcastTarget,
message: SignedMessage,
@ -385,7 +445,7 @@ impl GlobalDispatcher {
let target_dispatchers = self.get_dispatchers(target);
if let Some(dispatcher) = target_dispatchers.first() {
(**dispatcher).notify_one(message);
(**dispatcher).tell_one(message);
Ok(1)
} else {
error!("The message can't be delivered. No available dispatcher.");
@ -393,6 +453,23 @@ impl GlobalDispatcher {
}
}
/// Gives the message to one of the dispatchers
/// according to the specified target.
pub(crate) fn tell_all(
&self,
target: BroadcastTarget,
message: Arc<SignedMessage>,
) -> Result<usize, ()> {
let target_dispatchers = self.get_dispatchers(target);
let mut recipients = 0;
for dispatcher in target_dispatchers {
recipients += (**dispatcher).tell_all(message.clone())?;
}
Ok(recipients)
}
/// Gives the message to all of the dispatchers
/// according to the specified target.
pub(crate) fn notify_all(