From 4cffc8d6b32bd564e00870c1efd35ccf950ded51 Mon Sep 17 00:00:00 2001 From: Bradley Odell Date: Sat, 16 Mar 2024 00:53:21 -0700 Subject: [PATCH] Initial implementation of `force_send` function. --- src/lib.rs | 107 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 73 insertions(+), 34 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3b79610..09bebfe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -423,6 +423,10 @@ type ChanLock = Spinlock; #[cfg(not(feature = "spin"))] type ChanLock = Mutex; +#[cfg(feature = "spin")] +type ChanGuard<'a, T> = SpinlockGuard<'a, T>; +#[cfg(not(feature = "spin"))] +type ChanGuard<'a, T> = MutexGuard<'a, T>; type SignalVec = VecDeque>>; struct Chan { @@ -432,6 +436,42 @@ struct Chan { } impl Chan { + #[inline(always)] + fn push_waiters(mut guard: ChanGuard, msg: T) { + let mut msg = Some(msg); + loop { + let slot = guard.waiting.pop_front(); + match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) { + // No more waiting receivers and msg in queue, so break out of the loop + None if msg.is_none() => break, + // No more waiting receivers, so add msg to queue and break out of the loop + None => { + guard.queue.push_back(msg.unwrap()); + break; + } + Some((Some(m), signal)) => { + if signal.fire() { + // Was async and a stream, so didn't acquire the message. Wake another + // receiver, and do not yet push the message. + msg.replace(m); + continue; + } else { + // Was async and not a stream, so it did acquire the message. Push the + // message to the queue for it to be received. + guard.queue.push_back(m); + drop(guard); + break; + } + }, + Some((None, signal)) => { + drop(guard); + signal.fire(); + break; // Was sync, so it has acquired the message + }, + } + } + } + fn pull_pending(&mut self, pull_extra: bool) { if let Some((cap, sending)) = &mut self.sending { let effective_cap = *cap + pull_extra as usize; @@ -488,40 +528,7 @@ impl Shared { if self.is_disconnected() { Err(TrySendTimeoutError::Disconnected(msg)).into() } else if !chan.waiting.is_empty() { - let mut msg = Some(msg); - - loop { - let slot = chan.waiting.pop_front(); - match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) { - // No more waiting receivers and msg in queue, so break out of the loop - None if msg.is_none() => break, - // No more waiting receivers, so add msg to queue and break out of the loop - None => { - chan.queue.push_back(msg.unwrap()); - break; - } - Some((Some(m), signal)) => { - if signal.fire() { - // Was async and a stream, so didn't acquire the message. Wake another - // receiver, and do not yet push the message. - msg.replace(m); - continue; - } else { - // Was async and not a stream, so it did acquire the message. Push the - // message to the queue for it to be received. - chan.queue.push_back(m); - drop(chan); - break; - } - }, - Some((None, signal)) => { - drop(chan); - signal.fire(); - break; // Was sync, so it has acquired the message - }, - } - } - + Chan::push_waiters(chan, msg); Ok(()).into() } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) { chan.queue.push_back(msg); @@ -578,6 +585,29 @@ impl Shared { ) } + fn send_force(&self, msg: T) -> Result, SendError> { + let mut chan = wait_lock(&self.chan); + + if self.is_disconnected() { + Err(SendError(msg)) + } else if !chan.waiting.is_empty() { + Chan::push_waiters(chan, msg); + Ok(None) + } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) { + chan.queue.push_back(msg); + Ok(None) + } else { + // Channel is bounded and full + if let Some(old) = chan.queue.pop_front() { + chan.queue.push_back(msg); + Ok(Some(old)) + } else { + // Zero-capacity bounded channel + Ok(Some(msg)) + } + } + } + fn recv>>( &self, should_block: bool, @@ -707,6 +737,15 @@ impl Sender { }) } + /// Forcefully send a value into the channel, returning an error if all receivers have been dropped. + /// If the channel is bounded and is full, the oldest value is popped from the receiving + /// end of the channel to make space for the new value. This enables "ring buffer" functionality. + /// If the channel associated with this sender is unbounded, + /// this method has the same behaviour as [`Sender::send`]. + pub fn force_send(&self, msg: T) -> Result, SendError> { + self.shared.send_force(msg) + } + /// Send a value into the channel, returning an error if all receivers have been dropped. /// If the channel is bounded and is full, this method will block until space is available /// or all receivers have been dropped. If the channel is unbounded, this method will not