diff --git a/Cargo.toml b/Cargo.toml index b733c33..38620c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "src/bastion", "src/bastion-executor", "src/bastion-utils", "src/lightproc" diff --git a/README.md b/README.md index 0da597c..e86c8cf 100644 --- a/README.md +++ b/README.md @@ -121,12 +121,16 @@ If you answer any of the questions below with yes, then Bastion is just for you: Bastion Ecosystem is here to provide you a way to customize it. If you don't need to, or if you are a newcomers, you can install and use Bastion without knowing how everything works under the hood. We hope to find you on this section soon. +### [Nuclei](https://github.com/vertexclique/nuclei) +Nuclei is proactive IO system that can be independently used without executor restriction. It is also powering Bastion's IO system. +You can learn more about Nuclei [here](https://github.com/vertexclique/nuclei), check out Nuclei's repo for more sophisticated use cases. + ### [LightProc](https://github.com/bastion-rs/bastion/tree/master/src/lightproc) LightProc is Lightweight Process abstraction for Rust. -It uses futures with lifecycle callbacks to implement Erlang like processes and contains basic pid to identify processes. -All panics inside futures are propagated to upper layers. +It uses futures with lifecycle callbacks to implement Erlang like processes and contains basic pid to identify processes. +All panics inside futures are propagated to upper layers. ### [Bastion Executor](https://github.com/bastion-rs/bastion/tree/master/src/bastion-executor) @@ -144,7 +148,7 @@ Check the [getting started example](https://github.com/bastion-rs/bastion/blob/m Include bastion to your project with: ```toml -bastion = "0.3" +bastion = "0.4" ``` For more information please check [Bastion Documentation](https://docs.rs/bastion) @@ -153,15 +157,6 @@ For more information please check [Bastion Documentation](https://docs.rs/bastio Runtime is structured by the user. Only root supervision comes in batteries-included fashion. Worker code, worker group redundancy, supervisors and their supervision strategies are defined by the user. -You can see architecture of the framework [HERE](https://github.com/bastion-rs/bastion/blob/master/img/bastion-arch.png). - -## Projects using Bastion -If you are using Bastion open a PR so we can include it in our showcase. -* Various proprietary Rust database implementations are using Bastion. -* In AWS Lambdas we have used Bastion to enable retry mechanism and try different parsing strategies for data to be processed. -* [SkyNet](https://github.com/vertexclique/skynet) (a Discord bot which is resending deleted messages) - * Skynet is running since 0.1.3 release of Bastion on the cloud and haven't killed yet. - ## License Licensed under either of diff --git a/src/bastion-executor/Cargo.toml b/src/bastion-executor/Cargo.toml index 76d8d6d..d0ea2d8 100644 --- a/src/bastion-executor/Cargo.toml +++ b/src/bastion-executor/Cargo.toml @@ -30,8 +30,8 @@ unstable = ["numanji", "allocator-suite", "jemallocator"] [dependencies] lightproc = "0.3.5" bastion-utils = "0.3.2" -# lightproc = { version = "= 0.3.6-alpha.0", path = "../lightproc" } -# bastion-utils = { version = "0.3.2", path = "../bastion-utils" } +# lightproc = { path = "../lightproc" } +# bastion-utils = { path = "../bastion-utils" } crossbeam-utils = "0.7" crossbeam-channel = "0.4" diff --git a/src/bastion/Cargo.toml b/src/bastion/Cargo.toml index a433dd4..94dbb79 100644 --- a/src/bastion/Cargo.toml +++ b/src/bastion/Cargo.toml @@ -51,8 +51,11 @@ rustdoc-args = ["--cfg", "feature=\"docs\""] [dependencies] -bastion-executor = { version = "= 0.3.5-alpha", path = "../bastion-executor" } -lightproc = { version = "= 0.3.6-alpha.0", path = "../lightproc" } +nuclei = "0.1.1" +bastion-executor = "0.3.5" +lightproc = "0.3.5" +# bastion-executor = { version = "= 0.3.5-alpha", path = "../bastion-executor" } +# lightproc = { version = "= 0.3.6-alpha.0", path = "../lightproc" } lever = "0.1.1-alpha.8" futures = "0.3.5" @@ -67,7 +70,7 @@ async-mutex = "1.1" uuid = { version = "0.8", features = ["v4"] } # Distributed -artillery-core = { version = "0.1.2-alpha.1", optional = true } +artillery-core = { version = "0.1.2-alpha.3", optional = true } # Log crates tracing-subscriber = "0.2.6" diff --git a/src/bastion/data/distwrite b/src/bastion/data/distwrite new file mode 100644 index 0000000..e69de29 diff --git a/src/bastion/examples/distributed-fwrite.rs b/src/bastion/examples/distributed-fwrite.rs new file mode 100644 index 0000000..6825181 --- /dev/null +++ b/src/bastion/examples/distributed-fwrite.rs @@ -0,0 +1,100 @@ +use bastion::prelude::*; +use futures::*; +use std::fs::{File, OpenOptions}; +use std::path::PathBuf; +use std::sync::Arc; + +/// +/// Parallel (MapReduce) job which async writes results to a single output file +/// +/// Prologue: +/// This example maps a stream of cycling values([0,1,2,3,4,0,1,2,3,4...]) one by one: +/// to 10 workers and every worker compute the double of what they receive and send back. +/// +/// Then mapper aggregates the doubled values and write them in their receiving order. +/// +/// Try increasing the worker count. Yes! +fn main() { + env_logger::init(); + + Bastion::init(); + + // Workers that process the work. + let workers = Bastion::children(|children: Children| { + children + .with_redundancy(10) // Let's have a pool of ten workers. + .with_exec(move |ctx: BastionContext| { + async move { + println!("Worker started!"); + + // Start receiving work + loop { + msg! { ctx.recv().await?, + msg: u64 =!> { + let data: u64 = msg.wrapping_mul(2); + println!("Child doubled the value of {} and gave {}", msg, data); // true + let _ = answer!(ctx, data); + }; + _: _ => (); + } + } + } + }) + }) + .expect("Couldn't start a new children group."); + + // Get a shadowed sharable reference of workers. + let workers = Arc::new(workers); + + // + // Mapper that generates work. + Bastion::children(|children: Children| { + children.with_exec(move |ctx: BastionContext| { + let workers = workers.clone(); + async move { + println!("Mapper started!"); + + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("data"); + path.push("distwrite"); + + let fo = OpenOptions::new() + .read(true) + .write(true) + .open(&path) + .unwrap(); + let mut file = Handle::::new(fo).unwrap(); + + // Distribute your workload to workers + for id_worker_pair in workers.elems().iter().enumerate() { + let data = cycle(id_worker_pair.0 as u64, 5); + + let computed: Answer = ctx.ask(&id_worker_pair.1.addr(), data).unwrap(); + msg! { computed.await?, + msg: u64 => { + // Handle the answer... + println!("Source received the computed value: {}", msg); + file.write_all(msg.to_string().as_bytes()).await.unwrap(); + }; + _: _ => (); + } + } + + // Send a signal to system that computation is finished. + Bastion::stop(); + + Ok(()) + } + }) + }) + .expect("Couldn't start a new children group."); + + Bastion::start(); + Bastion::block_until_stopped(); +} + +fn cycle(x: u64, at_most: u64) -> u64 { + let mut x = x; + x += 1; + x % at_most +} diff --git a/src/bastion/examples/tcp-servers.rs b/src/bastion/examples/tcp-servers.rs new file mode 100644 index 0000000..2be414f --- /dev/null +++ b/src/bastion/examples/tcp-servers.rs @@ -0,0 +1,55 @@ +use bastion::prelude::*; +use futures::io; +use std::net::{TcpListener, TcpStream}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +async fn echo(stream: Handle) -> io::Result<()> { + io::copy(&stream, &mut &stream).await?; + Ok(()) +} + +const TCP_SERVER_COUNT: usize = 10; +static TCP_SERVERS: AtomicUsize = AtomicUsize::new(TCP_SERVER_COUNT); + +/// +/// 10 async tcp servers +/// +/// Prologue: +/// +/// This example demonstrates using 10 parallel tcp servers + +fn main() { + env_logger::init(); + + Bastion::init(); + + let _tcp_servers = Bastion::children(|children: Children| { + children + .with_redundancy(TCP_SERVER_COUNT) // Let's have 40 tcp echo servers :) + .with_exec(move |_ctx: BastionContext| { + async move { + println!("Server is starting!"); + let port = TCP_SERVERS.fetch_sub(1, Ordering::SeqCst) + 2000; + let addr = format!("127.0.0.1:{}", port); + + let listener = Handle::::bind(addr).unwrap(); + println!("Listening on {}", listener.get_ref().local_addr().unwrap()); + + // Accept clients in a loop. + loop { + let (stream, peer_addr) = listener.accept().await.unwrap(); + println!("Accepted client: {}", peer_addr); + + // Spawn a task that echoes messages from the client back to it. + spawn(echo(stream)); + } + + Ok(()) + } + }) + }) + .expect("Couldn't start a new children group."); + + Bastion::start(); + Bastion::block_until_stopped(); +} diff --git a/src/bastion/src/io.rs b/src/bastion/src/io.rs new file mode 100644 index 0000000..a79ff33 --- /dev/null +++ b/src/bastion/src/io.rs @@ -0,0 +1,5 @@ +//! +//! IO subsystem for Bastion + +pub use nuclei::join_handle::*; +pub use nuclei::*; diff --git a/src/bastion/src/lib.rs b/src/bastion/src/lib.rs index 7bc767f..2d46e77 100644 --- a/src/bastion/src/lib.rs +++ b/src/bastion/src/lib.rs @@ -74,6 +74,7 @@ pub mod context; pub mod dispatcher; pub mod envelope; pub mod executor; +pub mod io; pub mod message; pub mod path; #[cfg(feature = "scaling")] @@ -100,6 +101,7 @@ pub mod prelude { DispatcherType, NotificationType, }; pub use crate::envelope::{RefAddr, SignedMessage}; + pub use crate::io::*; pub use crate::message::{Answer, AnswerSender, Message, Msg}; pub use crate::msg; pub use crate::path::{BastionPath, BastionPathElement};