Merge branch 'master' into igni/sorted_load_perf

This commit is contained in:
o0Ignition0o 2020-08-20 20:33:10 +02:00
commit cf9ac625a3
10 changed files with 278 additions and 69 deletions

View File

@ -5,7 +5,7 @@ name = "bastion-executor"
# - Update CHANGELOG.md.
# - npm install -g auto-changelog && auto-changelog at the root
# - Create "v0.x.y" git tag at the root of the project.
version = "0.3.6-alpha.0"
version = "0.3.7-alpha.0"
description = "Cache affine NUMA-aware executor for Rust"
authors = ["Mahmut Bulut <vertexclique@gmail.com>"]
keywords = ["fault-tolerant", "runtime", "actor", "system"]

View File

@ -5,11 +5,11 @@ extern crate test;
use bastion_executor::prelude::spawn;
use bastion_executor::run::run;
use futures::future::join_all;
use futures_timer::Delay;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use std::time::Duration;
use test::Bencher;
use futures_timer::Delay;
// Benchmark for a 10K burst task spawn
#[bench]
@ -20,7 +20,7 @@ fn spawn_lot(b: &mut Bencher) {
.map(|_| {
spawn(
async {
let duration = Duration::from_millis(0);
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
proc_stack.clone(),
@ -40,13 +40,16 @@ fn spawn_single(b: &mut Bencher) {
let handle = spawn(
async {
let duration = Duration::from_millis(0);
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
proc_stack.clone(),
);
run( async {
handle.await;
}, proc_stack)
run(
async {
handle.await;
},
proc_stack,
)
});
}

View File

@ -32,6 +32,10 @@
any(feature = "numanji", feature = "allocator-suite"),
feature(allocator_api)
)]
#![cfg_attr(
any(feature = "numanji", feature = "allocator-suite"),
feature(nonnull_slice_from_raw_parts)
)]
#[macro_use]
mod macros;

View File

@ -129,7 +129,6 @@ impl SmpStats for Stats {
sorted_load
}
fn mean(&self) -> usize {
self.mean_level.load(Ordering::SeqCst)
}

View File

@ -5,7 +5,7 @@ name = "bastion"
# - Update CHANGELOG.md.
# - npm install -g auto-changelog && auto-changelog at the root
# - Create "v0.x.y" git tag at the root of the project.
version = "0.4.2-alpha.0"
version = "0.4.3-alpha.0"
description = "Fault-tolerant Runtime for Rust applications"
authors = ["Mahmut Bulut <vertexclique@gmail.com>"]
keywords = ["fault-tolerant", "runtime", "actor", "system"]
@ -50,9 +50,9 @@ features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]
[dependencies]
bastion-executor = "0.3.5"
bastion-executor = "0.3.6"
lightproc = "0.3.5"
# bastion-executor = { version = "= 0.3.5-alpha", path = "../bastion-executor" }
# bastion-executor = { version = "= 0.3.7-alpha.0", path = "../bastion-executor" }
# lightproc = { version = "= 0.3.6-alpha.0", path = "../lightproc" }
lever = "0.1.1-alpha.11"

View File

@ -1,5 +1,8 @@
use bastion::prelude::*;
use futures_timer::Delay;
use std::sync::Arc;
use std::time::Duration;
use tracing::Level;
///
/// Prologue:
@ -22,7 +25,28 @@ use std::sync::Arc;
/// 3. We want to use a dispatcher on the second group because we don't want to
/// target a particular child in the first to process the message.
///
/// The output looks like:
/// ```
/// Running `target\debug\examples\round_robin_dispatcher.exe`
/// Aug 20 16:52:19.925 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:19.926 WARN round_robin_dispatcher: Received data_1
/// Aug 20 16:52:20.932 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:20.933 WARN round_robin_dispatcher: Received data_2
/// Aug 20 16:52:21.939 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:21.941 WARN round_robin_dispatcher: Received data_3
/// Aug 20 16:52:22.947 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:22.948 WARN round_robin_dispatcher: Received data_4
/// Aug 20 16:52:23.954 WARN round_robin_dispatcher: sending message
/// Aug 20 16:52:23.955 WARN round_robin_dispatcher: Received data_5
/// ```
fn main() {
// Initialize tracing logger
// so we get nice output on the console.
let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::WARN)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
// We need bastion to run our program
Bastion::init();
// We create the supervisor and we add both groups on it
@ -57,8 +81,10 @@ fn caller_group(children: Children) -> Children {
let target = BroadcastTarget::Group("Receiver".to_string());
// We iterate on each data
for data in data_to_send {
Delay::new(Duration::from_secs(1)).await;
tracing::warn!("sending message");
// We broadcast the message containing the data to the defined target
ctx.broadcast_message(target.clone(), data)
ctx.broadcast_message(target.clone(), data);
}
// We stop bastion here, because we don't have more data to send
Bastion::stop();
@ -70,8 +96,6 @@ fn caller_group(children: Children) -> Children {
fn receiver_group(children: Children) -> Children {
// We create the second group of children
children
// We want to have 5 children in this group
.with_redundancy(5)
// We want to have a disptacher named `Receiver`
.with_dispatcher(Dispatcher::with_type(DispatcherType::Named(
"Receiver".to_string(),
@ -93,7 +117,7 @@ fn receiver_group(children: Children) -> Children {
// Because it's a broadcasted message we can use directly the ref
ref data: &str => {
// And we print it
println!("Received {}", data);
tracing::warn!("Received {}", data);
};
_: _ => ();
}

View File

@ -19,9 +19,28 @@ pub struct ChildRef {
sender: Sender,
name: String,
path: Arc<BastionPath>,
// True if the ChildRef references a child that will receive user defined messages.
// use `ChildRef::new_internal` to set it to false, for internal use children,
// such as the heartbeat children for example
is_public: bool,
}
impl ChildRef {
pub(crate) fn new_internal(
id: BastionId,
sender: Sender,
name: String,
path: Arc<BastionPath>,
) -> ChildRef {
ChildRef {
id,
sender,
name,
path,
is_public: false,
}
}
pub(crate) fn new(
id: BastionId,
sender: Sender,
@ -33,6 +52,7 @@ impl ChildRef {
sender,
name,
path,
is_public: true,
}
}
@ -67,6 +87,38 @@ impl ChildRef {
&self.id
}
/// Returns true if the child this `ChildRef` is referencing is public,
/// Which means it can receive messages. private `ChildRef`s
/// reference bastion internal children, such as the heartbeat child for example.
/// This function comes in handy when implementing your own dispatchers.
///
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx| {
/// async move {
/// if ctx.current().is_public() {
/// // ...
/// }
/// # Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
pub fn is_public(&self) -> bool {
self.is_public
}
/// Sends a message to the child this `ChildRef` is referencing.
/// This message is intended to be used outside of Bastion context when
/// there is no way for receiver to identify message sender

View File

@ -453,26 +453,24 @@ impl Children {
/// # use bastion::prelude::*;
/// # use std::time::Duration;
/// #
/// # fn main() {
/// # Bastion::init();
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children
/// .with_heartbeat_tick(Duration::from_secs(5))
/// .with_exec(|ctx| {
/// // -- Children group started.
/// async move {
/// // ...
/// # Ok(())
/// }
/// // -- Children group stopped.
/// })
/// children
/// .with_heartbeat_tick(Duration::from_secs(5))
/// .with_exec(|ctx| {
/// // -- Children group started.
/// async move {
/// // ...
/// # Ok(())
/// }
/// // -- Children group stopped.
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
/// [`std::time::Duration`]: https://doc.rust-lang.org/nightly/core/time/struct.Duration.html
pub fn with_heartbeat_tick(mut self, interval: Duration) -> Self {
@ -936,7 +934,7 @@ impl Children {
let id = bcast.id().clone();
let sender = bcast.sender().clone();
let path = bcast.path().clone();
let child_ref = ChildRef::new(id.clone(), sender.clone(), name, path);
let child_ref = ChildRef::new_internal(id.clone(), sender.clone(), name, path);
let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();

View File

@ -306,17 +306,17 @@ impl BastionContext {
/// [`try_recv_timeout`]: #method.try_recv_timeout
/// [`SignedMessage`]: ../prelude/struct.SignedMessage.html
pub async fn try_recv(&self) -> Option<SignedMessage> {
debug!("BastionContext({}): Trying to receive message.", self.id);
let state = self.state.clone();
let mut guard = state.lock().await;
if let Some(msg) = guard.pop_message() {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
Some(msg)
} else {
trace!("BastionContext({}): Received no message.", self.id);
None
}
self.try_recv_timeout(std::time::Duration::from_nanos(0))
.await
.map(|msg| {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
msg
})
.map_err(|e| {
trace!("BastionContext({}): Received no message.", self.id);
e
})
.ok()
}
/// Retrieves asynchronously a message received by the element
@ -420,11 +420,15 @@ impl BastionContext {
/// [`try_recv`]: #method.try_recv
/// [`SignedMessage`]: ../prelude/struct.SignedMessage.html
pub async fn try_recv_timeout(&self, timeout: Duration) -> Result<SignedMessage, ReceiveError> {
debug!(
"BastionContext({}): Waiting to receive message within {} milliseconds.",
self.id,
timeout.as_millis()
);
if timeout == std::time::Duration::from_nanos(0) {
debug!("BastionContext({}): Trying to receive message.", self.id);
} else {
debug!(
"BastionContext({}): Waiting to receive message within {} milliseconds.",
self.id,
timeout.as_millis()
);
}
futures::select! {
message = self.recv().fuse() => {
message.map_err(|_| ReceiveError::Other)
@ -694,3 +698,126 @@ impl Display for BastionId {
self.0.fmt(fmt)
}
}
#[cfg(test)]
mod context_tests {
use super::*;
use crate::prelude::*;
use crate::Bastion;
use std::panic;
#[test]
fn test_context() {
Bastion::init();
Bastion::start();
run_test(test_recv);
run_test(test_try_recv);
run_test(test_try_recv_fail);
run_test(test_try_recv_timeout);
run_test(test_try_recv_timeout_fail);
Bastion::stop();
Bastion::block_until_stopped();
}
fn test_recv() {
let children = Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
msg! { ctx.recv().await?,
ref msg: &'static str => {
assert_eq!(msg, &"test recv");
};
msg: _ => { panic!("didn't receive the expected message {:?}", msg);};
}
Ok(())
})
})
.expect("Couldn't create the children group.");
children
.broadcast("test recv")
.expect("couldn't send message");
}
fn test_try_recv() {
let children = Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
// make sure the message has been sent
Delay::new(std::time::Duration::from_millis(1)).await;
msg! { ctx.try_recv().await.expect("no message"),
ref msg: &'static str => {
assert_eq!(msg, &"test try recv");
};
_: _ => { panic!("didn't receive the expected message");};
}
Ok(())
})
})
.expect("Couldn't create the children group.");
children
.broadcast("test try recv")
.expect("couldn't send message");
}
fn test_try_recv_fail() {
let children = Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
assert!(ctx.try_recv().await.is_none());
Ok(())
})
})
.expect("Couldn't create the children group.");
// Not sending any message
}
fn test_try_recv_timeout() {
let children =
Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
msg! { ctx.try_recv_timeout(std::time::Duration::from_millis(1)).await.expect("recv_timeout failed"),
ref msg: &'static str => {
assert_eq!(msg, &"test recv timeout");
};
_: _ => { panic!("didn't receive the expected message");};
}
Ok(())
})
})
.expect("Couldn't create the children group.");
children
.broadcast("test recv timeout")
.expect("couldn't send message");
}
fn test_try_recv_timeout_fail() {
let children = Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
assert!(ctx
.try_recv_timeout(std::time::Duration::from_millis(1))
.await
.is_err());
Ok(())
})
})
.expect("Couldn't create the children group.");
// Triggering the timeout
run!(async { Delay::new(std::time::Duration::from_millis(2)).await });
// The child panicked, but we should still be able to send things to it
assert!(children.broadcast("test recv timeout").is_ok());
}
fn run_test<T>(test: T) -> ()
where
T: FnOnce() -> () + panic::UnwindSafe,
{
let result = panic::catch_unwind(|| test());
assert!(result.is_ok())
}
}

View File

@ -9,10 +9,10 @@ use lever::prelude::*;
use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicUsize, Ordering},
Arc,
};
use tracing::{trace, warn};
use tracing::{debug, trace, warn};
/// Type alias for the concurrency hashmap. Each key-value pair stores
/// the Bastion identifier as the key and the module name as the value.
@ -66,7 +66,7 @@ pub type DefaultDispatcherHandler = RoundRobinHandler;
/// Dispatcher that will do simple round-robin distribution
#[derive(Default, Debug)]
pub struct RoundRobinHandler {
index: AtomicU64,
index: AtomicUsize,
}
impl DispatcherHandler for RoundRobinHandler {
@ -80,25 +80,27 @@ impl DispatcherHandler for RoundRobinHandler {
}
// Each child in turn will receive a message.
fn broadcast_message(&self, entries: &DispatcherMap, message: &Arc<SignedMessage>) {
if entries.len() == 0 {
let entries = entries
.iter()
.filter(|entry| entry.0.is_public())
.collect::<Vec<_>>();
if entries.is_empty() {
debug!("no public children to broadcast message to");
return;
}
let current_index = self.index.load(Ordering::SeqCst) % entries.len();
let current_index = self.index.load(Ordering::SeqCst) % entries.len() as u64;
let mut skipped = 0;
for pair in entries.iter() {
if skipped != current_index {
skipped += 1;
continue;
}
let entry = pair.0;
entry.tell_anonymously(message.clone()).unwrap();
break;
}
self.index.store(current_index + 1, Ordering::SeqCst);
if let Some(entry) = entries.get(current_index) {
warn!(
"sending message to child {}/{} - {}",
current_index + 1,
entries.len(),
entry.0.path()
);
entry.0.tell_anonymously(message.clone()).unwrap();
self.index.store(current_index + 1, Ordering::SeqCst);
};
}
}
/// Generic trait which any custom dispatcher handler must implement for