Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: async recipient ish ? #333

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion src/bastion/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
};
use crate::{distributor::Distributor, envelope::SignedMessage};
use anyhow::Result as AnyResult;
use futures::Future;
use lever::prelude::*;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Expand All @@ -20,6 +20,10 @@ use std::{
collections::HashMap,
fmt::{self, Debug},
};
use std::{
hash::{Hash, Hasher},
task::Poll,
};
use tracing::{debug, trace};

/// Type alias for the concurrency hashmap. Each key-value pair stores
Expand Down Expand Up @@ -71,6 +75,29 @@ pub trait Recipient {
/// A `RecipientHandler` is a `Recipient` implementor, that can be stored in the dispatcher
pub trait RecipientHandler: Recipient + Send + Sync + Debug {}

impl Future for RoundRobinHandler {
type Output = Vec<ChildRef>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let recipients = self.public_recipients();
if !recipients.is_empty() {
return Poll::Ready(recipients);
}

self.waker.register(cx.waker());

let recipients = self.public_recipients();
if !recipients.is_empty() {
Poll::Ready(recipients)
} else {
Poll::Pending
}
}
}

impl RecipientHandler for RoundRobinHandler {}

/// The default handler, which does round-robin.
Expand Down Expand Up @@ -101,6 +128,7 @@ pub type DefaultDispatcherHandler = RoundRobinHandler;
pub struct RoundRobinHandler {
index: AtomicUsize,
recipients: RecipientMap,
waker: futures::task::AtomicWaker,
}

impl RoundRobinHandler {
Expand All @@ -118,6 +146,22 @@ impl RoundRobinHandler {
}
}

impl RoundRobinHandler {
async fn poll_next(&mut self) -> ChildRef {
let index = self.index.fetch_add(1, Ordering::SeqCst);
let recipients = self.await;
// TODO [igni]: unwrap?!
recipients
.get(index % recipients.len())
.map(std::clone::Clone::clone)
.unwrap()
}

async fn poll_all(&mut self) -> Vec<ChildRef> {
self.await
}
}

impl Recipient for RoundRobinHandler {
fn next(&self) -> Option<ChildRef> {
let entries = self.public_recipients();
Expand All @@ -137,6 +181,7 @@ impl Recipient for RoundRobinHandler {

fn register(&self, actor: ChildRef) {
let _ = self.recipients.insert(actor, ());
self.waker.wake();
}

fn remove(&self, actor: &ChildRef) {
Expand Down