Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sender::force_send #143

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 73 additions & 34 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ type ChanLock<T> = Spinlock<T>;
#[cfg(not(feature = "spin"))]
type ChanLock<T> = Mutex<T>;

#[cfg(feature = "spin")]
type ChanGuard<'a, T> = SpinlockGuard<'a, T>;
#[cfg(not(feature = "spin"))]
type ChanGuard<'a, T> = MutexGuard<'a, T>;

type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
struct Chan<T> {
Expand All @@ -432,6 +436,42 @@ struct Chan<T> {
}

impl<T> Chan<T> {
#[inline(always)]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I included this line to ensure there isn't a performance degradation from extracting out this function from the existing code.

fn push_waiters(mut guard: ChanGuard<Self>, msg: T) {
let mut msg = Some(msg);
loop {
let slot = guard.waiting.pop_front();
match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
// No more waiting receivers and msg in queue, so break out of the loop
None if msg.is_none() => break,
// No more waiting receivers, so add msg to queue and break out of the loop
None => {
guard.queue.push_back(msg.unwrap());
break;
}
Some((Some(m), signal)) => {
if signal.fire() {
// Was async and a stream, so didn't acquire the message. Wake another
// receiver, and do not yet push the message.
msg.replace(m);
continue;
} else {
// Was async and not a stream, so it did acquire the message. Push the
// message to the queue for it to be received.
guard.queue.push_back(m);
drop(guard);
break;
}
},
Some((None, signal)) => {
drop(guard);
signal.fire();
break; // Was sync, so it has acquired the message
},
}
}
}

fn pull_pending(&mut self, pull_extra: bool) {
if let Some((cap, sending)) = &mut self.sending {
let effective_cap = *cap + pull_extra as usize;
Expand Down Expand Up @@ -488,40 +528,7 @@ impl<T> Shared<T> {
if self.is_disconnected() {
Err(TrySendTimeoutError::Disconnected(msg)).into()
} else if !chan.waiting.is_empty() {
let mut msg = Some(msg);

loop {
let slot = chan.waiting.pop_front();
match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
// No more waiting receivers and msg in queue, so break out of the loop
None if msg.is_none() => break,
// No more waiting receivers, so add msg to queue and break out of the loop
None => {
chan.queue.push_back(msg.unwrap());
break;
}
Some((Some(m), signal)) => {
if signal.fire() {
// Was async and a stream, so didn't acquire the message. Wake another
// receiver, and do not yet push the message.
msg.replace(m);
continue;
} else {
// Was async and not a stream, so it did acquire the message. Push the
// message to the queue for it to be received.
chan.queue.push_back(m);
drop(chan);
break;
}
},
Some((None, signal)) => {
drop(chan);
signal.fire();
break; // Was sync, so it has acquired the message
},
}
}

Chan::push_waiters(chan, msg);
Ok(()).into()
} else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
chan.queue.push_back(msg);
Expand Down Expand Up @@ -578,6 +585,29 @@ impl<T> Shared<T> {
)
}

fn send_force(&self, msg: T) -> Result<Option<T>, SendError<T>> {
let mut chan = wait_lock(&self.chan);

if self.is_disconnected() {
Err(SendError(msg))
} else if !chan.waiting.is_empty() {
Chan::push_waiters(chan, msg);
Ok(None)
} else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
chan.queue.push_back(msg);
Ok(None)
} else {
// Channel is bounded and full
if let Some(old) = chan.queue.pop_front() {
chan.queue.push_back(msg);
Ok(Some(old))
} else {
// Zero-capacity bounded channel
Ok(Some(msg))
}
}
}

fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
&self,
should_block: bool,
Expand Down Expand Up @@ -707,6 +737,15 @@ impl<T> Sender<T> {
})
}

/// Forcefully send a value into the channel, returning an error if all receivers have been dropped.
/// If the channel is bounded and is full, the oldest value is popped from the receiving
/// end of the channel to make space for the new value. This enables "ring buffer" functionality.
/// If the channel associated with this sender is unbounded,
/// this method has the same behaviour as [`Sender::send`].
pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
self.shared.send_force(msg)
}

/// Send a value into the channel, returning an error if all receivers have been dropped.
/// If the channel is bounded and is full, this method will block until space is available
/// or all receivers have been dropped. If the channel is unbounded, this method will not
Expand Down