From 8ddf7a48811f95d4c1fa523042d64c6107a0f5a3 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Fri, 9 Apr 2021 23:34:05 +0200 Subject: [PATCH 1/9] proposal: distributor request fn --- src/bastion/examples/distributor.rs | 23 +++--- src/bastion/src/child_ref.rs | 54 +------------ src/bastion/src/dispatcher.rs | 3 +- src/bastion/src/distributor.rs | 113 +++++++++++++++++++++++++--- src/bastion/src/errors.rs | 50 ++++++++++++ 5 files changed, 167 insertions(+), 76 deletions(-) diff --git a/src/bastion/examples/distributor.rs b/src/bastion/examples/distributor.rs index 5b771784..a3f87371 100644 --- a/src/bastion/examples/distributor.rs +++ b/src/bastion/examples/distributor.rs @@ -80,7 +80,7 @@ struct ConferenceSchedule { misc: String, } -/// cargo r --features=tokio-runtime distributor +/// cargo r --features=tokio-runtime --example distributor #[tokio::main] async fn main() -> AnyResult<()> { let subscriber = tracing_subscriber::fmt() @@ -119,22 +119,19 @@ async fn main() -> AnyResult<()> { Bastion::start(); // Wait a bit until everyone is ready - // std::thread::sleep(std::time::Duration::from_secs(1)); + std::thread::sleep(std::time::Duration::from_secs(1)); let staff = Distributor::named("staff"); let enthusiasts = Distributor::named("enthusiasts"); let attendees = Distributor::named("attendees"); // Enthusiast -> Ask one of the staff members "when is the conference going to happen ?" - let answer = staff.ask_one("when is the next conference going to happen?")?; - MessageHandler::new( - answer - .await - .expect("coulnd't find out when the next conference is going to happen :("), - ) - .on_tell(|reply: String, _sender_addr| { - tracing::info!("received a reply to my message:\n{}", reply); - }); + let reply: Result = staff + .request("when is the next conference going to happen?") + .recv() + .expect("couldn't receive reply"); + + tracing::warn!("{:?}", reply); // Ok("Next month!") // "hey conference is going to happen. will you be there?" // Broadcast / Question -> if people reply with YES => fill the 3rd group @@ -198,9 +195,7 @@ async fn organize_the_event(ctx: BastionContext) -> Result<(), ()> { MessageHandler::new(ctx.recv().await?) .on_question(|message: &str, sender| { tracing::info!("received a question: \n{}", message); - sender - .reply("uh i think it will be next month!".to_string()) - .unwrap(); + sender.reply("Next month!".to_string()).unwrap(); }) .on_tell(|message: &str, _| { tracing::info!("received a message: \n{}", message); diff --git a/src/bastion/src/child_ref.rs b/src/bastion/src/child_ref.rs index 57778cb5..7cd90e7e 100644 --- a/src/bastion/src/child_ref.rs +++ b/src/bastion/src/child_ref.rs @@ -2,63 +2,15 @@ //! Allows users to communicate with Child through the mailboxes. use crate::context::BastionId; use crate::envelope::{Envelope, RefAddr}; -use crate::{broadcast::Sender, message::Msg}; -use crate::{ - distributor::Distributor, - message::{Answer, BastionMessage, Message}, -}; -use crate::{path::BastionPath, system::STRING_INTERNER}; -use futures::channel::mpsc::TrySendError; +use crate::message::{Answer, BastionMessage, Message}; +use crate::path::BastionPath; +use crate::{broadcast::Sender, prelude::SendError}; use std::cmp::{Eq, PartialEq}; use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use thiserror::Error; use tracing::{debug, trace}; -#[derive(Error, Debug)] -/// `SendError`s occur when a message couldn't be dispatched through a distributor -pub enum SendError { - #[error("couldn't send message. Channel Disconnected.")] - /// Channel has been closed before we could send a message - Disconnected(Msg), - #[error("couldn't send message. Channel is Full.")] - /// Channel is full, can't send a message - Full(Msg), - #[error("couldn't send a message I should have not sent. {0}")] - /// This error is returned when we try to send a message - /// that is not a BastionMessage::Message variant - Other(anyhow::Error), - #[error("No available Distributor matching {0}")] - /// The distributor we're trying to dispatch messages to is not registered in the system - NoDistributor(String), - #[error("Distributor has 0 Recipients")] - /// The distributor we're trying to dispatch messages to has no recipients - EmptyRecipient, -} - -impl From> for SendError { - fn from(tse: TrySendError) -> Self { - let is_disconnected = tse.is_disconnected(); - match tse.into_inner().msg { - BastionMessage::Message(msg) => { - if is_disconnected { - Self::Disconnected(msg) - } else { - Self::Full(msg) - } - } - other => Self::Other(anyhow::anyhow!("{:?}", other)), - } - } -} - -impl From for SendError { - fn from(distributor: Distributor) -> Self { - Self::NoDistributor(STRING_INTERNER.resolve(distributor.interned()).to_string()) - } -} - #[derive(Debug, Clone)] /// A "reference" to an element of a children group, allowing to /// communicate with it. diff --git a/src/bastion/src/dispatcher.rs b/src/bastion/src/dispatcher.rs index 5ec8aef7..8bd06894 100644 --- a/src/bastion/src/dispatcher.rs +++ b/src/bastion/src/dispatcher.rs @@ -3,8 +3,9 @@ //! group of actors through the dispatchers that holds information about //! actors grouped together. use crate::{ - child_ref::{ChildRef, SendError}, + child_ref::ChildRef, message::{Answer, Message}, + prelude::SendError, }; use crate::{distributor::Distributor, envelope::SignedMessage}; use anyhow::Result as AnyResult; diff --git a/src/bastion/src/distributor.rs b/src/bastion/src/distributor.rs index 7d80c6ca..e0a181be 100644 --- a/src/bastion/src/distributor.rs +++ b/src/bastion/src/distributor.rs @@ -1,14 +1,16 @@ //! `Distributor` is a mechanism that allows you to send messages to children. use crate::{ - child_ref::SendError, - message::{Answer, Message}, - prelude::ChildRef, + message::{Answer, Message, MessageHandler}, + prelude::{ChildRef, SendError}, system::{STRING_INTERNER, SYSTEM}, }; use anyhow::Result as AnyResult; use lasso::Spur; -use std::fmt::Debug; +use std::{ + fmt::Debug, + sync::mpsc::{channel, Receiver}, +}; // Copy is fine here because we're working // with interned strings here @@ -36,11 +38,102 @@ impl Distributor { Self(STRING_INTERNER.get_or_intern(name.as_ref())) } + /// Ask a question to a recipient attached to the `Distributor` + /// and wait for a reply. + /// + /// This can be achieved manually using a `MessageHandler` and `ask_one`. /// Ask a question to a recipient attached to the `Distributor` /// /// # Example /// - /// ```rust + /// ```no_run + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # Bastion::start(); + /// # Bastion::supervisor(|supervisor| { + /// # supervisor.children(|children| { + /// // attach a named distributor to the children + /// children + /// # .with_redundancy(1) + /// .with_distributor(Distributor::named("my distributor")) + /// .with_exec(|ctx: BastionContext| { + /// async move { + /// loop { + /// // The message handler needs an `on_question` section + /// // that matches the `question` you're going to send, + /// // and that will reply with the Type the request expects. + /// // In our example, we ask a `&str` question, and expect a `bool` reply. + /// MessageHandler::new(ctx.recv().await?) + /// .on_question(|message: &str, sender| { + /// if message == "is it raining today?" { + /// sender.reply(true).unwrap(); + /// } + /// }); + /// } + /// Ok(()) + /// } + /// }) + /// # }) + /// # }); + /// + /// let distributor = Distributor::named("my distributor"); + /// + /// let reply: Result = distributor + /// .request("is it raining today?") + /// .recv() + /// .expect("couldn't receive reply"); // Ok(true) + /// + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn request(&self, question: impl Message) -> Receiver> { + let (sender, receiver) = channel(); + match SYSTEM.dispatcher().ask(*self, question) { + Ok(response) => { + spawn!(async move { + if let Ok(message) = response.await { + let message_to_send = MessageHandler::new(message) + .on_tell(|reply: R, _| Ok(reply)) + .on_fallback(|_, _| { + Err(SendError::Other(anyhow::anyhow!( + "received a message with the wrong type" + ))) + }); + let _ = sender.send(message_to_send); + } else { + let _ = sender.send(Err(SendError::Other(anyhow::anyhow!( + "couldn't receive reply" + )))); + } + }); + } + Err(error) => { + let _ = sender.send(Err(error)); + } + }; + + receiver + } + + /// Ask a question to a recipient attached to the `Distributor` + /// + /// # Example + /// + /// ```no_run /// # use bastion::prelude::*; /// # /// # #[cfg(feature = "tokio-runtime")] @@ -93,7 +186,7 @@ impl Distributor { /// Requires a `Message` that implements `Clone`. (it will be cloned and passed to each recipient) /// # Example /// - /// ```rust + /// ```no_run /// # use bastion::prelude::*; /// # /// # #[cfg(feature = "tokio-runtime")] @@ -145,7 +238,7 @@ impl Distributor { /// /// # Example /// - /// ```rust + /// ```no_run /// # use bastion::prelude::*; /// # /// # #[cfg(feature = "tokio-runtime")] @@ -198,7 +291,7 @@ impl Distributor { /// Requires a `Message` that implements `Clone`. (it will be cloned and passed to each recipient) /// # Example /// - /// ```rust + /// ```no_run /// # use bastion::prelude::*; /// # /// # #[cfg(feature = "tokio-runtime")] @@ -248,7 +341,7 @@ impl Distributor { /// subscribe a `ChildRef` to the named `Distributor` /// - /// ```rust + /// ```no_run /// # use bastion::prelude::*; /// # /// # #[cfg(feature = "tokio-runtime")] @@ -301,7 +394,7 @@ impl Distributor { /// unsubscribe a `ChildRef` to the named `Distributor` /// - /// ```rust + /// ```no_run /// # use bastion::prelude::*; /// # /// # #[cfg(feature = "tokio-runtime")] diff --git a/src/bastion/src/errors.rs b/src/bastion/src/errors.rs index 4f2c7e27..07d35a07 100644 --- a/src/bastion/src/errors.rs +++ b/src/bastion/src/errors.rs @@ -4,7 +4,14 @@ //! A ReceiveError may however be raised when calling try_recv() or try_recv_timeout() //! More errors may happen in the future. +use crate::envelope::Envelope; +use crate::message::Msg; +use crate::system::STRING_INTERNER; +use crate::{distributor::Distributor, message::BastionMessage}; +use futures::channel::mpsc::TrySendError; +use std::fmt::Debug; use std::time::Duration; +use thiserror::Error; #[derive(Debug)] /// These errors happen @@ -18,3 +25,46 @@ pub enum ReceiveError { /// Generic error. Not used yet Other, } + +#[derive(Error, Debug)] +/// `SendError`s occur when a message couldn't be dispatched through a distributor +pub enum SendError { + #[error("couldn't send message. Channel Disconnected.")] + /// Channel has been closed before we could send a message + Disconnected(Msg), + #[error("couldn't send message. Channel is Full.")] + /// Channel is full, can't send a message + Full(Msg), + #[error("couldn't send a message I should have not sent. {0}")] + /// This error is returned when we try to send a message + /// that is not a BastionMessage::Message variant + Other(anyhow::Error), + #[error("No available Distributor matching {0}")] + /// The distributor we're trying to dispatch messages to is not registered in the system + NoDistributor(String), + #[error("Distributor has 0 Recipients")] + /// The distributor we're trying to dispatch messages to has no recipients + EmptyRecipient, +} + +impl From> for SendError { + fn from(tse: TrySendError) -> Self { + let is_disconnected = tse.is_disconnected(); + match tse.into_inner().msg { + BastionMessage::Message(msg) => { + if is_disconnected { + Self::Disconnected(msg) + } else { + Self::Full(msg) + } + } + other => Self::Other(anyhow::anyhow!("{:?}", other)), + } + } +} + +impl From for SendError { + fn from(distributor: Distributor) -> Self { + Self::NoDistributor(STRING_INTERNER.resolve(distributor.interned()).to_string()) + } +} From 711b588ad394f208c20d65fdfd5dd21d9e4f8c7f Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Sat, 10 Apr 2021 00:17:49 +0200 Subject: [PATCH 2/9] go async by default, add a sync variant backed by mpsc::channel --- src/bastion/examples/distributor.rs | 44 +++++++------ src/bastion/src/distributor.rs | 99 ++++++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 23 deletions(-) diff --git a/src/bastion/examples/distributor.rs b/src/bastion/examples/distributor.rs index a3f87371..c6d794eb 100644 --- a/src/bastion/examples/distributor.rs +++ b/src/bastion/examples/distributor.rs @@ -119,7 +119,7 @@ async fn main() -> AnyResult<()> { Bastion::start(); // Wait a bit until everyone is ready - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; let staff = Distributor::named("staff"); let enthusiasts = Distributor::named("enthusiasts"); @@ -128,10 +128,10 @@ async fn main() -> AnyResult<()> { // Enthusiast -> Ask one of the staff members "when is the conference going to happen ?" let reply: Result = staff .request("when is the next conference going to happen?") - .recv() + .await .expect("couldn't receive reply"); - tracing::warn!("{:?}", reply); // Ok("Next month!") + tracing::error!("{:?}", reply); // Ok("Next month!") // "hey conference is going to happen. will you be there?" // Broadcast / Question -> if people reply with YES => fill the 3rd group @@ -140,23 +140,25 @@ async fn main() -> AnyResult<()> { .expect("couldn't ask everyone"); for answer in answers.into_iter() { - MessageHandler::new(answer.await.expect("couldn't receive reply")) - .on_tell(|rsvp: RSVP, _| { - if rsvp.attends { - tracing::info!("{:?} will be there! :)", rsvp.child_ref.id()); - attendees - .subscribe(rsvp.child_ref) - .expect("couldn't subscribe attendee"); - } else { - tracing::error!("{:?} won't make it :(", rsvp.child_ref.id()); - } - }) - .on_fallback(|unknown, _sender_addr| { - tracing::error!( - "distributor_test: uh oh, I received a message I didn't understand\n {:?}", - unknown - ); - }); + run!(async move { + MessageHandler::new(answer.await.expect("couldn't receive reply")) + .on_tell(|rsvp: RSVP, _| { + if rsvp.attends { + tracing::info!("{:?} will be there! :)", rsvp.child_ref.id()); + attendees + .subscribe(rsvp.child_ref) + .expect("couldn't subscribe attendee"); + } else { + tracing::error!("{:?} won't make it :(", rsvp.child_ref.id()); + } + }) + .on_fallback(|unknown, _sender_addr| { + tracing::error!( + "distributor_test: uh oh, I received a message I didn't understand\n {:?}", + unknown + ); + }); + }); } // Ok now that attendees have subscribed, let's send information around! @@ -173,6 +175,7 @@ async fn main() -> AnyResult<()> { tracing::error!("total number of attendees: {}", total_sent.len()); tracing::info!("the conference is running!"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; // An attendee sends a thank you note to one staff member (and not bother everyone) @@ -180,7 +183,6 @@ async fn main() -> AnyResult<()> { .tell_one("the conference was amazing thank you so much!") .context("couldn't thank the staff members :(")?; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; // And we're done! Bastion::stop(); diff --git a/src/bastion/src/distributor.rs b/src/bastion/src/distributor.rs index e0a181be..bd3d28cb 100644 --- a/src/bastion/src/distributor.rs +++ b/src/bastion/src/distributor.rs @@ -6,6 +6,7 @@ use crate::{ system::{STRING_INTERNER, SYSTEM}, }; use anyhow::Result as AnyResult; +use futures::channel::oneshot; use lasso::Spur; use std::{ fmt::Debug, @@ -44,6 +45,97 @@ impl Distributor { /// This can be achieved manually using a `MessageHandler` and `ask_one`. /// Ask a question to a recipient attached to the `Distributor` /// + /// + /// ```no_run + /// # use bastion::prelude::*; + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # async fn run() { + /// # Bastion::init(); + /// # Bastion::start(); + /// + /// # Bastion::supervisor(|supervisor| { + /// # supervisor.children(|children| { + /// // attach a named distributor to the children + /// children + /// # .with_redundancy(1) + /// .with_distributor(Distributor::named("my distributor")) + /// .with_exec(|ctx: BastionContext| { + /// async move { + /// loop { + /// // The message handler needs an `on_question` section + /// // that matches the `question` you're going to send, + /// // and that will reply with the Type the request expects. + /// // In our example, we ask a `&str` question, and expect a `bool` reply. + /// MessageHandler::new(ctx.recv().await?) + /// .on_question(|message: &str, sender| { + /// if message == "is it raining today?" { + /// sender.reply(true).unwrap(); + /// } + /// }); + /// } + /// Ok(()) + /// } + /// }) + /// # }) + /// # }); + /// + /// let distributor = Distributor::named("my distributor"); + /// + /// let reply: Result = distributor + /// .request("is it raining today?") + /// .await + /// .expect("couldn't receive reply"); + /// + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn request( + &self, + question: impl Message, + ) -> oneshot::Receiver> { + let (sender, receiver) = oneshot::channel(); + match SYSTEM.dispatcher().ask(*self, question) { + Ok(response) => { + spawn!(async move { + if let Ok(message) = response.await { + let message_to_send = MessageHandler::new(message) + .on_tell(|reply: R, _| Ok(reply)) + .on_fallback(|_, _| { + Err(SendError::Other(anyhow::anyhow!( + "received a message with the wrong type" + ))) + }); + let _ = sender.send(message_to_send); + } else { + let _ = sender.send(Err(SendError::Other(anyhow::anyhow!( + "couldn't receive reply" + )))); + } + }); + } + Err(error) => { + let _ = sender.send(Err(error)); + } + }; + + receiver + } + + /// Ask a question to a recipient attached to the `Distributor` + /// and wait for a reply. + /// + /// this is the sync variant of the `request` function, backed by a futures::channel::oneshot /// # Example /// /// ```no_run @@ -92,7 +184,7 @@ impl Distributor { /// let distributor = Distributor::named("my distributor"); /// /// let reply: Result = distributor - /// .request("is it raining today?") + /// .request_sync("is it raining today?") /// .recv() /// .expect("couldn't receive reply"); // Ok(true) /// @@ -100,7 +192,10 @@ impl Distributor { /// # Bastion::block_until_stopped(); /// # } /// ``` - pub fn request(&self, question: impl Message) -> Receiver> { + pub fn request_sync( + &self, + question: impl Message, + ) -> Receiver> { let (sender, receiver) = channel(); match SYSTEM.dispatcher().ask(*self, question) { Ok(response) => { From e9dc4a8442ccc5da6cab2f4d5506d655688f5439 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Sat, 10 Apr 2021 14:06:23 +0200 Subject: [PATCH 3/9] let the example runwith or without the tokio-runtime feature --- src/bastion/examples/distributor.rs | 37 ++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/bastion/examples/distributor.rs b/src/bastion/examples/distributor.rs index c6d794eb..5ebf7b4b 100644 --- a/src/bastion/examples/distributor.rs +++ b/src/bastion/examples/distributor.rs @@ -81,8 +81,18 @@ struct ConferenceSchedule { } /// cargo r --features=tokio-runtime --example distributor +#[cfg(feature = "tokio-runtime")] #[tokio::main] async fn main() -> AnyResult<()> { + run() +} + +#[cfg(not(feature = "tokio-runtime"))] +fn main() -> AnyResult<()> { + run() +} + +fn run() -> AnyResult<()> { let subscriber = tracing_subscriber::fmt() .with_max_level(Level::INFO) .finish(); @@ -119,17 +129,19 @@ async fn main() -> AnyResult<()> { Bastion::start(); // Wait a bit until everyone is ready - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + sleep(std::time::Duration::from_secs(5)); let staff = Distributor::named("staff"); let enthusiasts = Distributor::named("enthusiasts"); let attendees = Distributor::named("attendees"); // Enthusiast -> Ask one of the staff members "when is the conference going to happen ?" - let reply: Result = staff - .request("when is the next conference going to happen?") - .await - .expect("couldn't receive reply"); + let reply: Result = run!(async { + staff + .request("when is the next conference going to happen?") + .await + .expect("couldn't receive reply") + }); tracing::error!("{:?}", reply); // Ok("Next month!") @@ -176,7 +188,8 @@ async fn main() -> AnyResult<()> { tracing::info!("the conference is running!"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + // Let's wait until the conference is over 8D + sleep(std::time::Duration::from_secs(5)); // An attendee sends a thank you note to one staff member (and not bother everyone) staff @@ -240,3 +253,15 @@ async fn be_interested_in_the_conference(ctx: BastionContext) -> Result<(), ()> }); } } + +#[cfg(feature = "tokio-runtime")] +fn sleep(duration: std::time::Duration) { + run!(async { + tokio::time::sleep(duration).await; + }); +} + +#[cfg(not(feature = "tokio-runtime"))] +fn sleep(duration: std::time::Duration) { + std::thread::sleep(duration); +} From 69d5c8b9a5451f9a1da691c07c189cf3221bde95 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Sat, 10 Apr 2021 14:07:23 +0200 Subject: [PATCH 4/9] fix: sometimes the children weren't attached to the dispatcher --- src/bastion/src/children.rs | 10 ++++++++++ src/bastion/src/dispatcher.rs | 20 ++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/bastion/src/children.rs b/src/bastion/src/children.rs index b33e6522..3c507d37 100644 --- a/src/bastion/src/children.rs +++ b/src/bastion/src/children.rs @@ -1159,6 +1159,16 @@ impl Children { Ok(()) } + /// Registers all declared local distributors in the global dispatcher. + pub(crate) fn register_distributors(&self) -> AnyResult<()> { + let global_dispatcher = SYSTEM.dispatcher(); + + for distributor in self.distributors.iter() { + global_dispatcher.register_distributor(distributor)?; + } + Ok(()) + } + /// Removes all declared local distributors from the global dispatcher. pub(crate) fn remove_distributors(&self) -> AnyResult<()> { let global_dispatcher = SYSTEM.dispatcher(); diff --git a/src/bastion/src/dispatcher.rs b/src/bastion/src/dispatcher.rs index 8bd06894..8b1076bb 100644 --- a/src/bastion/src/dispatcher.rs +++ b/src/bastion/src/dispatcher.rs @@ -562,6 +562,26 @@ impl GlobalDispatcher { Ok(()) } + /// Adds distributor to the global registry. + pub(crate) fn register_distributor(&self, distributor: &Distributor) -> AnyResult<()> { + let is_registered = self.distributors.contains_key(distributor); + + if is_registered { + debug!( + "The distributor with the '{:?}' name already registered in the cluster.", + distributor + ); + return Ok(()); + } + + let instance = distributor.clone(); + self.distributors.insert( + instance, + Arc::new(Box::new(DefaultRecipientHandler::default())), + )?; + Ok(()) + } + /// Removes distributor from the global registry. pub(crate) fn remove_distributor(&self, distributor: &Distributor) -> AnyResult<()> { self.distributors.remove(distributor)?; From 2ce9ed9a87136c30c9d140e0bd8f62681f82273e Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Sun, 11 Apr 2021 01:15:48 +0200 Subject: [PATCH 5/9] use RwLock for now --- src/bastion/src/child.rs | 2 +- src/bastion/src/children.rs | 2 + src/bastion/src/context.rs | 21 +--- src/bastion/src/dispatcher.rs | 84 ++++++++++---- src/bastion/src/distributor.rs | 201 ++++++++++++++++++++++++++++++--- src/bastion/src/supervisor.rs | 8 ++ 6 files changed, 263 insertions(+), 55 deletions(-) diff --git a/src/bastion/src/child.rs b/src/bastion/src/child.rs index e4b6b263..fac8bab5 100644 --- a/src/bastion/src/child.rs +++ b/src/bastion/src/child.rs @@ -416,7 +416,7 @@ impl Child { distributors .iter() .map(|&distributor| { - global_dispatcher.register_recipient(distributor, child_ref.clone()) + global_dispatcher.register_recipient(&distributor, child_ref.clone()) }) .collect::>>()?; } diff --git a/src/bastion/src/children.rs b/src/bastion/src/children.rs index 3c507d37..bd0dc368 100644 --- a/src/bastion/src/children.rs +++ b/src/bastion/src/children.rs @@ -468,6 +468,8 @@ impl Children { /// ``` /// [`RecipientHandler`]: crate::dispatcher::RecipientHandler pub fn with_distributor(mut self, distributor: Distributor) -> Self { + // Try to register the distributor as soon as we're aware of it + let _ = SYSTEM.dispatcher().register_distributor(&distributor); self.distributors.push(distributor); self } diff --git a/src/bastion/src/context.rs b/src/bastion/src/context.rs index a2d563cf..ff190731 100644 --- a/src/bastion/src/context.rs +++ b/src/bastion/src/context.rs @@ -851,11 +851,11 @@ mod context_tests { Bastion::init(); Bastion::start(); - run_test(test_recv); - run_test(test_try_recv); - run_test(test_try_recv_fail); - run_test(test_try_recv_timeout); - run_test(test_try_recv_timeout_fail); + test_recv(); + test_try_recv(); + test_try_recv_fail(); + test_try_recv_timeout(); + test_try_recv_timeout_fail(); Bastion::stop(); Bastion::block_until_stopped(); @@ -949,15 +949,6 @@ mod context_tests { run!(async { Delay::new(std::time::Duration::from_millis(2)).await }); // The child panicked, but we should still be able to send things to it - assert!(children.broadcast("test recv timeout").is_ok()); - } - - fn run_test(test: T) -> () - where - T: FnOnce() -> () + panic::UnwindSafe, - { - let result = panic::catch_unwind(|| test()); - - assert!(result.is_ok()) + children.broadcast("test recv timeout").unwrap(); } } diff --git a/src/bastion/src/dispatcher.rs b/src/bastion/src/dispatcher.rs index 8b1076bb..4596659c 100644 --- a/src/bastion/src/dispatcher.rs +++ b/src/bastion/src/dispatcher.rs @@ -10,12 +10,16 @@ use crate::{ use crate::{distributor::Distributor, envelope::SignedMessage}; use anyhow::Result as AnyResult; use lever::prelude::*; -use std::fmt::{self, Debug}; use std::hash::{Hash, Hasher}; +use std::sync::RwLock; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; +use std::{ + collections::HashMap, + fmt::{self, Debug}, +}; use tracing::{debug, trace}; /// Type alias for the concurrency hashmap. Each key-value pair stores @@ -346,7 +350,8 @@ impl Into for String { pub(crate) struct GlobalDispatcher { /// Storage for all registered group of actors. pub dispatchers: LOTable>>, - pub distributors: LOTable>>, + // TODO: switch to LOTable once lever implements write optimized granularity + pub distributors: Arc>>>, } impl GlobalDispatcher { @@ -354,7 +359,12 @@ impl GlobalDispatcher { pub(crate) fn new() -> Self { GlobalDispatcher { dispatchers: LOTable::new(), - distributors: LOTable::new(), + distributors: Arc::new(RwLock::new(HashMap::new())) + // TODO: switch to LOTable once lever implements write optimized granularity + // distributors: LOTableBuilder::new() + //.with_concurrency(TransactionConcurrency::Optimistic) + //.with_isolation(TransactionIsolation::Serializable) + //.build(), } } @@ -496,6 +506,13 @@ impl GlobalDispatcher { fn next(&self, distributor: Distributor) -> Result, SendError> { self.distributors + .read() + .map_err(|error| { + SendError::Other(anyhow::anyhow!( + "couldn't get read lock on distributors {:?}", + error + )) + })? .get(&distributor) .map(|recipient| recipient.next()) .ok_or_else(|| SendError::from(distributor)) @@ -503,6 +520,13 @@ impl GlobalDispatcher { fn all(&self, distributor: Distributor) -> Result, SendError> { self.distributors + .read() + .map_err(|error| { + SendError::Other(anyhow::anyhow!( + "couldn't get read lock on distributors {:?}", + error + )) + })? .get(&distributor) .map(|recipient| recipient.all()) .ok_or_else(|| SendError::from(distributor)) @@ -535,17 +559,22 @@ impl GlobalDispatcher { /// Appends the information about actor to the recipients. pub(crate) fn register_recipient( &self, - distributor: Distributor, + distributor: &Distributor, child_ref: ChildRef, ) -> AnyResult<()> { - if let Some(recipient) = self.distributors.get(&distributor) { - recipient.register(child_ref); + let mut distributors = self.distributors.write().map_err(|error| { + anyhow::anyhow!("couldn't get read lock on distributors {:?}", error) + })?; + if let Some(recipients) = distributors.get(&distributor) { + recipients.register(child_ref); } else { - let actors = DefaultRecipientHandler::default(); - actors.register(child_ref); - self.distributors - .insert(distributor, Arc::new(Box::new(actors)))?; - } + let recipients = DefaultRecipientHandler::default(); + recipients.register(child_ref); + distributors.insert( + distributor.clone(), + Box::new(recipients) as Box<(dyn RecipientHandler)>, + ); + }; Ok(()) } @@ -554,37 +583,42 @@ impl GlobalDispatcher { distributor_list: &[Distributor], child_ref: ChildRef, ) -> AnyResult<()> { + let distributors = self.distributors.write().map_err(|error| { + anyhow::anyhow!("couldn't get read lock on distributors {:?}", error) + })?; distributor_list.iter().for_each(|distributor| { - if let Some(recipient) = self.distributors.get(distributor) { - recipient.remove(&child_ref); - } + distributors + .get(&distributor) + .map(|recipients| recipients.remove(&child_ref)); }); Ok(()) } /// Adds distributor to the global registry. pub(crate) fn register_distributor(&self, distributor: &Distributor) -> AnyResult<()> { - let is_registered = self.distributors.contains_key(distributor); - - if is_registered { + let mut distributors = self.distributors.write().map_err(|error| { + anyhow::anyhow!("couldn't get read lock on distributors {:?}", error) + })?; + if distributors.contains_key(&distributor) { debug!( "The distributor with the '{:?}' name already registered in the cluster.", distributor ); - return Ok(()); + } else { + distributors.insert( + distributor.clone(), + Box::new(DefaultRecipientHandler::default()), + ); } - - let instance = distributor.clone(); - self.distributors.insert( - instance, - Arc::new(Box::new(DefaultRecipientHandler::default())), - )?; Ok(()) } /// Removes distributor from the global registry. pub(crate) fn remove_distributor(&self, distributor: &Distributor) -> AnyResult<()> { - self.distributors.remove(distributor)?; + let mut distributors = self.distributors.write().map_err(|error| { + anyhow::anyhow!("couldn't get read lock on distributors {:?}", error) + })?; + distributors.remove(distributor); Ok(()) } } diff --git a/src/bastion/src/distributor.rs b/src/bastion/src/distributor.rs index bd3d28cb..bdcea759 100644 --- a/src/bastion/src/distributor.rs +++ b/src/bastion/src/distributor.rs @@ -261,8 +261,6 @@ impl Distributor { /// # }); /// # /// # Bastion::start(); - /// # // Wait until everyone is up - /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// /// let distributor = Distributor::named("my distributor"); /// @@ -314,8 +312,6 @@ impl Distributor { /// # }); /// # /// # Bastion::start(); - /// # // Wait until everyone is up - /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// /// let distributor = Distributor::named("my distributor"); /// @@ -366,8 +362,6 @@ impl Distributor { /// # }); /// # /// # Bastion::start(); - /// # // Wait until everyone is up - /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// /// let distributor = Distributor::named("my distributor"); /// @@ -419,8 +413,6 @@ impl Distributor { /// # }); /// # /// # Bastion::start(); - /// # // Wait until everyone is up - /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// /// let distributor = Distributor::named("my distributor"); /// @@ -468,8 +460,6 @@ impl Distributor { /// # }).unwrap(); /// # /// # Bastion::start(); - /// # // Wait until everyone is up - /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// # /// let child_ref = children.elems()[0].clone(); /// @@ -483,8 +473,7 @@ impl Distributor { /// # } /// ``` pub fn subscribe(&self, child_ref: ChildRef) -> AnyResult<()> { - let global_dispatcher = SYSTEM.dispatcher(); - global_dispatcher.register_recipient(*self, child_ref) + SYSTEM.dispatcher().register_recipient(self, child_ref) } /// unsubscribe a `ChildRef` to the named `Distributor` @@ -521,8 +510,6 @@ impl Distributor { /// # }).unwrap(); /// # /// # Bastion::start(); - /// # // Wait until everyone is up - /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// # /// let child_ref = children.elems()[0].clone(); /// @@ -544,3 +531,189 @@ impl Distributor { &self.0 } } + +#[cfg(test)] +mod distributor_tests { + use crate::prelude::*; + use futures_timer::Delay; + + #[cfg(feature = "tokio-runtime")] + #[tokio::test] + async fn distributor_tests() { + run().await; + } + + #[cfg(not(feature = "tokio-runtime"))] + #[test] + fn distributors_tests() { + run!(run()); + } + + async fn run() { + setup(); + + // Wait until all children are registered + Delay::new(std::time::Duration::from_secs(5)).await; + + test_tell().await; + test_ask().await; + test_request().await; + test_subscribe().await; + + // We don't wanna block until stopped here + // since our children are loop {} ing for ever waiting for messages + } + + async fn test_tell() { + let test_distributor = Distributor::named("test distributor"); + + test_distributor + .tell_one("don't panic and carry a towel") + .unwrap(); + + let sent = test_distributor + .tell_everyone("so long, and thanks for all the fish") + .unwrap(); + + assert_eq!( + 5, + sent.len(), + "test distributor is supposed to have 5 children" + ); + } + + async fn test_ask() { + let test_distributor = Distributor::named("test distributor"); + + let question: String = + "What is the answer to life, the universe and everything?".to_string(); + + let answer = test_distributor.ask_one(question.clone()).unwrap(); + + MessageHandler::new(answer.await.unwrap()) + .on_tell(|answer: u8, _| { + assert_eq!(42, answer); + }) + .on_fallback(|unknown, _sender_addr| { + panic!("unknown message\n {:?}", unknown); + }); + + let answers = test_distributor.ask_everyone(question.clone()).unwrap(); + assert_eq!( + 5, + answers.len(), + "test distributor is supposed to have 5 children" + ); + + let meanings = futures::future::join_all(answers.into_iter().map(|answer| async { + MessageHandler::new(answer.await.unwrap()) + .on_tell(|answer: u8, _| { + assert_eq!(42, answer); + answer + }) + .on_fallback(|unknown, _sender_addr| { + panic!("unknown message\n {:?}", unknown); + }) + })) + .await; + + assert_eq!( + 42 * 5, + meanings.iter().sum::(), + "5 children returning 42 should sum to 42 * 5" + ); + + let sent = test_distributor + .tell_everyone("so long, and thanks for all the fish") + .unwrap(); + assert_eq!( + 5, + sent.len(), + "test distributor is supposed to have 5 children" + ); + } + + async fn test_request() { + let test_distributor = Distributor::named("test distributor"); + + let question: String = + "What is the answer to life, the universe and everything?".to_string(); + + let answer: u8 = test_distributor + .request(question.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(42, answer); + + let answer_sync: u8 = test_distributor + .request_sync(question) + .recv() + .unwrap() + .unwrap(); + + assert_eq!(42, answer_sync); + } + + async fn test_subscribe() { + let temp_distributor = Distributor::named("temp distributor"); + + assert!( + temp_distributor.tell_one("hello!").is_err(), + "should not be able to send message to an empty distributor" + ); + + let one_child: ChildRef = Distributor::named("test distributor") + .request(()) + .await + .unwrap() + .unwrap(); + + temp_distributor.subscribe(one_child.clone()).unwrap(); + + temp_distributor + .tell_one("hello!") + .expect("should be able to send message a distributor that has a subscriber"); + + temp_distributor.unsubscribe(one_child).unwrap(); + + assert!( + temp_distributor.tell_one("hello!").is_err(), + "should not be able to send message to a distributor who's sole subscriber unsubscribed" + ); + } + + fn setup() { + Bastion::init(); + Bastion::start(); + Bastion::supervisor(|supervisor| { + supervisor.children(|children| { + children + .with_redundancy(5) + .with_distributor(Distributor::named("test distributor")) + .with_exec(|ctx| async move { + loop { + let child_ref = ctx.current().clone(); + MessageHandler::new(ctx.recv().await?) + .on_question(|question: String, sender| { + if question == "What is the answer to life, the universe and everything?".to_string() { + let _ = sender.reply(42_u8); + } else { + panic!("wrong question {}", question); + } + }) + // send your child ref + .on_question(|_: (), sender| { + let _ = sender.reply(child_ref); + }) + .on_tell(|_: &str, _| {}) + .on_fallback(|unknown, _sender_addr| { + panic!("unknown message\n {:?}", unknown); + }); + } + }) + }) + }) + .unwrap(); + } +} diff --git a/src/bastion/src/supervisor.rs b/src/bastion/src/supervisor.rs index 7ddd8c39..273ea9e2 100644 --- a/src/bastion/src/supervisor.rs +++ b/src/bastion/src/supervisor.rs @@ -647,10 +647,18 @@ impl Supervisor { let children = Children::new(bcast); let mut children = init(children); debug!("Children({}): Initialized.", children.id()); + // FIXME: children group elems launched without the group itself being launched if let Err(e) = children.register_dispatchers() { warn!("couldn't register all dispatchers into the registry: {}", e); }; + if let Err(e) = children.register_distributors() { + warn!( + "couldn't register all distributors into the registry: {}", + e + ); + }; + children.launch_elems(); debug!( From fec3ea7f3f8ce832641e1daf4d39843e66b6d778 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Sun, 11 Apr 2021 10:44:12 +0200 Subject: [PATCH 6/9] add an after_start callback to know exactly when the children have spawned and the distributor is ready to receive and dispatch messages to everyone --- src/bastion/src/callbacks.rs | 100 +++++++++++++++++++--- src/bastion/src/child.rs | 3 + src/bastion/src/context.rs | 2 +- src/bastion/src/distributor.rs | 150 ++++++++++++++++++++++----------- 4 files changed, 195 insertions(+), 60 deletions(-) diff --git a/src/bastion/src/callbacks.rs b/src/bastion/src/callbacks.rs index 4cc56f09..293c7c54 100644 --- a/src/bastion/src/callbacks.rs +++ b/src/bastion/src/callbacks.rs @@ -8,6 +8,7 @@ pub(crate) enum CallbackType { AfterStop, BeforeRestart, BeforeStart, + AfterStart, } #[derive(Default, Clone)] @@ -22,12 +23,12 @@ pub(crate) enum CallbackType { /// # #[cfg(feature = "tokio-runtime")] /// # #[tokio::main] /// # async fn main() { -/// # run(); +/// # run(); /// # } /// # /// # #[cfg(not(feature = "tokio-runtime"))] /// # fn main() { -/// # run(); +/// # run(); /// # } /// # /// # fn run() { @@ -60,6 +61,7 @@ pub(crate) enum CallbackType { /// [`Children`]: crate::children::Children pub struct Callbacks { before_start: Option>, + after_start: Option>, before_restart: Option>, after_restart: Option>, after_stop: Option>, @@ -77,12 +79,12 @@ impl Callbacks { /// # #[cfg(feature = "tokio-runtime")] /// # #[tokio::main] /// # async fn main() { - /// # run(); + /// # run(); /// # } /// # /// # #[cfg(not(feature = "tokio-runtime"))] /// # fn main() { - /// # run(); + /// # run(); /// # } /// # /// # fn run() { @@ -136,12 +138,12 @@ impl Callbacks { /// # #[cfg(feature = "tokio-runtime")] /// # #[tokio::main] /// # async fn main() { - /// # run(); + /// # run(); /// # } /// # /// # #[cfg(not(feature = "tokio-runtime"))] /// # fn main() { - /// # run(); + /// # run(); /// # } /// # /// # fn run() { @@ -191,6 +193,74 @@ impl Callbacks { self } + /// Sets the method that will get called right after the [`Supervisor`] + /// or [`Children`] is launched. + /// This method will be called after the child has subscribed to its distributors and dispatchers. + /// + /// Once the callback has run, the child has caught up it's message backlog, + /// and is waiting for new messages to process. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # + /// # Bastion::supervisor(|supervisor| { + /// supervisor.children(|children| { + /// let callbacks = Callbacks::new() + /// .with_after_start(|| println!("Children group ready to process messages.")); + /// + /// children + /// .with_exec(|ctx| { + /// // -- Children group started. + /// // with_after_start called + /// async move { + /// // ... + /// + /// // This will stop the children group... + /// Ok(()) + /// // Note that because the children group stopped by itself, + /// // if its supervisor restarts it, its `before_start` callback + /// // will get called and not `after_restart`. + /// } + /// // -- Children group stopped. + /// }) + /// .with_callbacks(callbacks) + /// }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + /// + /// [`Supervisor`]: crate::supervisor::Supervisor + /// [`Children`]: crate::children::Children + /// [`with_after_restart`]: Self::with_after_restart + pub fn with_after_start(mut self, after_start: C) -> Self + where + C: Fn() + Send + Sync + 'static, + { + let after_start = Arc::new(after_start); + self.after_start = Some(after_start); + self + } + /// Sets the method that will get called before the [`Supervisor`] /// or [`Children`] is reset if: /// - the supervisor of the supervised element using this callback @@ -208,12 +278,12 @@ impl Callbacks { /// # #[cfg(feature = "tokio-runtime")] /// # #[tokio::main] /// # async fn main() { - /// # run(); + /// # run(); /// # } /// # /// # #[cfg(not(feature = "tokio-runtime"))] /// # fn main() { - /// # run(); + /// # run(); /// # } /// # /// # fn run() { @@ -282,12 +352,12 @@ impl Callbacks { /// # #[cfg(feature = "tokio-runtime")] /// # #[tokio::main] /// # async fn main() { - /// # run(); + /// # run(); /// # } /// # /// # #[cfg(not(feature = "tokio-runtime"))] /// # fn main() { - /// # run(); + /// # run(); /// # } /// # /// # fn run() { @@ -360,12 +430,12 @@ impl Callbacks { /// # #[cfg(feature = "tokio-runtime")] /// # #[tokio::main] /// # async fn main() { - /// # run(); + /// # run(); /// # } /// # /// # #[cfg(not(feature = "tokio-runtime"))] /// # fn main() { - /// # run(); + /// # run(); /// # } /// # /// # fn run() { @@ -493,6 +563,12 @@ impl Callbacks { } } + pub(crate) fn after_start(&self) { + if let Some(after_start) = &self.after_start { + after_start() + } + } + pub(crate) fn before_restart(&self) { if let Some(before_restart) = &self.before_restart { before_restart() diff --git a/src/bastion/src/child.rs b/src/bastion/src/child.rs index fac8bab5..c6db1f7e 100644 --- a/src/bastion/src/child.rs +++ b/src/bastion/src/child.rs @@ -283,6 +283,7 @@ impl Child { CallbackType::BeforeRestart => self.callbacks.before_restart(), CallbackType::AfterRestart => self.callbacks.after_restart(), CallbackType::AfterStop => self.callbacks.after_stop(), + CallbackType::AfterStart => self.callbacks.after_start(), } } @@ -312,6 +313,8 @@ impl Child { return; }; + self.callbacks.after_start(); + loop { #[cfg(feature = "scaling")] self.update_stats().await; diff --git a/src/bastion/src/context.rs b/src/bastion/src/context.rs index ff190731..e4d6537d 100644 --- a/src/bastion/src/context.rs +++ b/src/bastion/src/context.rs @@ -917,7 +917,7 @@ mod context_tests { let children = Bastion::children(|children| { children.with_exec(|ctx: BastionContext| async move { - msg! { ctx.try_recv_timeout(std::time::Duration::from_millis(1)).await.expect("recv_timeout failed"), + msg! { ctx.try_recv_timeout(std::time::Duration::from_millis(5)).await.expect("recv_timeout failed"), ref msg: &'static str => { assert_eq!(msg, &"test recv timeout"); }; diff --git a/src/bastion/src/distributor.rs b/src/bastion/src/distributor.rs index bdcea759..dd3e34df 100644 --- a/src/bastion/src/distributor.rs +++ b/src/bastion/src/distributor.rs @@ -535,31 +535,109 @@ impl Distributor { #[cfg(test)] mod distributor_tests { use crate::prelude::*; - use futures_timer::Delay; + use futures::channel::mpsc::channel; + use futures::{SinkExt, StreamExt}; + + #[cfg(feature = "tokio-runtime")] + #[tokio::test] + async fn subscribe_tests() { + run_subscribe_tests().await; + } + + #[cfg(not(feature = "tokio-runtime"))] + #[test] + fn subscribe_tests() { + run!(run_subscribe_tests()); + } + + async fn run_subscribe_tests() { + Bastion::init(); + Bastion::start(); + + // This channel and the use of callbacks will allow us to know when all of the children are spawned. + let (sender, receiver) = channel(1); + + let sender = sender; + Bastion::supervisor(|supervisor| { + supervisor.children(|children| { + children + .with_callbacks(Callbacks::new().with_after_start(move || { + let mut sender = sender.clone(); + spawn!(async move { + use futures::SinkExt; + sender.send(()).await + }); + })) + .with_exec(|ctx| async move { + loop { + let child_ref = ctx.current().clone(); + MessageHandler::new(ctx.recv().await?) + .on_question(|_: (), sender| { + let _ = sender.reply(child_ref); + }) + .on_tell(|_: &str, _| {}) + .on_fallback(|unknown, _sender_addr| { + panic!("unknown message\n {:?}", unknown); + }); + } + }) + }) + }) + .unwrap(); + + // Wait until the child has spawned + receiver.take(1).collect::>().await; + + let temp_distributor = Distributor::named("temp distributor"); + + assert!( + temp_distributor.tell_one("hello!").is_err(), + "should not be able to send message to an empty distributor" + ); + + let one_child: ChildRef = Distributor::named("test distributor") + .request(()) + .await + .unwrap() + .unwrap(); + + temp_distributor.subscribe(one_child.clone()).unwrap(); + + temp_distributor + .tell_one("hello!") + .expect("should be able to send message a distributor that has a subscriber"); + + temp_distributor.unsubscribe(one_child).unwrap(); + + assert!( + temp_distributor.tell_one("hello!").is_err(), + "should not be able to send message to a distributor who's sole subscriber unsubscribed" + ); + + Bastion::stop(); + Bastion::block_until_stopped(); + } #[cfg(feature = "tokio-runtime")] #[tokio::test] async fn distributor_tests() { - run().await; + run_distributor_tests().await; } #[cfg(not(feature = "tokio-runtime"))] #[test] fn distributors_tests() { - run!(run()); + run!(run_distributor_tests()); } - async fn run() { - setup(); - - // Wait until all children are registered - Delay::new(std::time::Duration::from_secs(5)).await; + async fn run_distributor_tests() { + setup().await; test_tell().await; test_ask().await; test_request().await; - test_subscribe().await; + Bastion::stop(); // We don't wanna block until stopped here // since our children are loop {} ing for ever waiting for messages } @@ -622,15 +700,6 @@ mod distributor_tests { meanings.iter().sum::(), "5 children returning 42 should sum to 42 * 5" ); - - let sent = test_distributor - .tell_everyone("so long, and thanks for all the fish") - .unwrap(); - assert_eq!( - 5, - sent.len(), - "test distributor is supposed to have 5 children" - ); } async fn test_request() { @@ -655,42 +724,26 @@ mod distributor_tests { assert_eq!(42, answer_sync); } - async fn test_subscribe() { - let temp_distributor = Distributor::named("temp distributor"); - - assert!( - temp_distributor.tell_one("hello!").is_err(), - "should not be able to send message to an empty distributor" - ); - - let one_child: ChildRef = Distributor::named("test distributor") - .request(()) - .await - .unwrap() - .unwrap(); - - temp_distributor.subscribe(one_child.clone()).unwrap(); - - temp_distributor - .tell_one("hello!") - .expect("should be able to send message a distributor that has a subscriber"); + async fn setup() { + Bastion::init(); + Bastion::start(); - temp_distributor.unsubscribe(one_child).unwrap(); + const NUM_CHILDREN: usize = 5; - assert!( - temp_distributor.tell_one("hello!").is_err(), - "should not be able to send message to a distributor who's sole subscriber unsubscribed" - ); - } + // This channel and the use of callbacks will allow us to know when all of the children are spawned. + let (sender, receiver) = channel(NUM_CHILDREN); - fn setup() { - Bastion::init(); - Bastion::start(); Bastion::supervisor(|supervisor| { supervisor.children(|children| { children - .with_redundancy(5) + .with_redundancy(NUM_CHILDREN) .with_distributor(Distributor::named("test distributor")) + .with_callbacks(Callbacks::new().with_after_start(move || { + let mut sender = sender.clone(); + spawn!(async move { + sender.send(()).await + }); + })) .with_exec(|ctx| async move { loop { let child_ref = ctx.current().clone(); @@ -715,5 +768,8 @@ mod distributor_tests { }) }) .unwrap(); + + // Wait until the children have spawned + receiver.take(NUM_CHILDREN).collect::>().await; } } From 11359d22c69cbdbe270724b6dc4d2be2277231f6 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Sun, 11 Apr 2021 19:22:04 +0200 Subject: [PATCH 7/9] wip --- src/bastion/src/distributor.rs | 184 ++++++++++++------------ src/bastion/tests/message_signatures.rs | 2 +- 2 files changed, 93 insertions(+), 93 deletions(-) diff --git a/src/bastion/src/distributor.rs b/src/bastion/src/distributor.rs index dd3e34df..8080e5fc 100644 --- a/src/bastion/src/distributor.rs +++ b/src/bastion/src/distributor.rs @@ -105,10 +105,11 @@ impl Distributor { question: impl Message, ) -> oneshot::Receiver> { let (sender, receiver) = oneshot::channel(); - match SYSTEM.dispatcher().ask(*self, question) { - Ok(response) => { - spawn!(async move { - if let Ok(message) = response.await { + let s = *self; + spawn!(async move { + match SYSTEM.dispatcher().ask(s, question) { + Ok(response) => match response.await { + Ok(message) => { let message_to_send = MessageHandler::new(message) .on_tell(|reply: R, _| Ok(reply)) .on_fallback(|_, _| { @@ -117,17 +118,19 @@ impl Distributor { ))) }); let _ = sender.send(message_to_send); - } else { + } + Err(e) => { let _ = sender.send(Err(SendError::Other(anyhow::anyhow!( - "couldn't receive reply" + "couldn't receive reply: {:?}", + e )))); } - }); - } - Err(error) => { - let _ = sender.send(Err(error)); - } - }; + }, + Err(error) => { + let _ = sender.send(Err(error)); + } + }; + }); receiver } @@ -197,9 +200,10 @@ impl Distributor { question: impl Message, ) -> Receiver> { let (sender, receiver) = channel(); - match SYSTEM.dispatcher().ask(*self, question) { - Ok(response) => { - spawn!(async move { + let s = *self; + spawn!(async move { + match SYSTEM.dispatcher().ask(s, question) { + Ok(response) => { if let Ok(message) = response.await { let message_to_send = MessageHandler::new(message) .on_tell(|reply: R, _| Ok(reply)) @@ -214,12 +218,12 @@ impl Distributor { "couldn't receive reply" )))); } - }); - } - Err(error) => { - let _ = sender.send(Err(error)); - } - }; + } + Err(error) => { + let _ = sender.send(Err(error)); + } + }; + }); receiver } @@ -541,16 +545,16 @@ mod distributor_tests { #[cfg(feature = "tokio-runtime")] #[tokio::test] async fn subscribe_tests() { - run_subscribe_tests().await; + run_subscribe_tests(); } #[cfg(not(feature = "tokio-runtime"))] #[test] fn subscribe_tests() { - run!(run_subscribe_tests()); + run_subscribe_tests(); } - async fn run_subscribe_tests() { + fn run_subscribe_tests() { Bastion::init(); Bastion::start(); @@ -563,10 +567,7 @@ mod distributor_tests { children .with_callbacks(Callbacks::new().with_after_start(move || { let mut sender = sender.clone(); - spawn!(async move { - use futures::SinkExt; - sender.send(()).await - }); + spawn!(async move { sender.send(()).await }); })) .with_exec(|ctx| async move { loop { @@ -586,7 +587,9 @@ mod distributor_tests { .unwrap(); // Wait until the child has spawned - receiver.take(1).collect::>().await; + run!(async { + receiver.take(1).collect::>().await; + }); let temp_distributor = Distributor::named("temp distributor"); @@ -595,12 +598,13 @@ mod distributor_tests { "should not be able to send message to an empty distributor" ); - let one_child: ChildRef = Distributor::named("test distributor") - .request(()) - .await - .unwrap() - .unwrap(); - + let one_child: ChildRef = run!(async { + Distributor::named("test distributor") + .request(()) + .await + .unwrap() + .unwrap() + }); temp_distributor.subscribe(one_child.clone()).unwrap(); temp_distributor @@ -613,36 +617,29 @@ mod distributor_tests { temp_distributor.tell_one("hello!").is_err(), "should not be able to send message to a distributor who's sole subscriber unsubscribed" ); - - Bastion::stop(); - Bastion::block_until_stopped(); } #[cfg(feature = "tokio-runtime")] #[tokio::test] async fn distributor_tests() { - run_distributor_tests().await; + run_distributor_tests(); } #[cfg(not(feature = "tokio-runtime"))] #[test] fn distributors_tests() { - run!(run_distributor_tests()); + run_distributor_tests(); } - async fn run_distributor_tests() { - setup().await; - - test_tell().await; - test_ask().await; - test_request().await; + fn run_distributor_tests() { + setup(); - Bastion::stop(); - // We don't wanna block until stopped here - // since our children are loop {} ing for ever waiting for messages + test_tell(); + test_ask(); + test_request(); } - async fn test_tell() { + fn test_tell() { let test_distributor = Distributor::named("test distributor"); test_distributor @@ -660,60 +657,64 @@ mod distributor_tests { ); } - async fn test_ask() { + fn test_ask() { let test_distributor = Distributor::named("test distributor"); let question: String = "What is the answer to life, the universe and everything?".to_string(); - let answer = test_distributor.ask_one(question.clone()).unwrap(); - - MessageHandler::new(answer.await.unwrap()) - .on_tell(|answer: u8, _| { - assert_eq!(42, answer); - }) - .on_fallback(|unknown, _sender_addr| { - panic!("unknown message\n {:?}", unknown); - }); - - let answers = test_distributor.ask_everyone(question.clone()).unwrap(); - assert_eq!( - 5, - answers.len(), - "test distributor is supposed to have 5 children" - ); - - let meanings = futures::future::join_all(answers.into_iter().map(|answer| async { + run!(async { + let answer = test_distributor.ask_one(question.clone()).unwrap(); MessageHandler::new(answer.await.unwrap()) .on_tell(|answer: u8, _| { assert_eq!(42, answer); - answer }) .on_fallback(|unknown, _sender_addr| { panic!("unknown message\n {:?}", unknown); - }) - })) - .await; - - assert_eq!( - 42 * 5, - meanings.iter().sum::(), - "5 children returning 42 should sum to 42 * 5" - ); + }); + }); + + run!(async { + let answers = test_distributor.ask_everyone(question.clone()).unwrap(); + assert_eq!( + 5, + answers.len(), + "test distributor is supposed to have 5 children" + ); + let meanings = futures::future::join_all(answers.into_iter().map(|answer| async { + MessageHandler::new(answer.await.unwrap()) + .on_tell(|answer: u8, _| { + assert_eq!(42, answer); + answer + }) + .on_fallback(|unknown, _sender_addr| { + panic!("unknown message\n {:?}", unknown); + }) + })) + .await; + + assert_eq!( + 42 * 5, + meanings.iter().sum::(), + "5 children returning 42 should sum to 42 * 5" + ); + }); } - async fn test_request() { + fn test_request() { let test_distributor = Distributor::named("test distributor"); let question: String = "What is the answer to life, the universe and everything?".to_string(); - let answer: u8 = test_distributor - .request(question.clone()) - .await - .unwrap() - .unwrap(); - assert_eq!(42, answer); + run!(async { + let answer: u8 = test_distributor + .request(question.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(42, answer); + }); let answer_sync: u8 = test_distributor .request_sync(question) @@ -724,7 +725,7 @@ mod distributor_tests { assert_eq!(42, answer_sync); } - async fn setup() { + fn setup() { Bastion::init(); Bastion::start(); @@ -759,10 +760,7 @@ mod distributor_tests { .on_question(|_: (), sender| { let _ = sender.reply(child_ref); }) - .on_tell(|_: &str, _| {}) - .on_fallback(|unknown, _sender_addr| { - panic!("unknown message\n {:?}", unknown); - }); + .on_tell(|_: &str, _| {}); } }) }) @@ -770,6 +768,8 @@ mod distributor_tests { .unwrap(); // Wait until the children have spawned - receiver.take(NUM_CHILDREN).collect::>().await; + run!(async { + receiver.take(NUM_CHILDREN).collect::>().await; + }); } } diff --git a/src/bastion/tests/message_signatures.rs b/src/bastion/tests/message_signatures.rs index 8d3abe7f..6c107725 100644 --- a/src/bastion/tests/message_signatures.rs +++ b/src/bastion/tests/message_signatures.rs @@ -19,7 +19,7 @@ fn spawn_responders() -> ChildrenRef { msg! { ctx.recv().await?, msg: &'static str =!> { if msg == "Hello" { - assert!(signature!().is_sender_identified(), false); + assert!(signature!().is_sender_identified(), "sender is not identified"); answer!(ctx, "Goodbye").unwrap(); } }; From 8724256512716d1a0f905ab2755e01691d7b0f31 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Sun, 11 Apr 2021 19:40:31 +0200 Subject: [PATCH 8/9] merge tests --- src/bastion/src/distributor.rs | 105 +++++++++++---------------------- 1 file changed, 36 insertions(+), 69 deletions(-) diff --git a/src/bastion/src/distributor.rs b/src/bastion/src/distributor.rs index 8080e5fc..6c414307 100644 --- a/src/bastion/src/distributor.rs +++ b/src/bastion/src/distributor.rs @@ -542,55 +542,17 @@ mod distributor_tests { use futures::channel::mpsc::channel; use futures::{SinkExt, StreamExt}; - #[cfg(feature = "tokio-runtime")] - #[tokio::test] - async fn subscribe_tests() { - run_subscribe_tests(); - } - - #[cfg(not(feature = "tokio-runtime"))] #[test] - fn subscribe_tests() { - run_subscribe_tests(); - } - - fn run_subscribe_tests() { - Bastion::init(); - Bastion::start(); - - // This channel and the use of callbacks will allow us to know when all of the children are spawned. - let (sender, receiver) = channel(1); - - let sender = sender; - Bastion::supervisor(|supervisor| { - supervisor.children(|children| { - children - .with_callbacks(Callbacks::new().with_after_start(move || { - let mut sender = sender.clone(); - spawn!(async move { sender.send(()).await }); - })) - .with_exec(|ctx| async move { - loop { - let child_ref = ctx.current().clone(); - MessageHandler::new(ctx.recv().await?) - .on_question(|_: (), sender| { - let _ = sender.reply(child_ref); - }) - .on_tell(|_: &str, _| {}) - .on_fallback(|unknown, _sender_addr| { - panic!("unknown message\n {:?}", unknown); - }); - } - }) - }) - }) - .unwrap(); + fn distributor_tests() { + setup(); - // Wait until the child has spawned - run!(async { - receiver.take(1).collect::>().await; - }); + test_tell(); + test_ask(); + test_request(); + test_subscribe(); + } + fn test_subscribe() { let temp_distributor = Distributor::named("temp distributor"); assert!( @@ -619,26 +581,6 @@ mod distributor_tests { ); } - #[cfg(feature = "tokio-runtime")] - #[tokio::test] - async fn distributor_tests() { - run_distributor_tests(); - } - - #[cfg(not(feature = "tokio-runtime"))] - #[test] - fn distributors_tests() { - run_distributor_tests(); - } - - fn run_distributor_tests() { - setup(); - - test_tell(); - test_ask(); - test_request(); - } - fn test_tell() { let test_distributor = Distributor::named("test distributor"); @@ -735,14 +677,16 @@ mod distributor_tests { let (sender, receiver) = channel(NUM_CHILDREN); Bastion::supervisor(|supervisor| { + let test_ready = sender.clone(); + let subscribe_test_ready = sender.clone(); supervisor.children(|children| { children .with_redundancy(NUM_CHILDREN) .with_distributor(Distributor::named("test distributor")) .with_callbacks(Callbacks::new().with_after_start(move || { - let mut sender = sender.clone(); + let mut test_ready = test_ready.clone(); spawn!(async move { - sender.send(()).await + test_ready.send(()).await }); })) .with_exec(|ctx| async move { @@ -763,13 +707,36 @@ mod distributor_tests { .on_tell(|_: &str, _| {}); } }) + // Subscribe / unsubscribe tests + }).children(|children| { + children + .with_distributor(Distributor::named("subscribe test")) + .with_callbacks(Callbacks::new().with_after_start(move || { + let mut subscribe_test_ready = subscribe_test_ready.clone(); + spawn!(async move { subscribe_test_ready.send(()).await }); + })) + .with_exec(|ctx| async move { + loop { + let child_ref = ctx.current().clone(); + MessageHandler::new(ctx.recv().await?) + .on_question(|_: (), sender| { + let _ = sender.reply(child_ref); + }) + .on_tell(|_: &str, _| {}) + .on_fallback(|unknown, _sender_addr| { + panic!("unknown message\n {:?}", unknown); + }); + } + }) }) }) .unwrap(); // Wait until the children have spawned run!(async { - receiver.take(NUM_CHILDREN).collect::>().await; + // NUM_CHILDREN for the test distributor group, + // 1 for the subscribe test group + receiver.take(NUM_CHILDREN + 1).collect::>().await; }); } } From 926955d42942d68436c72e1e43cb667fb4919f7f Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Sun, 11 Apr 2021 20:23:03 +0200 Subject: [PATCH 9/9] split the tokio and the regular test runner so that both pass --- src/bastion/src/context.rs | 3 - src/bastion/src/distributor.rs | 112 ++++++++++++++++++--------------- 2 files changed, 60 insertions(+), 55 deletions(-) diff --git a/src/bastion/src/context.rs b/src/bastion/src/context.rs index e4d6537d..2429d86f 100644 --- a/src/bastion/src/context.rs +++ b/src/bastion/src/context.rs @@ -856,9 +856,6 @@ mod context_tests { test_try_recv_fail(); test_try_recv_timeout(); test_try_recv_timeout_fail(); - - Bastion::stop(); - Bastion::block_until_stopped(); } fn test_recv() { diff --git a/src/bastion/src/distributor.rs b/src/bastion/src/distributor.rs index 6c414307..23c9565d 100644 --- a/src/bastion/src/distributor.rs +++ b/src/bastion/src/distributor.rs @@ -542,8 +542,24 @@ mod distributor_tests { use futures::channel::mpsc::channel; use futures::{SinkExt, StreamExt}; + const TEST_DISTRIBUTOR: &str = "test distributor"; + const SUBSCRIBE_TEST_DISTRIBUTOR: &str = "subscribe test"; + + #[cfg(feature = "tokio-runtime")] + #[tokio::test] + async fn test_tokio_distributor() { + blocking!({ + run_tests(); + }); + } + + #[cfg(not(feature = "tokio-runtime"))] #[test] fn distributor_tests() { + run_tests(); + } + + fn run_tests() { setup(); test_tell(); @@ -561,7 +577,7 @@ mod distributor_tests { ); let one_child: ChildRef = run!(async { - Distributor::named("test distributor") + Distributor::named(SUBSCRIBE_TEST_DISTRIBUTOR) .request(()) .await .unwrap() @@ -582,7 +598,7 @@ mod distributor_tests { } fn test_tell() { - let test_distributor = Distributor::named("test distributor"); + let test_distributor = Distributor::named(TEST_DISTRIBUTOR); test_distributor .tell_one("don't panic and carry a towel") @@ -600,7 +616,7 @@ mod distributor_tests { } fn test_ask() { - let test_distributor = Distributor::named("test distributor"); + let test_distributor = Distributor::named(TEST_DISTRIBUTOR); let question: String = "What is the answer to life, the universe and everything?".to_string(); @@ -644,7 +660,7 @@ mod distributor_tests { } fn test_request() { - let test_distributor = Distributor::named("test distributor"); + let test_distributor = Distributor::named(TEST_DISTRIBUTOR); let question: String = "What is the answer to life, the universe and everything?".to_string(); @@ -679,56 +695,48 @@ mod distributor_tests { Bastion::supervisor(|supervisor| { let test_ready = sender.clone(); let subscribe_test_ready = sender.clone(); - supervisor.children(|children| { - children - .with_redundancy(NUM_CHILDREN) - .with_distributor(Distributor::named("test distributor")) - .with_callbacks(Callbacks::new().with_after_start(move || { - let mut test_ready = test_ready.clone(); - spawn!(async move { - test_ready.send(()).await - }); - })) - .with_exec(|ctx| async move { - loop { - let child_ref = ctx.current().clone(); - MessageHandler::new(ctx.recv().await?) - .on_question(|question: String, sender| { - if question == "What is the answer to life, the universe and everything?".to_string() { + supervisor + .children(|children| { + children + .with_redundancy(NUM_CHILDREN) + .with_distributor(Distributor::named(TEST_DISTRIBUTOR)) + .with_callbacks(Callbacks::new().with_after_start(move || { + let mut test_ready = test_ready.clone(); + spawn!(async move { test_ready.send(()).await }); + })) + .with_exec(|ctx| async move { + loop { + let child_ref = ctx.current().clone(); + MessageHandler::new(ctx.recv().await?) + .on_question(|_: String, sender| { let _ = sender.reply(42_u8); - } else { - panic!("wrong question {}", question); - } - }) - // send your child ref - .on_question(|_: (), sender| { - let _ = sender.reply(child_ref); - }) - .on_tell(|_: &str, _| {}); - } - }) + }) + // send your child ref + .on_question(|_: (), sender| { + let _ = sender.reply(child_ref); + }); + } + }) // Subscribe / unsubscribe tests - }).children(|children| { - children - .with_distributor(Distributor::named("subscribe test")) - .with_callbacks(Callbacks::new().with_after_start(move || { - let mut subscribe_test_ready = subscribe_test_ready.clone(); - spawn!(async move { subscribe_test_ready.send(()).await }); - })) - .with_exec(|ctx| async move { - loop { - let child_ref = ctx.current().clone(); - MessageHandler::new(ctx.recv().await?) - .on_question(|_: (), sender| { - let _ = sender.reply(child_ref); - }) - .on_tell(|_: &str, _| {}) - .on_fallback(|unknown, _sender_addr| { - panic!("unknown message\n {:?}", unknown); - }); - } - }) - }) + }) + .children(|children| { + children + .with_distributor(Distributor::named(SUBSCRIBE_TEST_DISTRIBUTOR)) + .with_callbacks(Callbacks::new().with_after_start(move || { + let mut subscribe_test_ready = subscribe_test_ready.clone(); + spawn!(async move { subscribe_test_ready.send(()).await }); + })) + .with_exec(|ctx| async move { + loop { + let child_ref = ctx.current().clone(); + MessageHandler::new(ctx.recv().await?).on_question( + |_: (), sender| { + let _ = sender.reply(child_ref); + }, + ); + } + }) + }) }) .unwrap();