From cb78494b5c9db2137ff2d2def95049afb6ee6280 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Tue, 25 May 2021 16:43:01 +0200 Subject: [PATCH 1/2] wip: async recipient ish ? --- src/bastion/src/dispatcher.rs | 48 ++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/src/bastion/src/dispatcher.rs b/src/bastion/src/dispatcher.rs index fcb447d9..9167af97 100644 --- a/src/bastion/src/dispatcher.rs +++ b/src/bastion/src/dispatcher.rs @@ -3,14 +3,15 @@ //! group of actors through the dispatchers that holds information about //! actors grouped together. use crate::{ + child::Child, child_ref::ChildRef, message::{Answer, Message}, prelude::SendError, }; use crate::{distributor::Distributor, envelope::SignedMessage}; use anyhow::Result as AnyResult; +use futures::Future; use lever::prelude::*; -use std::hash::{Hash, Hasher}; use std::sync::RwLock; use std::sync::{ atomic::{AtomicUsize, Ordering}, @@ -20,6 +21,10 @@ use std::{ collections::HashMap, fmt::{self, Debug}, }; +use std::{ + hash::{Hash, Hasher}, + task::Poll, +}; use tracing::{debug, trace}; /// Type alias for the concurrency hashmap. Each key-value pair stores @@ -71,6 +76,29 @@ pub trait Recipient { /// A `RecipientHandler` is a `Recipient` implementor, that can be stored in the dispatcher pub trait RecipientHandler: Recipient + Send + Sync + Debug {} +impl Future for RoundRobinHandler { + type Output = Vec; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let recipients = self.public_recipients(); + if !recipients.is_empty() { + return Poll::Ready(recipients); + } + + self.waker.register(cx.waker()); + + let recipients = self.public_recipients(); + if !recipients.is_empty() { + return Poll::Ready(recipients); + } else { + Poll::Pending + } + } +} + impl RecipientHandler for RoundRobinHandler {} /// The default handler, which does round-robin. @@ -101,6 +129,7 @@ pub type DefaultDispatcherHandler = RoundRobinHandler; pub struct RoundRobinHandler { index: AtomicUsize, recipients: RecipientMap, + waker: futures::task::AtomicWaker, } impl RoundRobinHandler { @@ -118,6 +147,22 @@ impl RoundRobinHandler { } } +impl RoundRobinHandler { + async fn poll_next(&mut self) -> ChildRef { + let index = self.index.fetch_add(1, Ordering::SeqCst); + let recipients = self.await; + // TODO [igni]: unwrap?! + recipients + .get(index % recipients.len()) + .map(std::clone::Clone::clone) + .unwrap() + } + + async fn poll_all(&mut self) -> Vec { + self.await + } +} + impl Recipient for RoundRobinHandler { fn next(&self) -> Option { let entries = self.public_recipients(); @@ -137,6 +182,7 @@ impl Recipient for RoundRobinHandler { fn register(&self, actor: ChildRef) { let _ = self.recipients.insert(actor, ()); + self.waker.wake(); } fn remove(&self, actor: &ChildRef) { From 708f43bcf9eea9bbbf3e9517dfb8bea7bc5c7ed5 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Tue, 25 May 2021 17:14:28 +0200 Subject: [PATCH 2/2] wip --- src/bastion/src/dispatcher.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/bastion/src/dispatcher.rs b/src/bastion/src/dispatcher.rs index 9167af97..247d3dbc 100644 --- a/src/bastion/src/dispatcher.rs +++ b/src/bastion/src/dispatcher.rs @@ -3,7 +3,6 @@ //! group of actors through the dispatchers that holds information about //! actors grouped together. use crate::{ - child::Child, child_ref::ChildRef, message::{Answer, Message}, prelude::SendError, @@ -92,7 +91,7 @@ impl Future for RoundRobinHandler { let recipients = self.public_recipients(); if !recipients.is_empty() { - return Poll::Ready(recipients); + Poll::Ready(recipients) } else { Poll::Pending }