diff --git a/.github/workflows/miri.yml b/.github/workflows/miri.yml index f5a2ff7..9bb85e8 100644 --- a/.github/workflows/miri.yml +++ b/.github/workflows/miri.yml @@ -14,7 +14,7 @@ jobs: - name: Install uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-01-01 + toolchain: nightly-2021-03-20 override: true - uses: davidB/rust-cargo-make@v1 with: @@ -24,13 +24,4 @@ jobs: RUST_BACKTRACE: full RUST_LOG: 'trace' run: | - rustup component add miri - cargo miri setup - cargo clean - # Do some Bastion shake - cd src/bastion && \ - MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly dispatcher && \ - MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly path && \ - MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly broadcast && \ - MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly children_ref && \ - cd - + tools/miri.sh diff --git a/.github/workflows/sanitizers.yml b/.github/workflows/sanitizers.yml index 31fe467..0578794 100644 --- a/.github/workflows/sanitizers.yml +++ b/.github/workflows/sanitizers.yml @@ -15,7 +15,7 @@ jobs: - name: Install uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-01-01 + toolchain: nightly-2021-03-20 override: true - uses: davidB/rust-cargo-make@v1 with: diff --git a/src/bastion-executor/Cargo.toml b/src/bastion-executor/Cargo.toml index a317b99..6967dc4 100644 --- a/src/bastion-executor/Cargo.toml +++ b/src/bastion-executor/Cargo.toml @@ -30,7 +30,8 @@ tokio-runtime = ["tokio"] [dependencies] bastion-utils = "0.3.2" -lightproc = "0.3" +# lightproc = "0.3" +lightproc = { path = "../lightproc" } # bastion-utils = { path = "../bastion-utils" } crossbeam-utils = "0.8" diff --git a/src/bastion-executor/src/placement.rs b/src/bastion-executor/src/placement.rs index 0bbefdd..1d3e0a7 100644 --- a/src/bastion-executor/src/placement.rs +++ b/src/bastion-executor/src/placement.rs @@ -18,7 +18,7 @@ pub fn get_num_cores() -> Option { /// /// Sets the current threads affinity pub fn set_for_current(core_id: CoreId) { - tracing::info!("Executor: placement: set affinity on core {}", core_id.id); + tracing::trace!("Executor: placement: set affinity on core {}", core_id.id); set_for_current_helper(core_id); } diff --git a/src/bastion/Cargo.toml b/src/bastion/Cargo.toml index eb4fb2b..dda7b0f 100644 --- a/src/bastion/Cargo.toml +++ b/src/bastion/Cargo.toml @@ -52,8 +52,8 @@ rustdoc-args = ["--cfg", "feature=\"docs\""] [dependencies] bastion-executor = { git = "https://github.com/bastion-rs/bastion.git" } lightproc = "0.3" -# bastion-executor = { version = "= 0.3.7-alpha.0", path = "../bastion-executor" } -# lightproc = { version = "= 0.3", path = "../lightproc" } +# bastion-executor = { path = "../bastion-executor" } +# lightproc = { path = "../lightproc" } lever = "0.1" futures = "0.3.5" @@ -73,9 +73,12 @@ artillery-core = { version = "0.1.2-alpha.3", optional = true } # Log crates tracing-subscriber = "0.2.6" tracing = "0.1.15" -anyhow = "1.0.31" +anyhow = "1.0" crossbeam-queue = "0.3.0" log = "0.4.14" +lasso = {version = "0.5", features = ["multi-threaded"] } +once_cell = "1.7.2" +thiserror = "1.0.24" [target.'cfg(not(windows))'.dependencies] nuclei = "0.1" @@ -91,6 +94,7 @@ rayon = "1.3.1" num_cpus = "1.13.0" # hello_tokio example tokio = { version="1.1", features = ["time", "macros"] } +# bastion-executor = { path = "../bastion-executor" } bastion-executor = { git = "https://github.com/bastion-rs/bastion.git" } once_cell = "1.5.2" tokio-test = "0.4.0" diff --git a/src/bastion/examples/distributor.rs b/src/bastion/examples/distributor.rs new file mode 100644 index 0000000..5b77178 --- /dev/null +++ b/src/bastion/examples/distributor.rs @@ -0,0 +1,245 @@ +///! Create a conference. +///! +///! 1st Group +///! Staff (5) - Going to organize the event // OK +///! +///! 2nd Group +///! Enthusiasts (50) - interested in participating to the conference (haven't registered yet) // OK +///! +///! 3rd Group +///! Attendees (empty for now) - Participate +///! +///! Enthusiast -> Ask one of the staff members "when is the conference going to happen ?" // OK +///! Broadcast / Question => Answer 0 or 1 Staff members are going to reply eventually? // OK +///! +///! Staff -> Send a Leaflet to all of the enthusiasts, letting them know that they can register. // OK +///! +///! "hey conference is going to happen. will you be there?" +///! Broadcast / Question -> if people reply with YES => fill the 3rd group +///! some enthusiasts are now attendees +///! +///! Staff -> send the actual schedule and misc infos to Attendees +///! Broadcast / Statement (Attendees) +///! +///! An attendee sends a thank you note to one staff member (and not bother everyone) +///! One message / Statement (Staff) // OK +///! +///! ```rust +///! let staff = Distributor::named("staff"); +///! let enthusiasts = Distributor::named("enthusiasts"); +///! let attendees = Disitributor::named("attendees"); +///! // Enthusiast -> Ask the whole staff "when is the conference going to happen ?" +///! ask_one(Message + Clone) -> Result, CouldNotSendError> +///! // await_one // await_all +///! // first ? means "have we been able to send the question?" +///! // it s in a month +///! let replies = staff.ask_one("when is the conference going to happen ?")?.await?; +///! ask_everyone(Message + Clone) -> Result, CouldNotSendError> +///! let participants = enthusiasts.ask_everyone("here's our super nice conference, it s happening people!").await?; +///! for participant in participants { +///! // grab the sender and add it to the attendee recipient group +///! } +///! // send the schedule +///! tell_everyone(Message + Clone) -> Result<(), CouldNotSendError> +///! attendees.tell_everyone("hey there, conf is in a week, here s where and how it s going to happen")?; +///! // send a thank you note +///! tell(Message) -> Result<(), CouldNotSendError> +///! staff.tell_one("thank's it was amazing")?; +///! children +///! .with_redundancy(10) +///! .with_distributor(Distributor::named("staff")) +///! // We create the function to exec when each children is called +///! .with_exec(move |ctx: BastionContext| async move { /* ... */ }) +///! children +///! .with_redundancy(100) +///! .with_distributor(Distributor::named("enthusiasts")) +///! // We create the function to exec when each children is called +///! .with_exec(move |ctx: BastionContext| async move { /* ... */ }) +///! children +///! .with_redundancy(0) +///! .with_distributor(Distributor::named("attendees")) +///! // We create the function to exec when each children is called +///! .with_exec(move |ctx: BastionContext| async move { /* ... */ }) +///! ``` +use anyhow::{anyhow, Context, Result as AnyResult}; +use bastion::distributor::*; +use bastion::prelude::*; +use tracing::Level; + +// true if the person attends the conference +#[derive(Debug)] +struct RSVP { + attends: bool, + child_ref: ChildRef, +} + +#[derive(Debug, Clone)] +struct ConferenceSchedule { + start: std::time::Duration, + end: std::time::Duration, + misc: String, +} + +/// cargo r --features=tokio-runtime distributor +#[tokio::main] +async fn main() -> AnyResult<()> { + let subscriber = tracing_subscriber::fmt() + .with_max_level(Level::INFO) + .finish(); + tracing::subscriber::set_global_default(subscriber).unwrap(); + + // Initialize bastion + Bastion::init(); + + // 1st Group + Bastion::supervisor(|supervisor| { + supervisor.children(|children| { + // Iniit staff + // Staff (5 members) - Going to organize the event + children + .with_redundancy(5) + .with_distributor(Distributor::named("staff")) + .with_exec(organize_the_event) + }) + }) + // 2nd Group + .and_then(|_| { + Bastion::supervisor(|supervisor| { + supervisor.children(|children| { + // Enthusiasts (50) - interested in participating to the conference (haven't registered yet) + children + .with_redundancy(50) + .with_distributor(Distributor::named("enthusiasts")) + .with_exec(be_interested_in_the_conference) + }) + }) + }) + .map_err(|_| anyhow!("couldn't setup the bastion"))?; + + Bastion::start(); + + // Wait a bit until everyone is ready + // std::thread::sleep(std::time::Duration::from_secs(1)); + + let staff = Distributor::named("staff"); + let enthusiasts = Distributor::named("enthusiasts"); + let attendees = Distributor::named("attendees"); + + // Enthusiast -> Ask one of the staff members "when is the conference going to happen ?" + let answer = staff.ask_one("when is the next conference going to happen?")?; + MessageHandler::new( + answer + .await + .expect("coulnd't find out when the next conference is going to happen :("), + ) + .on_tell(|reply: String, _sender_addr| { + tracing::info!("received a reply to my message:\n{}", reply); + }); + + // "hey conference is going to happen. will you be there?" + // Broadcast / Question -> if people reply with YES => fill the 3rd group + let answers = enthusiasts + .ask_everyone("hey, the conference is going to happen, will you be there?") + .expect("couldn't ask everyone"); + + for answer in answers.into_iter() { + MessageHandler::new(answer.await.expect("couldn't receive reply")) + .on_tell(|rsvp: RSVP, _| { + if rsvp.attends { + tracing::info!("{:?} will be there! :)", rsvp.child_ref.id()); + attendees + .subscribe(rsvp.child_ref) + .expect("couldn't subscribe attendee"); + } else { + tracing::error!("{:?} won't make it :(", rsvp.child_ref.id()); + } + }) + .on_fallback(|unknown, _sender_addr| { + tracing::error!( + "distributor_test: uh oh, I received a message I didn't understand\n {:?}", + unknown + ); + }); + } + + // Ok now that attendees have subscribed, let's send information around! + tracing::info!("Let's send invitations!"); + // Staff -> send the actual schedule and misc infos to Attendees + let total_sent = attendees + .tell_everyone(ConferenceSchedule { + start: std::time::Duration::from_secs(60), + end: std::time::Duration::from_secs(3600), + misc: "it's going to be amazing!".to_string(), + }) + .context("couldn't let everyone know the conference is available!")?; + + tracing::error!("total number of attendees: {}", total_sent.len()); + + tracing::info!("the conference is running!"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + // An attendee sends a thank you note to one staff member (and not bother everyone) + staff + .tell_one("the conference was amazing thank you so much!") + .context("couldn't thank the staff members :(")?; + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // And we're done! + Bastion::stop(); + + // BEWARE, this example doesn't return + Bastion::block_until_stopped(); + + Ok(()) +} + +async fn organize_the_event(ctx: BastionContext) -> Result<(), ()> { + loop { + MessageHandler::new(ctx.recv().await?) + .on_question(|message: &str, sender| { + tracing::info!("received a question: \n{}", message); + sender + .reply("uh i think it will be next month!".to_string()) + .unwrap(); + }) + .on_tell(|message: &str, _| { + tracing::info!("received a message: \n{}", message); + }) + .on_fallback(|unknown, _sender_addr| { + tracing::error!( + "staff: uh oh, I received a message I didn't understand\n {:?}", + unknown + ); + }); + } +} + +async fn be_interested_in_the_conference(ctx: BastionContext) -> Result<(), ()> { + loop { + MessageHandler::new(ctx.recv().await?) + .on_tell(|message: std::sync::Arc<&str>, _| { + tracing::info!( + "child {}, received a broadcast message:\n{}", + ctx.current().id(), + message + ); + }) + .on_tell(|schedule: ConferenceSchedule, _| { + tracing::info!( + "child {}, received broadcast conference schedule!:\n{:?}", + ctx.current().id(), + schedule + ); + }) + .on_question(|message: &str, sender| { + tracing::info!("received a question: \n{}", message); + // ILL BE THERE! + sender + .reply(RSVP { + attends: rand::random(), + child_ref: ctx.current().clone(), + }) + .unwrap(); + }); + } +} diff --git a/src/bastion/examples/hello_tokio.rs b/src/bastion/examples/hello_tokio.rs index 5779286..57d88ab 100644 --- a/src/bastion/examples/hello_tokio.rs +++ b/src/bastion/examples/hello_tokio.rs @@ -1,7 +1,10 @@ +#[cfg(feature = "tokio-runtime")] use anyhow::Result as AnyResult; +#[cfg(feature = "tokio-runtime")] use bastion::prelude::*; #[cfg(feature = "tokio-runtime")] use tokio; +#[cfg(feature = "tokio-runtime")] use tracing::{error, warn, Level}; /// `cargo run --features=tokio-runtime --example hello_tokio` diff --git a/src/bastion/examples/prime_numbers.rs b/src/bastion/examples/prime_numbers.rs index 3aeb5d6..649529c 100644 --- a/src/bastion/examples/prime_numbers.rs +++ b/src/bastion/examples/prime_numbers.rs @@ -72,10 +72,10 @@ mod prime_number { fn number_or_panic(number_to_return: u128) -> u128 { // Let's roll a dice if rand::random::() % 6 == 0 { - panic!(format!( + panic!( "I was about to return {} but I chose to panic instead!", number_to_return - )) + ) } number_to_return } diff --git a/src/bastion/src/README.md b/src/bastion/src/README.md new file mode 100644 index 0000000..e1f412f --- /dev/null +++ b/src/bastion/src/README.md @@ -0,0 +1,72 @@ +Create a conference. + +1st Group +Staff (5) - Going to organize the event // OK + +2nd Group +Enthusiasts (50) - interested in participating to the conference (haven't registered yet) // OK + +3rd Group +Attendees (empty for now) - Participate + +Enthusiast -> Ask one of the staff members "when is the conference going to happen ?" // OK +Broadcast / Question => Answer 0 or 1 Staff members are going to reply eventually? // OK + +Staff -> Send a Leaflet to all of the enthusiasts, letting them know that they can register. // OK + +"hey conference is going to happen. will you be there?" +Broadcast / Question -> if people reply with YES => fill the 3rd group +some enthusiasts are now attendees + +Staff -> send the actual schedule and misc infos to Attendees +Broadcast / Statement (Attendees) + +An attendee sends a thank you note to one staff member (and not bother everyone) +One message / Statement (Staff) // OK + +```rust + let staff = Distributor::named("staff"); + + let enthusiasts = Distributor::named("enthusiasts"); + + let attendees = Disitributor::named("attendees"); + + // Enthusiast -> Ask the whole staff "when is the conference going to happen ?" + ask_one(Message + Clone) -> Result, CouldNotSendError> + // await_one // await_all + // first ? means "have we been able to send the question?" + // it s in a month + let replies = staff.ask_one("when is the conference going to happen ?")?.await?; + + ask_everyone(Message + Clone) -> Result, CouldNotSendError> + let participants = enthusiasts.ask_everyone("here's our super nice conference, it s happening people!").await?; + + for participant in participants { + // grab the sender and add it to the attendee recipient group + } + + // send the schedule + tell_everyone(Message + Clone) -> Result<(), CouldNotSendError> + attendees.tell_everyone("hey there, conf is in a week, here s where and how it s going to happen")?; + + // send a thank you note + tell(Message) -> Result<(), CouldNotSendError> + staff.tell_one("thank's it was amazing")?; + + children + .with_redundancy(10) + .with_distributor(Distributor::named("staff")) + // We create the function to exec when each children is called + .with_exec(move |ctx: BastionContext| async move { /* ... */ }) + children + .with_redundancy(100) + .with_distributor(Distributor::named("enthusiasts")) + // We create the function to exec when each children is called + .with_exec(move |ctx: BastionContext| async move { /* ... */ }) + + children + .with_redundancy(0) + .with_distributor(Distributor::named("attendees")) + // We create the function to exec when each children is called + .with_exec(move |ctx: BastionContext| async move { /* ... */ }) +``` diff --git a/src/bastion/src/bastion.rs b/src/bastion/src/bastion.rs index cbeb9b1..82cf1b3 100644 --- a/src/bastion/src/bastion.rs +++ b/src/bastion/src/bastion.rs @@ -258,7 +258,7 @@ impl Bastion { std::panic::set_hook(Box::new(|_| ())); } - lazy_static::initialize(&SYSTEM); + let _ = &SYSTEM; } /// Creates a new [`Supervisor`], passes it through the specified diff --git a/src/bastion/src/child.rs b/src/bastion/src/child.rs index e53e437..e4b6b26 100644 --- a/src/bastion/src/child.rs +++ b/src/bastion/src/child.rs @@ -125,12 +125,14 @@ impl Child { fn stopped(&mut self) { debug!("Child({}): Stopped.", self.id()); self.remove_from_dispatchers(); + let _ = self.remove_from_distributors(); self.bcast.stopped(); } fn faulted(&mut self) { debug!("Child({}): Faulted.", self.id()); self.remove_from_dispatchers(); + let _ = self.remove_from_distributors(); let parent = self.bcast.parent().clone().into_children().unwrap(); let path = self.bcast.path().clone(); @@ -305,6 +307,10 @@ impl Child { error!("couldn't add actor to the registry: {}", e); return; }; + if let Err(e) = self.register_to_distributors() { + error!("couldn't add actor to the distributors: {}", e); + return; + }; loop { #[cfg(feature = "scaling")] @@ -400,6 +406,35 @@ impl Child { Ok(()) } + /// Adds the actor into each distributor declared in the parent node. + fn register_to_distributors(&self) -> AnyResult<()> { + if let Some(parent) = self.bcast.parent().clone().into_children() { + let child_ref = self.child_ref.clone(); + let distributors = parent.distributors(); + + let global_dispatcher = SYSTEM.dispatcher(); + distributors + .iter() + .map(|&distributor| { + global_dispatcher.register_recipient(distributor, child_ref.clone()) + }) + .collect::>>()?; + } + Ok(()) + } + + /// Cleanup the actor's record from each declared distributor. + fn remove_from_distributors(&self) -> AnyResult<()> { + if let Some(parent) = self.bcast.parent().clone().into_children() { + let child_ref = self.child_ref.clone(); + let distributors = parent.distributors(); + + let global_dispatcher = SYSTEM.dispatcher(); + global_dispatcher.remove_recipient(distributors, child_ref)?; + } + Ok(()) + } + /// Cleanup the actor's record from each declared dispatcher. fn remove_from_dispatchers(&self) { if let Some(parent) = self.bcast.parent().clone().into_children() { diff --git a/src/bastion/src/child_ref.rs b/src/bastion/src/child_ref.rs index 0ecef05..57778cb 100644 --- a/src/bastion/src/child_ref.rs +++ b/src/bastion/src/child_ref.rs @@ -1,16 +1,64 @@ //! //! Allows users to communicate with Child through the mailboxes. -use crate::broadcast::Sender; use crate::context::BastionId; use crate::envelope::{Envelope, RefAddr}; -use crate::message::{Answer, BastionMessage, Message}; -use crate::path::BastionPath; +use crate::{broadcast::Sender, message::Msg}; +use crate::{ + distributor::Distributor, + message::{Answer, BastionMessage, Message}, +}; +use crate::{path::BastionPath, system::STRING_INTERNER}; +use futures::channel::mpsc::TrySendError; use std::cmp::{Eq, PartialEq}; use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::sync::Arc; +use thiserror::Error; use tracing::{debug, trace}; +#[derive(Error, Debug)] +/// `SendError`s occur when a message couldn't be dispatched through a distributor +pub enum SendError { + #[error("couldn't send message. Channel Disconnected.")] + /// Channel has been closed before we could send a message + Disconnected(Msg), + #[error("couldn't send message. Channel is Full.")] + /// Channel is full, can't send a message + Full(Msg), + #[error("couldn't send a message I should have not sent. {0}")] + /// This error is returned when we try to send a message + /// that is not a BastionMessage::Message variant + Other(anyhow::Error), + #[error("No available Distributor matching {0}")] + /// The distributor we're trying to dispatch messages to is not registered in the system + NoDistributor(String), + #[error("Distributor has 0 Recipients")] + /// The distributor we're trying to dispatch messages to has no recipients + EmptyRecipient, +} + +impl From> for SendError { + fn from(tse: TrySendError) -> Self { + let is_disconnected = tse.is_disconnected(); + match tse.into_inner().msg { + BastionMessage::Message(msg) => { + if is_disconnected { + Self::Disconnected(msg) + } else { + Self::Full(msg) + } + } + other => Self::Other(anyhow::anyhow!("{:?}", other)), + } + } +} + +impl From for SendError { + fn from(distributor: Distributor) -> Self { + Self::NoDistributor(STRING_INTERNER.resolve(distributor.interned()).to_string()) + } +} + #[derive(Debug, Clone)] /// A "reference" to an element of a children group, allowing to /// communicate with it. @@ -215,6 +263,75 @@ impl ChildRef { self.send(env).map_err(|env| env.into_msg().unwrap()) } + /// Try to send a message to the child this `ChildRef` is referencing. + /// This message is intended to be used outside of Bastion context when + /// there is no way for receiver to identify message sender + /// + /// This method returns `()` if it succeeded, or a `SendError`(../child_ref/enum.SendError.html) + /// otherwise. + /// + /// # Argument + /// + /// * `msg` - The message to send. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// // The message that will be "told"... + /// const TELL_MSG: &'static str = "A message containing data (tell)."; + /// + /// # let children_ref = + /// // Create a new child... + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// // ...which will receive the message "told"... + /// msg! { ctx.recv().await?, + /// msg: &'static str => { + /// assert_eq!(msg, TELL_MSG); + /// // Handle the message... + /// }; + /// // This won't happen because this example + /// // only "tells" a `&'static str`... + /// _: _ => (); + /// } + /// + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// + /// # let child_ref = &children_ref.elems()[0]; + /// // Later, the message is "told" to the child... + /// child_ref.try_tell_anonymously(TELL_MSG).expect("Couldn't send the message."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn try_tell_anonymously(&self, msg: M) -> Result<(), SendError> { + debug!("ChildRef({}): Try Telling message: {:?}", self.id(), msg); + let msg = BastionMessage::tell(msg); + let env = Envelope::from_dead_letters(msg); + self.try_send(env).map_err(Into::into) + } + /// Sends a message to the child this `ChildRef` is referencing, /// allowing it to answer. /// This message is intended to be used outside of Bastion context when @@ -302,7 +419,6 @@ impl ChildRef { /// # Bastion::block_until_stopped(); /// # } /// ``` - /// pub fn ask_anonymously(&self, msg: M) -> Result { debug!("ChildRef({}): Asking message: {:?}", self.id(), msg); let (msg, answer) = BastionMessage::ask(msg, self.addr()); @@ -313,6 +429,100 @@ impl ChildRef { Ok(answer) } + /// Try to send a message to the child this `ChildRef` is referencing, + /// allowing it to answer. + /// This message is intended to be used outside of Bastion context when + /// there is no way for receiver to identify message sender + /// + /// This method returns [`Answer`](../message/struct.Answer.html) if it succeeded, or a `SendError`(../child_ref/enum.SendError.html) + /// otherwise. + /// + /// # Argument + /// + /// * `msg` - The message to send. + /// + /// # Example + /// + /// ``` + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// // The message that will be "asked"... + /// const ASK_MSG: &'static str = "A message containing data (ask)."; + /// // The message the will be "answered"... + /// const ANSWER_MSG: &'static str = "A message containing data (answer)."; + /// + /// # let children_ref = + /// // Create a new child... + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// // ...which will receive the message asked... + /// msg! { ctx.recv().await?, + /// msg: &'static str =!> { + /// assert_eq!(msg, ASK_MSG); + /// // Handle the message... + /// + /// // ...and eventually answer to it... + /// answer!(ctx, ANSWER_MSG); + /// }; + /// // This won't happen because this example + /// // only "asks" a `&'static str`... + /// _: _ => (); + /// } + /// + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// + /// # Bastion::children(|children| { + /// # children.with_exec(move |ctx: BastionContext| { + /// # let child_ref = children_ref.elems()[0].clone(); + /// # async move { + /// // Later, the message is "asked" to the child... + /// let answer: Answer = child_ref.try_ask_anonymously(ASK_MSG).expect("Couldn't send the message."); + /// + /// // ...and the child's answer is received... + /// msg! { answer.await.expect("Couldn't receive the answer."), + /// msg: &'static str => { + /// assert_eq!(msg, ANSWER_MSG); + /// // Handle the answer... + /// }; + /// // This won't happen because this example + /// // only answers a `&'static str`... + /// _: _ => (); + /// } + /// # + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn try_ask_anonymously(&self, msg: M) -> Result { + debug!("ChildRef({}): Try Asking message: {:?}", self.id(), msg); + let (msg, answer) = BastionMessage::ask(msg, self.addr()); + let env = Envelope::from_dead_letters(msg); + self.try_send(env).map(|_| answer) + } + /// Sends a message to the child this `ChildRef` is referencing /// to tell it to stop its execution. /// @@ -425,6 +635,11 @@ impl ChildRef { .map_err(|err| err.into_inner()) } + pub(crate) fn try_send(&self, env: Envelope) -> Result<(), SendError> { + trace!("ChildRef({}): Sending message: {:?}", self.id(), env); + self.sender.unbounded_send(env).map_err(Into::into) + } + pub(crate) fn sender(&self) -> &Sender { &self.sender } diff --git a/src/bastion/src/children.rs b/src/bastion/src/children.rs index a70a3d2..b33e652 100644 --- a/src/bastion/src/children.rs +++ b/src/bastion/src/children.rs @@ -1,6 +1,5 @@ //! //! Children are a group of child supervised under a supervisor -use crate::broadcast::{Broadcast, Parent, Sender}; use crate::callbacks::{CallbackType, Callbacks}; use crate::child::{Child, Init}; use crate::child_ref::ChildRef; @@ -13,6 +12,10 @@ use crate::path::BastionPathElement; #[cfg(feature = "scaling")] use crate::resizer::{ActorGroupStats, OptimalSizeExploringResizer, ScalingRule}; use crate::system::SYSTEM; +use crate::{ + broadcast::{Broadcast, Parent, Sender}, + distributor::Distributor, +}; use anyhow::Result as AnyResult; use bastion_executor::pool; @@ -106,6 +109,7 @@ pub struct Children { started: bool, // List of dispatchers attached to each actor in the group. dispatchers: Vec>>, + distributors: Vec, // The name of children name: Option, #[cfg(feature = "scaling")] @@ -131,6 +135,7 @@ impl Children { let pre_start_msgs = Vec::new(); let started = false; let dispatchers = Vec::new(); + let distributors = Vec::new(); let name = None; #[cfg(feature = "scaling")] let resizer = Box::new(OptimalSizeExploringResizer::default()); @@ -146,6 +151,7 @@ impl Children { pre_start_msgs, started, dispatchers, + distributors, name, #[cfg(feature = "scaling")] resizer, @@ -240,7 +246,9 @@ impl Children { .map(|dispatcher| dispatcher.dispatcher_type()) .collect(); - ChildrenRef::new(id, sender, path, children, dispatchers) + let distributors = self.distributors.clone(); + + ChildrenRef::new(id, sender, path, children, dispatchers, distributors) } /// Sets the name of this children group. @@ -420,6 +428,50 @@ impl Children { self } + /// Appends a distributor to the children. + /// + /// By default supervised elements aren't added to any distributor. + /// + /// # Arguments + /// + /// * `distributor` - An instance of struct that implements the + /// [`RecipientHandler`] trait. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # + /// Bastion::children(|children| { + /// children + /// .with_distributor(Distributor::named("my distributor")) + /// }).expect("Couldn't create the children group."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + /// [`RecipientHandler`]: crate::dispatcher::RecipientHandler + pub fn with_distributor(mut self, distributor: Distributor) -> Self { + self.distributors.push(distributor); + self + } + #[cfg(feature = "scaling")] /// Sets a custom resizer for the Children. /// @@ -450,6 +502,7 @@ impl Children { /// # /// Bastion::children(|children| { /// children + /// .with_redundancy(1) /// .with_resizer( /// OptimalSizeExploringResizer::default() /// .with_lower_bound(10) @@ -462,7 +515,7 @@ impl Children { /// # Bastion::block_until_stopped(); /// # } /// ``` - pub fn with_resizer(mut self, mut resizer: OptimalSizeExploringResizer) -> Self { + pub fn with_resizer(mut self, resizer: OptimalSizeExploringResizer) -> Self { self.redundancy = resizer.lower_bound() as usize; self.resizer = Box::new(resizer); self @@ -646,6 +699,9 @@ impl Children { if let Err(e) = self.remove_dispatchers() { warn!("couldn't remove all dispatchers from the registry: {}", e); }; + if let Err(e) = self.remove_distributors() { + warn!("couldn't remove all distributors from the registry: {}", e); + }; self.bcast.stopped(); } @@ -654,6 +710,9 @@ impl Children { if let Err(e) = self.remove_dispatchers() { warn!("couldn't remove all dispatchers from the registry: {}", e); }; + if let Err(e) = self.remove_distributors() { + warn!("couldn't remove all distributors from the registry: {}", e); + }; self.bcast.faulted(); } @@ -1099,4 +1158,14 @@ impl Children { } Ok(()) } + + /// Removes all declared local distributors from the global dispatcher. + pub(crate) fn remove_distributors(&self) -> AnyResult<()> { + let global_dispatcher = SYSTEM.dispatcher(); + + for distributor in self.distributors.iter() { + global_dispatcher.remove_distributor(distributor)?; + } + Ok(()) + } } diff --git a/src/bastion/src/children_ref.rs b/src/bastion/src/children_ref.rs index 3359500..5f08ae2 100644 --- a/src/bastion/src/children_ref.rs +++ b/src/bastion/src/children_ref.rs @@ -1,13 +1,13 @@ //! //! Allows users to communicate with children through the mailboxes. use crate::broadcast::Sender; -use crate::child_ref::ChildRef; use crate::context::BastionId; use crate::dispatcher::DispatcherType; use crate::envelope::Envelope; use crate::message::{BastionMessage, Message}; use crate::path::BastionPath; use crate::system::SYSTEM; +use crate::{child_ref::ChildRef, distributor::Distributor}; use std::cmp::{Eq, PartialEq}; use std::fmt::Debug; use std::sync::Arc; @@ -22,6 +22,7 @@ pub struct ChildrenRef { path: Arc, children: Vec, dispatchers: Vec, + distributors: Vec, } impl ChildrenRef { @@ -31,6 +32,7 @@ impl ChildrenRef { path: Arc, children: Vec, dispatchers: Vec, + distributors: Vec, ) -> Self { ChildrenRef { id, @@ -38,6 +40,7 @@ impl ChildrenRef { path, children, dispatchers, + distributors, } } @@ -116,6 +119,40 @@ impl ChildrenRef { &self.dispatchers } + /// Returns a list of distributors that can be used for + /// communication with other actors in the same group(s). + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # + /// # let children_ref = Bastion::children(|children| children).unwrap(); + /// let distributors = children_ref.distributors(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn distributors(&self) -> &Vec { + &self.distributors + } + /// Returns a list of [`ChildRef`] referencing the elements /// of the children group this `ChildrenRef` is referencing. /// diff --git a/src/bastion/src/dispatcher.rs b/src/bastion/src/dispatcher.rs index 6766777..5ec8aef 100644 --- a/src/bastion/src/dispatcher.rs +++ b/src/bastion/src/dispatcher.rs @@ -2,8 +2,11 @@ //! Special module that allows users to interact and communicate with a //! group of actors through the dispatchers that holds information about //! actors grouped together. -use crate::child_ref::ChildRef; -use crate::envelope::SignedMessage; +use crate::{ + child_ref::{ChildRef, SendError}, + message::{Answer, Message}, +}; +use crate::{distributor::Distributor, envelope::SignedMessage}; use anyhow::Result as AnyResult; use lever::prelude::*; use std::fmt::{self, Debug}; @@ -18,6 +21,10 @@ use tracing::{debug, trace}; /// the Bastion identifier as the key and the module name as the value. pub type DispatcherMap = LOTable; +/// Type alias for the recipients hashset. +/// Each key-value pair stores the Bastion identifier as the key. +pub type RecipientMap = LOTable; + #[derive(Debug, Clone)] /// Defines types of the notifications handled by the dispatcher /// when the group of actors is changing. @@ -43,6 +50,27 @@ pub enum BroadcastTarget { Group(String), } +/// A `Recipient` is responsible for maintaining it's list +/// of recipients, and deciding which child gets to receive which message. +pub trait Recipient { + /// Provide this function to declare which recipient will receive the next message + fn next(&self) -> Option; + /// Return all recipients that will receive a broadcast message + fn all(&self) -> Vec; + /// Add this actor to your list of recipients + fn register(&self, actor: ChildRef); + /// Remove this actor from your list of recipients + fn remove(&self, actor: &ChildRef); +} + +/// A `RecipientHandler` is a `Recipient` implementor, that can be stored in the dispatcher +pub trait RecipientHandler: Recipient + Send + Sync + Debug {} + +impl RecipientHandler for RoundRobinHandler {} + +/// The default handler, which does round-robin. +pub type DefaultRecipientHandler = RoundRobinHandler; + #[derive(Debug, Clone, Eq, PartialEq)] /// Defines the type of the dispatcher. /// @@ -67,6 +95,48 @@ pub type DefaultDispatcherHandler = RoundRobinHandler; #[derive(Default, Debug)] pub struct RoundRobinHandler { index: AtomicUsize, + recipients: RecipientMap, +} + +impl RoundRobinHandler { + fn public_recipients(&self) -> Vec { + self.recipients + .iter() + .filter_map(|entry| { + if entry.0.is_public() { + Some(entry.0) + } else { + None + } + }) + .collect() + } +} + +impl Recipient for RoundRobinHandler { + fn next(&self) -> Option { + let entries = self.public_recipients(); + + if entries.is_empty() { + return None; + } + + let current_index = self.index.load(Ordering::SeqCst) % entries.len(); + self.index.store(current_index + 1, Ordering::SeqCst); + entries.get(current_index).map(std::clone::Clone::clone) + } + + fn all(&self) -> Vec { + self.public_recipients() + } + + fn register(&self, actor: ChildRef) { + let _ = self.recipients.insert(actor, ()); + } + + fn remove(&self, actor: &ChildRef) { + let _ = self.recipients.remove(&actor); + } } impl DispatcherHandler for RoundRobinHandler { @@ -80,25 +150,31 @@ impl DispatcherHandler for RoundRobinHandler { } // Each child in turn will receive a message. fn broadcast_message(&self, entries: &DispatcherMap, message: &Arc) { - let entries = entries + let public_childrefs = entries .iter() - .filter(|entry| entry.0.is_public()) + .filter_map(|entry| { + if entry.0.is_public() { + Some(entry.0) + } else { + None + } + }) .collect::>(); - if entries.is_empty() { + if public_childrefs.is_empty() { debug!("no public children to broadcast message to"); return; } - let current_index = self.index.load(Ordering::SeqCst) % entries.len(); + let current_index = self.index.load(Ordering::SeqCst) % public_childrefs.len(); - if let Some(entry) = entries.get(current_index) { + if let Some(entry) = public_childrefs.get(current_index) { debug!( "sending message to child {}/{} - {}", current_index + 1, entries.len(), - entry.0.path() + entry.path() ); - entry.0.tell_anonymously(message.clone()).unwrap(); + entry.tell_anonymously(message.clone()).unwrap(); self.index.store(current_index + 1, Ordering::SeqCst); }; } @@ -269,6 +345,7 @@ impl Into for String { pub(crate) struct GlobalDispatcher { /// Storage for all registered group of actors. pub dispatchers: LOTable>>, + pub distributors: LOTable>>, } impl GlobalDispatcher { @@ -276,6 +353,7 @@ impl GlobalDispatcher { pub(crate) fn new() -> Self { GlobalDispatcher { dispatchers: LOTable::new(), + distributors: LOTable::new(), } } @@ -332,19 +410,17 @@ impl GlobalDispatcher { /// Broadcasts the given message in according with the specified target. pub(crate) fn broadcast_message(&self, target: BroadcastTarget, message: &Arc) { - let mut acked_dispatchers: Vec = Vec::new(); - - match target { + let acked_dispatchers = match target { BroadcastTarget::All => self .dispatchers .iter() .map(|pair| pair.0.name().into()) - .for_each(|group_name| acked_dispatchers.push(group_name)), + .collect(), BroadcastTarget::Group(name) => { let target_dispatcher = name.into(); - acked_dispatchers.push(target_dispatcher); + vec![target_dispatcher] } - } + }; for dispatcher_type in acked_dispatchers { match self.dispatchers.get(&dispatcher_type) { @@ -363,6 +439,74 @@ impl GlobalDispatcher { } } + pub(crate) fn tell(&self, distributor: Distributor, message: M) -> Result<(), SendError> + where + M: Message, + { + let child = self.next(distributor)?.ok_or(SendError::EmptyRecipient)?; + child.try_tell_anonymously(message).map(Into::into) + } + + pub(crate) fn ask(&self, distributor: Distributor, message: M) -> Result + where + M: Message, + { + let child = self.next(distributor)?.ok_or(SendError::EmptyRecipient)?; + child.try_ask_anonymously(message).map(Into::into) + } + + pub(crate) fn ask_everyone( + &self, + distributor: Distributor, + message: M, + ) -> Result, SendError> + where + M: Message + Clone, + { + let all_children = self.all(distributor)?; + if all_children.is_empty() { + Err(SendError::EmptyRecipient) + } else { + all_children + .iter() + .map(|child| child.try_ask_anonymously(message.clone())) + .collect::, _>>() + } + } + + pub(crate) fn tell_everyone( + &self, + distributor: Distributor, + message: M, + ) -> Result, SendError> + where + M: Message + Clone, + { + let all_children = self.all(distributor)?; + if all_children.is_empty() { + Err(SendError::EmptyRecipient) + } else { + all_children + .iter() + .map(|child| child.try_tell_anonymously(message.clone())) + .collect() + } + } + + fn next(&self, distributor: Distributor) -> Result, SendError> { + self.distributors + .get(&distributor) + .map(|recipient| recipient.next()) + .ok_or_else(|| SendError::from(distributor)) + } + + fn all(&self, distributor: Distributor) -> Result, SendError> { + self.distributors + .get(&distributor) + .map(|recipient| recipient.all()) + .ok_or_else(|| SendError::from(distributor)) + } + /// Adds dispatcher to the global registry. pub(crate) fn register_dispatcher(&self, dispatcher: &Arc>) -> AnyResult<()> { let dispatcher_type = dispatcher.dispatcher_type(); @@ -386,6 +530,42 @@ impl GlobalDispatcher { self.dispatchers.remove(&dispatcher.dispatcher_type())?; Ok(()) } + + /// Appends the information about actor to the recipients. + pub(crate) fn register_recipient( + &self, + distributor: Distributor, + child_ref: ChildRef, + ) -> AnyResult<()> { + if let Some(recipient) = self.distributors.get(&distributor) { + recipient.register(child_ref); + } else { + let actors = DefaultRecipientHandler::default(); + actors.register(child_ref); + self.distributors + .insert(distributor, Arc::new(Box::new(actors)))?; + } + Ok(()) + } + + pub(crate) fn remove_recipient( + &self, + distributor_list: &[Distributor], + child_ref: ChildRef, + ) -> AnyResult<()> { + distributor_list.iter().for_each(|distributor| { + if let Some(recipient) = self.distributors.get(distributor) { + recipient.remove(&child_ref); + } + }); + Ok(()) + } + + /// Removes distributor from the global registry. + pub(crate) fn remove_distributor(&self, distributor: &Distributor) -> AnyResult<()> { + self.distributors.remove(distributor)?; + Ok(()) + } } #[cfg(test)] diff --git a/src/bastion/src/distributor.rs b/src/bastion/src/distributor.rs new file mode 100644 index 0000000..7d80c6c --- /dev/null +++ b/src/bastion/src/distributor.rs @@ -0,0 +1,358 @@ +//! `Distributor` is a mechanism that allows you to send messages to children. + +use crate::{ + child_ref::SendError, + message::{Answer, Message}, + prelude::ChildRef, + system::{STRING_INTERNER, SYSTEM}, +}; +use anyhow::Result as AnyResult; +use lasso::Spur; +use std::fmt::Debug; + +// Copy is fine here because we're working +// with interned strings here +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +/// The `Distributor` is the main message passing mechanism we will use. +/// it provides methods that will allow us to send messages +/// and add/remove actors to the Distribution list +pub struct Distributor(Spur); + +impl Distributor { + /// Create a new distributor to send messages to + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # + /// # fn run() { + /// # + /// let distributor = Distributor::named("my target group"); + /// // distributor is now ready to use + /// # } + /// ``` + pub fn named(name: impl AsRef) -> Self { + Self(STRING_INTERNER.get_or_intern(name.as_ref())) + } + + /// Ask a question to a recipient attached to the `Distributor` + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # Bastion::supervisor(|supervisor| { + /// # supervisor.children(|children| { + /// children + /// .with_redundancy(1) + /// .with_distributor(Distributor::named("my distributor")) + /// .with_exec(|ctx: BastionContext| { // ... + /// # async move { + /// # loop { + /// # let _: Option = ctx.try_recv().await; + /// # } + /// # Ok(()) + /// # } + /// }) + /// # }) + /// # }); + /// # + /// # Bastion::start(); + /// # // Wait until everyone is up + /// # std::thread::sleep(std::time::Duration::from_secs(1)); + /// + /// let distributor = Distributor::named("my distributor"); + /// + /// let answer: Answer = distributor.ask_one("hello?").expect("couldn't send question"); + /// + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn ask_one(&self, question: impl Message) -> Result { + SYSTEM.dispatcher().ask(*self, question) + } + + /// Ask a question to all recipients attached to the `Distributor` + /// + /// Requires a `Message` that implements `Clone`. (it will be cloned and passed to each recipient) + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # Bastion::supervisor(|supervisor| { + /// # supervisor.children(|children| { + /// # children + /// # .with_redundancy(1) + /// # .with_distributor(Distributor::named("my distributor")) + /// # .with_exec(|ctx: BastionContext| { + /// # async move { + /// # loop { + /// # let _: Option = ctx.try_recv().await; + /// # } + /// # Ok(()) + /// # } + /// # }) + /// # }) + /// # }); + /// # + /// # Bastion::start(); + /// # // Wait until everyone is up + /// # std::thread::sleep(std::time::Duration::from_secs(1)); + /// + /// let distributor = Distributor::named("my distributor"); + /// + /// let answer: Vec = distributor.ask_everyone("hello?".to_string()).expect("couldn't send question"); + /// + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn ask_everyone(&self, question: impl Message + Clone) -> Result, SendError> { + SYSTEM.dispatcher().ask_everyone(*self, question) + } + + /// Send a Message to a recipient attached to the `Distributor` + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # Bastion::supervisor(|supervisor| { + /// # supervisor.children(|children| { + /// # children + /// # .with_redundancy(1) + /// # .with_distributor(Distributor::named("my distributor")) + /// # .with_exec(|ctx: BastionContext| { + /// # async move { + /// # loop { + /// # let _: Option = ctx.try_recv().await; + /// # } + /// # Ok(()) + /// # } + /// # }) + /// # }) + /// # }); + /// # + /// # Bastion::start(); + /// # // Wait until everyone is up + /// # std::thread::sleep(std::time::Duration::from_secs(1)); + /// + /// let distributor = Distributor::named("my distributor"); + /// + /// let answer: () = distributor.tell_one("hello?").expect("couldn't send question"); + /// + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn tell_one(&self, message: impl Message) -> Result<(), SendError> { + SYSTEM.dispatcher().tell(*self, message) + } + + /// Send a Message to each recipient attached to the `Distributor` + /// + /// Requires a `Message` that implements `Clone`. (it will be cloned and passed to each recipient) + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # Bastion::supervisor(|supervisor| { + /// # supervisor.children(|children| { + /// # children + /// # .with_redundancy(1) + /// # .with_distributor(Distributor::named("my distributor")) + /// # .with_exec(|ctx: BastionContext| { + /// # async move { + /// # loop { + /// # let _: Option = ctx.try_recv().await; + /// # } + /// # Ok(()) + /// # } + /// # }) + /// # }) + /// # }); + /// # + /// # Bastion::start(); + /// # // Wait until everyone is up + /// # std::thread::sleep(std::time::Duration::from_secs(1)); + /// + /// let distributor = Distributor::named("my distributor"); + /// + /// let answer: () = distributor.tell_one("hello?").expect("couldn't send question"); + /// + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn tell_everyone(&self, message: impl Message + Clone) -> Result, SendError> { + SYSTEM.dispatcher().tell_everyone(*self, message) + } + + /// subscribe a `ChildRef` to the named `Distributor` + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # let children = + /// # Bastion::children(|children| { + /// # children + /// # .with_redundancy(1) + /// # .with_distributor(Distributor::named("my distributor")) + /// # .with_exec(|ctx: BastionContext| { + /// # async move { + /// # loop { + /// # let _: Option = ctx.try_recv().await; + /// # } + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # // Wait until everyone is up + /// # std::thread::sleep(std::time::Duration::from_secs(1)); + /// # + /// let child_ref = children.elems()[0].clone(); + /// + /// let distributor = Distributor::named("my distributor"); + /// + /// // child_ref will now be elligible to receive messages dispatched through distributor + /// distributor.subscribe(child_ref).expect("couldn't subscribe child to distributor"); + /// + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn subscribe(&self, child_ref: ChildRef) -> AnyResult<()> { + let global_dispatcher = SYSTEM.dispatcher(); + global_dispatcher.register_recipient(*self, child_ref) + } + + /// unsubscribe a `ChildRef` to the named `Distributor` + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # let children = + /// # Bastion::children(|children| { + /// # children + /// # .with_redundancy(1) + /// # .with_distributor(Distributor::named("my distributor")) + /// # .with_exec(|ctx: BastionContext| { + /// # async move { + /// # loop { + /// # let _: Option = ctx.try_recv().await; + /// # } + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # // Wait until everyone is up + /// # std::thread::sleep(std::time::Duration::from_secs(1)); + /// # + /// let child_ref = children.elems()[0].clone(); + /// + /// let distributor = Distributor::named("my distributor"); + /// + /// // child_ref will not receive messages dispatched through the distributor anymore + /// distributor.unsubscribe(child_ref).expect("couldn't unsubscribe child to distributor"); + /// + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn unsubscribe(&self, child_ref: ChildRef) -> AnyResult<()> { + let global_dispatcher = SYSTEM.dispatcher(); + global_dispatcher.remove_recipient(&vec![*self], child_ref) + } + + pub(crate) fn interned(&self) -> &Spur { + &self.0 + } +} diff --git a/src/bastion/src/lib.rs b/src/bastion/src/lib.rs index 61769b8..c89337e 100644 --- a/src/bastion/src/lib.rs +++ b/src/bastion/src/lib.rs @@ -89,6 +89,8 @@ pub mod supervisor; pub mod errors; +pub mod distributor; + distributed_api! { // pub mod dist_messages; pub mod distributed; @@ -108,6 +110,7 @@ pub mod prelude { BroadcastTarget, DefaultDispatcherHandler, Dispatcher, DispatcherHandler, DispatcherMap, DispatcherType, NotificationType, }; + pub use crate::distributor::Distributor; pub use crate::envelope::{RefAddr, SignedMessage}; pub use crate::errors::*; #[cfg(not(target_os = "windows"))] diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index 796936d..f4e28ee 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -1063,7 +1063,7 @@ impl MessageHandler { /// specific type. pub fn on_tell(self, f: F) -> MessageHandler where - T: 'static, + T: Debug + 'static, F: FnOnce(T, RefAddr) -> O, { match self.try_into_tell::() { @@ -1081,6 +1081,7 @@ impl MessageHandler { } fn try_into_question(self) -> Result<(T, AnswerSender), MessageHandler> { + debug!("try_into_question with type {}", std::any::type_name::()); match self.state.take_message() { Ok(SignedMessage { msg: @@ -1102,6 +1103,10 @@ impl MessageHandler { fn try_into_broadcast( self, ) -> Result<(Arc, RefAddr), MessageHandler> { + debug!( + "try_into_broadcast with type {}", + std::any::type_name::() + ); match self.state.take_message() { Ok(SignedMessage { msg: Msg(MsgInner::Broadcast(msg)), @@ -1116,7 +1121,8 @@ impl MessageHandler { } } - fn try_into_tell(self) -> Result<(T, RefAddr), MessageHandler> { + fn try_into_tell(self) -> Result<(T, RefAddr), MessageHandler> { + debug!("try_into_tell with type {}", std::any::type_name::()); match self.state.take_message() { Ok(SignedMessage { msg: Msg(MsgInner::Tell(msg)), @@ -1125,7 +1131,6 @@ impl MessageHandler { let msg: Box = msg; Ok((*msg.downcast::().unwrap(), sign)) } - Ok(anything) => Err(MessageHandler::new(anything)), Err(output) => Err(MessageHandler::matched(output)), } diff --git a/src/bastion/src/path.rs b/src/bastion/src/path.rs index 1c9011c..ee856ae 100644 --- a/src/bastion/src/path.rs +++ b/src/bastion/src/path.rs @@ -249,19 +249,8 @@ impl fmt::Display for BastionPath { impl fmt::Debug for BastionPath { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.this { - Some(this @ BastionPathElement::Supervisor(_)) => write!( - f, - "/{}", - self.parent_chain - .iter() - .map(|id| BastionPathElement::Supervisor(id.clone())) - .chain(vec![this.clone()]) - .map(|el| format!("{:?}", el)) - .collect::>() - .join("/") - ), - // TODO: combine with the pattern above when or-patterns become stable - Some(this @ BastionPathElement::Children(_)) => write!( + Some(this @ BastionPathElement::Supervisor(_)) + | Some(this @ BastionPathElement::Children(_)) => write!( f, "/{}", self.parent_chain diff --git a/src/bastion/src/system.rs b/src/bastion/src/system.rs index 69f229b..9ff74f1 100644 --- a/src/bastion/src/system.rs +++ b/src/bastion/src/system.rs @@ -12,15 +12,17 @@ use futures::prelude::*; use futures::stream::FuturesUnordered; use futures::{pending, poll}; use fxhash::{FxHashMap, FxHashSet}; -use lazy_static::lazy_static; +use lasso::ThreadedRodeo; use lightproc::prelude::*; +use once_cell::sync::Lazy; use std::sync::{Arc, Condvar, Mutex}; use std::task::Poll; use tracing::{debug, error, info, trace, warn}; -lazy_static! { - pub(crate) static ref SYSTEM: GlobalSystem = System::init(); -} +pub(crate) static STRING_INTERNER: Lazy> = + Lazy::new(|| Arc::new(Default::default())); + +pub(crate) static SYSTEM: Lazy = Lazy::new(System::init); pub(crate) struct GlobalSystem { sender: Sender, diff --git a/tools/miri.sh b/tools/miri.sh new file mode 100755 index 0000000..e0775f4 --- /dev/null +++ b/tools/miri.sh @@ -0,0 +1,10 @@ +rustup component add miri +cargo miri setup +cargo clean +# Do some Bastion shake +pushd src/bastion && \ + MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly dispatcher && \ + MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly path && \ + MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly broadcast && \ + MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly children_ref && \ +popd