diff options
Diffstat (limited to 'vendor/flume/src')
-rw-r--r-- | vendor/flume/src/async.rs | 543 | ||||
-rw-r--r-- | vendor/flume/src/lib.rs | 1142 | ||||
-rw-r--r-- | vendor/flume/src/select.rs | 405 | ||||
-rw-r--r-- | vendor/flume/src/signal.rs | 33 |
4 files changed, 2123 insertions, 0 deletions
diff --git a/vendor/flume/src/async.rs b/vendor/flume/src/async.rs new file mode 100644 index 0000000..fae44d4 --- /dev/null +++ b/vendor/flume/src/async.rs @@ -0,0 +1,543 @@ +//! Futures and other types that allow asynchronous interaction with channels. + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll, Waker}, + any::Any, + ops::Deref, +}; +use crate::*; +use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture}; +use futures_sink::Sink; +use spin1::Mutex as Spinlock; + +struct AsyncSignal { + waker: Spinlock<Waker>, + woken: AtomicBool, + stream: bool, +} + +impl AsyncSignal { + fn new(cx: &Context, stream: bool) -> Self { + AsyncSignal { + waker: Spinlock::new(cx.waker().clone()), + woken: AtomicBool::new(false), + stream, + } + } +} + +impl Signal for AsyncSignal { + fn fire(&self) -> bool { + self.woken.store(true, Ordering::SeqCst); + self.waker.lock().wake_by_ref(); + self.stream + } + + fn as_any(&self) -> &(dyn Any + 'static) { self } + fn as_ptr(&self) -> *const () { self as *const _ as *const () } +} + +impl<T> Hook<T, AsyncSignal> { + // Update the hook to point to the given Waker. + // Returns whether the hook has been previously awakened + fn update_waker(&self, cx_waker: &Waker) -> bool { + let mut waker = self.1.waker.lock(); + let woken = self.1.woken.load(Ordering::SeqCst); + if !waker.will_wake(cx_waker) { + *waker = cx_waker.clone(); + + // Avoid the edge case where the waker was woken just before the wakers were + // swapped. + if woken { + cx_waker.wake_by_ref(); + } + } + woken + } +} + +#[derive(Clone)] +enum OwnedOrRef<'a, T> { + Owned(T), + Ref(&'a T), +} + +impl<'a, T> Deref for OwnedOrRef<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + match self { + OwnedOrRef::Owned(arc) => &arc, + OwnedOrRef::Ref(r) => r, + } + } +} + +impl<T> Sender<T> { + /// Asynchronously send a value into the channel, returning an error if all receivers have been + /// dropped. If the channel is bounded and is full, the returned future will yield to the async + /// runtime. + /// + /// In the current implementation, the returned future will not yield to the async runtime if the + /// channel is unbounded. This may change in later versions. + pub fn send_async(&self, item: T) -> SendFut<T> { + SendFut { + sender: OwnedOrRef::Ref(&self), + hook: Some(SendState::NotYetSent(item)), + } + } + + /// Convert this sender into a future that asynchronously sends a single message into the channel, + /// returning an error if all receivers have been dropped. If the channel is bounded and is full, + /// this future will yield to the async runtime. + /// + /// In the current implementation, the returned future will not yield to the async runtime if the + /// channel is unbounded. This may change in later versions. + pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> { + SendFut { + sender: OwnedOrRef::Owned(self), + hook: Some(SendState::NotYetSent(item)), + } + } + + /// Create an asynchronous sink that uses this sender to asynchronously send messages into the + /// channel. The sender will continue to be usable after the sink has been dropped. + /// + /// In the current implementation, the returned sink will not yield to the async runtime if the + /// channel is unbounded. This may change in later versions. + pub fn sink(&self) -> SendSink<'_, T> { + SendSink(SendFut { + sender: OwnedOrRef::Ref(&self), + hook: None, + }) + } + + /// Convert this sender into a sink that allows asynchronously sending messages into the channel. + /// + /// In the current implementation, the returned sink will not yield to the async runtime if the + /// channel is unbounded. This may change in later versions. + pub fn into_sink<'a>(self) -> SendSink<'a, T> { + SendSink(SendFut { + sender: OwnedOrRef::Owned(self), + hook: None, + }) + } +} + +enum SendState<T> { + NotYetSent(T), + QueuedItem(Arc<Hook<T, AsyncSignal>>), +} + +/// A future that sends a value into a channel. +/// +/// Can be created via [`Sender::send_async`] or [`Sender::into_send_async`]. +#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] +pub struct SendFut<'a, T> { + sender: OwnedOrRef<'a, Sender<T>>, + // Only none after dropping + hook: Option<SendState<T>>, +} + +impl<T> std::marker::Unpin for SendFut<'_, T> {} + +impl<'a, T> SendFut<'a, T> { + /// Reset the hook, clearing it and removing it from the waiting sender's queue. This is called + /// on drop and just before `start_send` in the `Sink` implementation. + fn reset_hook(&mut self) { + if let Some(SendState::QueuedItem(hook)) = self.hook.take() { + let hook: Arc<Hook<T, dyn Signal>> = hook; + wait_lock(&self.sender.shared.chan).sending + .as_mut() + .unwrap().1 + .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); + } + } + + /// See [`Sender::is_disconnected`]. + pub fn is_disconnected(&self) -> bool { + self.sender.is_disconnected() + } + + /// See [`Sender::is_empty`]. + pub fn is_empty(&self) -> bool { + self.sender.is_empty() + } + + /// See [`Sender::is_full`]. + pub fn is_full(&self) -> bool { + self.sender.is_full() + } + + /// See [`Sender::len`]. + pub fn len(&self) -> usize { + self.sender.len() + } + + /// See [`Sender::capacity`]. + pub fn capacity(&self) -> Option<usize> { + self.sender.capacity() + } +} + +impl<'a, T> Drop for SendFut<'a, T> { + fn drop(&mut self) { + self.reset_hook() + } +} + + +impl<'a, T> Future for SendFut<'a, T> { + type Output = Result<(), SendError<T>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() { + if hook.is_empty() { + Poll::Ready(Ok(())) + } else if self.sender.shared.is_disconnected() { + let item = hook.try_take(); + self.hook = None; + match item { + Some(item) => Poll::Ready(Err(SendError(item))), + None => Poll::Ready(Ok(())), + } + } else { + hook.update_waker(cx.waker()); + Poll::Pending + } + } else if let Some(SendState::NotYetSent(item)) = self.hook.take() { + let this = self.get_mut(); + let (shared, this_hook) = (&this.sender.shared, &mut this.hook); + + shared.send( + // item + item, + // should_block + true, + // make_signal + |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)), + // do_block + |hook| { + *this_hook = Some(SendState::QueuedItem(hook)); + Poll::Pending + }, + ) + .map(|r| r.map_err(|err| match err { + TrySendTimeoutError::Disconnected(msg) => SendError(msg), + _ => unreachable!(), + })) + } else { // Nothing to do + Poll::Ready(Ok(())) + } + } +} + +impl<'a, T> FusedFuture for SendFut<'a, T> { + fn is_terminated(&self) -> bool { + self.sender.shared.is_disconnected() + } +} + +/// A sink that allows sending values into a channel. +/// +/// Can be created via [`Sender::sink`] or [`Sender::into_sink`]. +pub struct SendSink<'a, T>(SendFut<'a, T>); + +impl<'a, T> SendSink<'a, T> { + /// Returns a clone of a sending half of the channel of this sink. + pub fn sender(&self) -> &Sender<T> { + &self.0.sender + } + + /// See [`Sender::is_disconnected`]. + pub fn is_disconnected(&self) -> bool { + self.0.is_disconnected() + } + + /// See [`Sender::is_empty`]. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// See [`Sender::is_full`]. + pub fn is_full(&self) -> bool { + self.0.is_full() + } + + /// See [`Sender::len`]. + pub fn len(&self) -> usize { + self.0.len() + } + + /// See [`Sender::capacity`]. + pub fn capacity(&self) -> Option<usize> { + self.0.capacity() + } + + /// Returns whether the SendSinks are belong to the same channel. + pub fn same_channel(&self, other: &Self) -> bool { + self.sender().same_channel(other.sender()) + } +} + +impl<'a, T> Sink<T> for SendSink<'a, T> { + type Error = SendError<T>; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Pin::new(&mut self.0).poll(cx) + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.0.reset_hook(); + self.0.hook = Some(SendState::NotYetSent(item)); + + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here? + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here? + } +} + +impl<'a, T> Clone for SendSink<'a, T> { + fn clone(&self) -> SendSink<'a, T> { + SendSink(SendFut { + sender: self.0.sender.clone(), + hook: None, + }) + } +} + +impl<T> Receiver<T> { + /// Asynchronously receive a value from the channel, returning an error if all senders have been + /// dropped. If the channel is empty, the returned future will yield to the async runtime. + pub fn recv_async(&self) -> RecvFut<'_, T> { + RecvFut::new(OwnedOrRef::Ref(self)) + } + + /// Convert this receiver into a future that asynchronously receives a single message from the + /// channel, returning an error if all senders have been dropped. If the channel is empty, this + /// future will yield to the async runtime. + pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> { + RecvFut::new(OwnedOrRef::Owned(self)) + } + + /// Create an asynchronous stream that uses this receiver to asynchronously receive messages + /// from the channel. The receiver will continue to be usable after the stream has been dropped. + pub fn stream(&self) -> RecvStream<'_, T> { + RecvStream(RecvFut::new(OwnedOrRef::Ref(self))) + } + + /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel. + pub fn into_stream<'a>(self) -> RecvStream<'a, T> { + RecvStream(RecvFut::new(OwnedOrRef::Owned(self))) + } +} + +/// A future which allows asynchronously receiving a message. +/// +/// Can be created via [`Receiver::recv_async`] or [`Receiver::into_recv_async`]. +#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] +pub struct RecvFut<'a, T> { + receiver: OwnedOrRef<'a, Receiver<T>>, + hook: Option<Arc<Hook<T, AsyncSignal>>>, +} + +impl<'a, T> RecvFut<'a, T> { + fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self { + Self { + receiver, + hook: None, + } + } + + /// Reset the hook, clearing it and removing it from the waiting receivers queue and waking + /// another receiver if this receiver has been woken, so as not to cause any missed wakeups. + /// This is called on drop and after a new item is received in `Stream::poll_next`. + fn reset_hook(&mut self) { + if let Some(hook) = self.hook.take() { + let hook: Arc<Hook<T, dyn Signal>> = hook; + let mut chan = wait_lock(&self.receiver.shared.chan); + // We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers? + chan.waiting.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); + if hook.signal().as_any().downcast_ref::<AsyncSignal>().unwrap().woken.load(Ordering::SeqCst) { + // If this signal has been fired, but we're being dropped (and so not listening to it), + // pass the signal on to another receiver + chan.try_wake_receiver_if_pending(); + } + } + } + + fn poll_inner( + self: Pin<&mut Self>, + cx: &mut Context, + stream: bool, + ) -> Poll<Result<T, RecvError>> { + if self.hook.is_some() { + match self.receiver.shared.recv_sync(None) { + Ok(msg) => return Poll::Ready(Ok(msg)), + Err(TryRecvTimeoutError::Disconnected) => { + return Poll::Ready(Err(RecvError::Disconnected)) + } + _ => (), + } + + let hook = self.hook.as_ref().map(Arc::clone).unwrap(); + if hook.update_waker(cx.waker()) { + // If the previous hook was awakened, we need to insert it back to the + // queue, otherwise, it remains valid. + wait_lock(&self.receiver.shared.chan) + .waiting + .push_back(hook); + } + // To avoid a missed wakeup, re-check disconnect status here because the channel might have + // gotten shut down before we had a chance to push our hook + if self.receiver.shared.is_disconnected() { + // And now, to avoid a race condition between the first recv attempt and the disconnect check we + // just performed, attempt to recv again just in case we missed something. + Poll::Ready( + self.receiver + .shared + .recv_sync(None) + .map(Ok) + .unwrap_or(Err(RecvError::Disconnected)), + ) + } else { + Poll::Pending + } + } else { + let mut_self = self.get_mut(); + let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook); + + shared.recv( + // should_block + true, + // make_signal + || Hook::trigger(AsyncSignal::new(cx, stream)), + // do_block + |hook| { + *this_hook = Some(hook); + Poll::Pending + }, + ) + .map(|r| r.map_err(|err| match err { + TryRecvTimeoutError::Disconnected => RecvError::Disconnected, + _ => unreachable!(), + })) + } + } + + /// See [`Receiver::is_disconnected`]. + pub fn is_disconnected(&self) -> bool { + self.receiver.is_disconnected() + } + + /// See [`Receiver::is_empty`]. + pub fn is_empty(&self) -> bool { + self.receiver.is_empty() + } + + /// See [`Receiver::is_full`]. + pub fn is_full(&self) -> bool { + self.receiver.is_full() + } + + /// See [`Receiver::len`]. + pub fn len(&self) -> usize { + self.receiver.len() + } + + /// See [`Receiver::capacity`]. + pub fn capacity(&self) -> Option<usize> { + self.receiver.capacity() + } +} + +impl<'a, T> Drop for RecvFut<'a, T> { + fn drop(&mut self) { + self.reset_hook(); + } +} + +impl<'a, T> Future for RecvFut<'a, T> { + type Output = Result<T, RecvError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.poll_inner(cx, false) // stream = false + } +} + +impl<'a, T> FusedFuture for RecvFut<'a, T> { + fn is_terminated(&self) -> bool { + self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty() + } +} + +/// A stream which allows asynchronously receiving messages. +/// +/// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`]. +pub struct RecvStream<'a, T>(RecvFut<'a, T>); + +impl<'a, T> RecvStream<'a, T> { + /// See [`Receiver::is_disconnected`]. + pub fn is_disconnected(&self) -> bool { + self.0.is_disconnected() + } + + /// See [`Receiver::is_empty`]. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// See [`Receiver::is_full`]. + pub fn is_full(&self) -> bool { + self.0.is_full() + } + + /// See [`Receiver::len`]. + pub fn len(&self) -> usize { + self.0.len() + } + + /// See [`Receiver::capacity`]. + pub fn capacity(&self) -> Option<usize> { + self.0.capacity() + } + + /// Returns whether the SendSinks are belong to the same channel. + pub fn same_channel(&self, other: &Self) -> bool { + self.0.receiver.same_channel(&*other.0.receiver) + } +} + +impl<'a, T> Clone for RecvStream<'a, T> { + fn clone(&self) -> RecvStream<'a, T> { + RecvStream(RecvFut::new(self.0.receiver.clone())) + } +} + +impl<'a, T> Stream for RecvStream<'a, T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + match Pin::new(&mut self.0).poll_inner(cx, true) { // stream = true + Poll::Pending => Poll::Pending, + Poll::Ready(item) => { + self.0.reset_hook(); + Poll::Ready(item.ok()) + } + } + } +} + +impl<'a, T> FusedStream for RecvStream<'a, T> { + fn is_terminated(&self) -> bool { + self.0.is_terminated() + } +} diff --git a/vendor/flume/src/lib.rs b/vendor/flume/src/lib.rs new file mode 100644 index 0000000..c9bb3ee --- /dev/null +++ b/vendor/flume/src/lib.rs @@ -0,0 +1,1142 @@ +//! # Flume +//! +//! A blazingly fast multi-producer, multi-consumer channel. +//! +//! *"Do not communicate by sharing memory; instead, share memory by communicating."* +//! +//! ## Why Flume? +//! +//! - **Featureful**: Unbounded, bounded and rendezvous queues +//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel` +//! - **Safe**: No `unsafe` code anywhere in the codebase! +//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone` +//! - **Familiar**: Drop-in replacement for `std::sync::mpsc` +//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines +//! - **Simple**: Few dependencies, minimal codebase, fast to compile +//! - **Asynchronous**: `async` support, including mix 'n match with sync code +//! - **Ergonomic**: Powerful `select`-like interface +//! +//! ## Example +//! +//! ``` +//! let (tx, rx) = flume::unbounded(); +//! +//! tx.send(42).unwrap(); +//! assert_eq!(rx.recv().unwrap(), 42); +//! ``` + +#![deny(missing_docs)] + +#[cfg(feature = "select")] +pub mod select; +#[cfg(feature = "async")] +pub mod r#async; + +mod signal; + +// Reexports +#[cfg(feature = "select")] +pub use select::Selector; + +use std::{ + collections::VecDeque, + sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak}, + time::{Duration, Instant}, + marker::PhantomData, + thread, + fmt, +}; + +#[cfg(feature = "spin")] +use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard}; +use crate::signal::{Signal, SyncSignal}; + +/// An error that may be emitted when attempting to send a value into a channel on a sender when +/// all receivers are dropped. +#[derive(Copy, Clone, PartialEq, Eq)] +pub struct SendError<T>(pub T); + +impl<T> SendError<T> { + /// Consume the error, yielding the message that failed to send. + pub fn into_inner(self) -> T { self.0 } +} + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "SendError(..)".fmt(f) + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "sending on a closed channel".fmt(f) + } +} + +impl<T> std::error::Error for SendError<T> {} + +/// An error that may be emitted when attempting to send a value into a channel on a sender when +/// the channel is full or all receivers are dropped. +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum TrySendError<T> { + /// The channel the message is sent on has a finite capacity and was full when the send was attempted. + Full(T), + /// All channel receivers were dropped and so the message has nobody to receive it. + Disconnected(T), +} + +impl<T> TrySendError<T> { + /// Consume the error, yielding the message that failed to send. + pub fn into_inner(self) -> T { + match self { + Self::Full(msg) | Self::Disconnected(msg) => msg, + } + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => "Full(..)".fmt(f), + TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), + } + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TrySendError::Full(..) => "sending on a full channel".fmt(f), + TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), + } + } +} + +impl<T> std::error::Error for TrySendError<T> {} + +impl<T> From<SendError<T>> for TrySendError<T> { + fn from(err: SendError<T>) -> Self { + match err { + SendError(item) => Self::Disconnected(item), + } + } +} + +/// An error that may be emitted when sending a value into a channel on a sender with a timeout when +/// the send operation times out or all receivers are dropped. +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum SendTimeoutError<T> { + /// A timeout occurred when attempting to send the message. + Timeout(T), + /// All channel receivers were dropped and so the message has nobody to receive it. + Disconnected(T), +} + +impl<T> SendTimeoutError<T> { + /// Consume the error, yielding the message that failed to send. + pub fn into_inner(self) -> T { + match self { + Self::Timeout(msg) | Self::Disconnected(msg) => msg, + } + } +} + +impl<T> fmt::Debug for SendTimeoutError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "SendTimeoutError(..)".fmt(f) + } +} + +impl<T> fmt::Display for SendTimeoutError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f), + SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f), + } + } +} + +impl<T> std::error::Error for SendTimeoutError<T> {} + +impl<T> From<SendError<T>> for SendTimeoutError<T> { + fn from(err: SendError<T>) -> Self { + match err { + SendError(item) => Self::Disconnected(item), + } + } +} + +enum TrySendTimeoutError<T> { + Full(T), + Disconnected(T), + Timeout(T), +} + +/// An error that may be emitted when attempting to wait for a value on a receiver when all senders +/// are dropped and there are no more messages in the channel. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum RecvError { + /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received. + Disconnected, +} + +impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RecvError::Disconnected => "receiving on a closed channel".fmt(f), + } + } +} + +impl std::error::Error for RecvError {} + +/// An error that may be emitted when attempting to fetch a value on a receiver when there are no +/// messages in the channel. If there are no messages in the channel and all senders are dropped, +/// then `TryRecvError::Disconnected` will be returned. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum TryRecvError { + /// The channel was empty when the receive was attempted. + Empty, + /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received. + Disconnected, +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TryRecvError::Empty => "receiving on an empty channel".fmt(f), + TryRecvError::Disconnected => "channel is empty and closed".fmt(f), + } + } +} + +impl std::error::Error for TryRecvError {} + +impl From<RecvError> for TryRecvError { + fn from(err: RecvError) -> Self { + match err { + RecvError::Disconnected => Self::Disconnected, + } + } +} + +/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout +/// when the receive operation times out or all senders are dropped and there are no values left +/// in the channel. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum RecvTimeoutError { + /// A timeout occurred when attempting to receive a message. + Timeout, + /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received. + Disconnected, +} + +impl fmt::Display for RecvTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f), + RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f), + } + } +} + +impl std::error::Error for RecvTimeoutError {} + +impl From<RecvError> for RecvTimeoutError { + fn from(err: RecvError) -> Self { + match err { + RecvError::Disconnected => Self::Disconnected, + } + } +} + +enum TryRecvTimeoutError { + Empty, + Timeout, + Disconnected, +} + +// TODO: Investigate some sort of invalidation flag for timeouts +#[cfg(feature = "spin")] +struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S); + +#[cfg(not(feature = "spin"))] +struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S); + +#[cfg(feature = "spin")] +impl<T, S: ?Sized + Signal> Hook<T, S> { + pub fn slot(msg: Option<T>, signal: S) -> Arc<Self> + where + S: Sized, + { + Arc::new(Self(Some(Spinlock::new(msg)), signal)) + } + + fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> { + self.0.as_ref().map(|s| s.lock()) + } +} + +#[cfg(not(feature = "spin"))] +impl<T, S: ?Sized + Signal> Hook<T, S> { + pub fn slot(msg: Option<T>, signal: S) -> Arc<Self> + where + S: Sized, + { + Arc::new(Self(Some(Mutex::new(msg)), signal)) + } + + fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> { + self.0.as_ref().map(|s| s.lock().unwrap()) + } +} + +impl<T, S: ?Sized + Signal> Hook<T, S> { + pub fn fire_recv(&self) -> (T, &S) { + let msg = self.lock().unwrap().take().unwrap(); + (msg, self.signal()) + } + + pub fn fire_send(&self, msg: T) -> (Option<T>, &S) { + let ret = match self.lock() { + Some(mut lock) => { + *lock = Some(msg); + None + } + None => Some(msg), + }; + (ret, self.signal()) + } + + pub fn is_empty(&self) -> bool { + self.lock().map(|s| s.is_none()).unwrap_or(true) + } + + pub fn try_take(&self) -> Option<T> { + self.lock().unwrap().take() + } + + pub fn trigger(signal: S) -> Arc<Self> + where + S: Sized, + { + Arc::new(Self(None, signal)) + } + + pub fn signal(&self) -> &S { + &self.1 + } + + pub fn fire_nothing(&self) -> bool { + self.signal().fire() + } +} + +impl<T> Hook<T, SyncSignal> { + pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> { + loop { + let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg + let msg = self.lock().unwrap().take(); + if let Some(msg) = msg { + break Some(msg); + } else if disconnected { + break None; + } else { + self.signal().wait() + } + } + } + + // Err(true) if timeout + pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> { + loop { + let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg + let msg = self.lock().unwrap().take(); + if let Some(msg) = msg { + break Ok(msg); + } else if disconnected { + break Err(false); + } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) { + self.signal().wait_timeout(dur); + } else { + break Err(true); + } + } + } + + pub fn wait_send(&self, abort: &AtomicBool) { + loop { + let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg + if disconnected || self.lock().unwrap().is_none() { + break; + } + + self.signal().wait(); + } + } + + // Err(true) if timeout + pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> { + loop { + let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg + if self.lock().unwrap().is_none() { + break Ok(()); + } else if disconnected { + break Err(false); + } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) { + self.signal().wait_timeout(dur); + } else { + break Err(true); + } + } + } +} + +#[cfg(feature = "spin")] +#[inline] +fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> { + let mut i = 4; + loop { + for _ in 0..10 { + if let Some(guard) = lock.try_lock() { + return guard; + } + thread::yield_now(); + } + // Sleep for at most ~1 ms + thread::sleep(Duration::from_nanos(1 << i.min(20))); + i += 1; + } +} + +#[cfg(not(feature = "spin"))] +#[inline] +fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> { + lock.lock().unwrap() +} + +#[cfg(not(feature = "spin"))] +use std::sync::{Mutex, MutexGuard}; + +#[cfg(feature = "spin")] +type ChanLock<T> = Spinlock<T>; +#[cfg(not(feature = "spin"))] +type ChanLock<T> = Mutex<T>; + + +type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>; +struct Chan<T> { + sending: Option<(usize, SignalVec<T>)>, + queue: VecDeque<T>, + waiting: SignalVec<T>, +} + +impl<T> Chan<T> { + fn pull_pending(&mut self, pull_extra: bool) { + if let Some((cap, sending)) = &mut self.sending { + let effective_cap = *cap + pull_extra as usize; + + while self.queue.len() < effective_cap { + if let Some(s) = sending.pop_front() { + let (msg, signal) = s.fire_recv(); + signal.fire(); + self.queue.push_back(msg); + } else { + break; + } + } + } + } + + fn try_wake_receiver_if_pending(&mut self) { + if !self.queue.is_empty() { + while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {} + } + } +} + +struct Shared<T> { + chan: ChanLock<Chan<T>>, + disconnected: AtomicBool, + sender_count: AtomicUsize, + receiver_count: AtomicUsize, +} + +impl<T> Shared<T> { + fn new(cap: Option<usize>) -> Self { + Self { + chan: ChanLock::new(Chan { + sending: cap.map(|cap| (cap, VecDeque::new())), + queue: VecDeque::new(), + waiting: VecDeque::new(), + }), + disconnected: AtomicBool::new(false), + sender_count: AtomicUsize::new(1), + receiver_count: AtomicUsize::new(1), + } + } + + fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>( + &self, + msg: T, + should_block: bool, + make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>, + do_block: impl FnOnce(Arc<Hook<T, S>>) -> R, + ) -> R { + let mut chan = wait_lock(&self.chan); + + if self.is_disconnected() { + Err(TrySendTimeoutError::Disconnected(msg)).into() + } else if !chan.waiting.is_empty() { + let mut msg = Some(msg); + + loop { + let slot = chan.waiting.pop_front(); + match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) { + // No more waiting receivers and msg in queue, so break out of the loop + None if msg.is_none() => break, + // No more waiting receivers, so add msg to queue and break out of the loop + None => { + chan.queue.push_back(msg.unwrap()); + break; + } + Some((Some(m), signal)) => { + if signal.fire() { + // Was async and a stream, so didn't acquire the message. Wake another + // receiver, and do not yet push the message. + msg.replace(m); + continue; + } else { + // Was async and not a stream, so it did acquire the message. Push the + // message to the queue for it to be received. + chan.queue.push_back(m); + drop(chan); + break; + } + }, + Some((None, signal)) => { + drop(chan); + signal.fire(); + break; // Was sync, so it has acquired the message + }, + } + } + + Ok(()).into() + } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) { + chan.queue.push_back(msg); + Ok(()).into() + } else if should_block { // Only bounded from here on + let hook = make_signal(msg); + chan.sending.as_mut().unwrap().1.push_back(hook.clone()); + drop(chan); + + do_block(hook) + } else { + Err(TrySendTimeoutError::Full(msg)).into() + } + } + + fn send_sync( + &self, + msg: T, + block: Option<Option<Instant>>, + ) -> Result<(), TrySendTimeoutError<T>> { + self.send( + // msg + msg, + // should_block + block.is_some(), + // make_signal + |msg| Hook::slot(Some(msg), SyncSignal::default()), + // do_block + |hook| if let Some(deadline) = block.unwrap() { + hook.wait_deadline_send(&self.disconnected, deadline) + .or_else(|timed_out| { + if timed_out { // Remove our signal + let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone(); + wait_lock(&self.chan).sending + .as_mut() + .unwrap().1 + .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); + } + hook.try_take().map(|msg| if self.is_disconnected() { + Err(TrySendTimeoutError::Disconnected(msg)) + } else { + Err(TrySendTimeoutError::Timeout(msg)) + }) + .unwrap_or(Ok(())) + }) + } else { + hook.wait_send(&self.disconnected); + + match hook.try_take() { + Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)), + None => Ok(()), + } + }, + ) + } + + fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>( + &self, + should_block: bool, + make_signal: impl FnOnce() -> Arc<Hook<T, S>>, + do_block: impl FnOnce(Arc<Hook<T, S>>) -> R, + ) -> R { + let mut chan = wait_lock(&self.chan); + chan.pull_pending(true); + + if let Some(msg) = chan.queue.pop_front() { + drop(chan); + Ok(msg).into() + } else if self.is_disconnected() { + drop(chan); + Err(TryRecvTimeoutError::Disconnected).into() + } else if should_block { + let hook = make_signal(); + chan.waiting.push_back(hook.clone()); + drop(chan); + + do_block(hook) + } else { + drop(chan); + Err(TryRecvTimeoutError::Empty).into() + } + } + + fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> { + self.recv( + // should_block + block.is_some(), + // make_signal + || Hook::slot(None, SyncSignal::default()), + // do_block + |hook| if let Some(deadline) = block.unwrap() { + hook.wait_deadline_recv(&self.disconnected, deadline) + .or_else(|timed_out| { + if timed_out { // Remove our signal + let hook: Arc<Hook<T, dyn Signal>> = hook.clone(); + wait_lock(&self.chan).waiting + .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); + } + match hook.try_take() { + Some(msg) => Ok(msg), + None => { + let disconnected = self.is_disconnected(); // Check disconnect *before* msg + if let Some(msg) = wait_lock(&self.chan).queue.pop_front() { + Ok(msg) + } else if disconnected { + Err(TryRecvTimeoutError::Disconnected) + } else { + Err(TryRecvTimeoutError::Timeout) + } + }, + } + }) + } else { + hook.wait_recv(&self.disconnected) + .or_else(|| wait_lock(&self.chan).queue.pop_front()) + .ok_or(TryRecvTimeoutError::Disconnected) + }, + ) + } + + /// Disconnect anything listening on this channel (this will not prevent receivers receiving + /// msgs that have already been sent) + fn disconnect_all(&self) { + self.disconnected.store(true, Ordering::Relaxed); + + let mut chan = wait_lock(&self.chan); + chan.pull_pending(false); + if let Some((_, sending)) = chan.sending.as_ref() { + sending.iter().for_each(|hook| { + hook.signal().fire(); + }) + } + chan.waiting.iter().for_each(|hook| { + hook.signal().fire(); + }); + } + + fn is_disconnected(&self) -> bool { + self.disconnected.load(Ordering::SeqCst) + } + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn is_full(&self) -> bool { + self.capacity().map(|cap| cap == self.len()).unwrap_or(false) + } + + fn len(&self) -> usize { + let mut chan = wait_lock(&self.chan); + chan.pull_pending(false); + chan.queue.len() + } + + fn capacity(&self) -> Option<usize> { + wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap) + } + + fn sender_count(&self) -> usize { + self.sender_count.load(Ordering::Relaxed) + } + + fn receiver_count(&self) -> usize { + self.receiver_count.load(Ordering::Relaxed) + } +} + +/// A transmitting end of a channel. +pub struct Sender<T> { + shared: Arc<Shared<T>>, +} + +impl<T> Sender<T> { + /// Attempt to send a value into the channel. If the channel is bounded and full, or all + /// receivers have been dropped, an error is returned. If the channel associated with this + /// sender is unbounded, this method has the same behaviour as [`Sender::send`]. + pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + self.shared.send_sync(msg, None).map_err(|err| match err { + TrySendTimeoutError::Full(msg) => TrySendError::Full(msg), + TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), + _ => unreachable!(), + }) + } + + /// Send a value into the channel, returning an error if all receivers have been dropped. + /// If the channel is bounded and is full, this method will block until space is available + /// or all receivers have been dropped. If the channel is unbounded, this method will not + /// block. + pub fn send(&self, msg: T) -> Result<(), SendError<T>> { + self.shared.send_sync(msg, Some(None)).map_err(|err| match err { + TrySendTimeoutError::Disconnected(msg) => SendError(msg), + _ => unreachable!(), + }) + } + + /// Send a value into the channel, returning an error if all receivers have been dropped + /// or the deadline has passed. If the channel is bounded and is full, this method will + /// block until space is available, the deadline is reached, or all receivers have been + /// dropped. + pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { + self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err { + TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg), + TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg), + _ => unreachable!(), + }) + } + + /// Send a value into the channel, returning an error if all receivers have been dropped + /// or the timeout has expired. If the channel is bounded and is full, this method will + /// block until space is available, the timeout has expired, or all receivers have been + /// dropped. + pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> { + self.send_deadline(msg, Instant::now().checked_add(dur).unwrap()) + } + + /// Returns true if all receivers for this channel have been dropped. + pub fn is_disconnected(&self) -> bool { + self.shared.is_disconnected() + } + + /// Returns true if the channel is empty. + /// Note: Zero-capacity channels are always empty. + pub fn is_empty(&self) -> bool { + self.shared.is_empty() + } + + /// Returns true if the channel is full. + /// Note: Zero-capacity channels are always full. + pub fn is_full(&self) -> bool { + self.shared.is_full() + } + + /// Returns the number of messages in the channel + pub fn len(&self) -> usize { + self.shared.len() + } + + /// If the channel is bounded, returns its capacity. + pub fn capacity(&self) -> Option<usize> { + self.shared.capacity() + } + + /// Get the number of senders that currently exist, including this one. + pub fn sender_count(&self) -> usize { + self.shared.sender_count() + } + + /// Get the number of receivers that currently exist. + /// + /// Note that this method makes no guarantees that a subsequent send will succeed; it's + /// possible that between `receiver_count()` being called and a `send()`, all open receivers + /// could drop. + pub fn receiver_count(&self) -> usize { + self.shared.receiver_count() + } + + /// Creates a [`WeakSender`] that does not keep the channel open. + /// + /// The channel is closed once all `Sender`s are dropped, even if there + /// are still active `WeakSender`s. + pub fn downgrade(&self) -> WeakSender<T> { + WeakSender { + shared: Arc::downgrade(&self.shared), + } + } + + /// Returns whether the senders are belong to the same channel. + pub fn same_channel(&self, other: &Sender<T>) -> bool { + Arc::ptr_eq(&self.shared, &other.shared) + } +} + +impl<T> Clone for Sender<T> { + /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel + /// contents will only be cleaned up when all senders and the receiver have been dropped. + fn clone(&self) -> Self { + self.shared.sender_count.fetch_add(1, Ordering::Relaxed); + Self { shared: self.shared.clone() } + } +} + +impl<T> fmt::Debug for Sender<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Sender").finish() + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + // Notify receivers that all senders have been dropped if the number of senders drops to 0. + if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 { + self.shared.disconnect_all(); + } + } +} + +/// A sender that does not prevent the channel from being closed. +/// +/// Weak senders do not count towards the number of active senders on the channel. As soon as +/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a +/// `WeakSender`. +/// +/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`] +/// method. +pub struct WeakSender<T> { + shared: Weak<Shared<T>>, +} + +impl<T> WeakSender<T> { + /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages. + /// + /// Returns `None` if the channel was closed already. Note that a `Some` return value + /// does not guarantee that the channel is still open. + pub fn upgrade(&self) -> Option<Sender<T>> { + self.shared + .upgrade() + // check that there are still live senders + .filter(|shared| { + shared + .sender_count + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| { + if count == 0 { + // all senders are closed already -> don't increase the sender count + None + } else { + // there is still at least one active sender + Some(count + 1) + } + }) + .is_ok() + }) + .map(|shared| Sender { shared }) + } +} + +/// The receiving end of a channel. +/// +/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel. +/// Each message will only be received by a single receiver. This is useful for +/// implementing work stealing for concurrent programs. +pub struct Receiver<T> { + shared: Arc<Shared<T>>, +} + +impl<T> Receiver<T> { + /// Attempt to fetch an incoming value from the channel associated with this receiver, + /// returning an error if the channel is empty or if all senders have been dropped. + pub fn try_recv(&self) -> Result<T, TryRecvError> { + self.shared.recv_sync(None).map_err(|err| match err { + TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected, + TryRecvTimeoutError::Empty => TryRecvError::Empty, + _ => unreachable!(), + }) + } + + /// Wait for an incoming value from the channel associated with this receiver, returning an + /// error if all senders have been dropped. + pub fn recv(&self) -> Result<T, RecvError> { + self.shared.recv_sync(Some(None)).map_err(|err| match err { + TryRecvTimeoutError::Disconnected => RecvError::Disconnected, + _ => unreachable!(), + }) + } + + /// Wait for an incoming value from the channel associated with this receiver, returning an + /// error if all senders have been dropped or the deadline has passed. + pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { + self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err { + TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, + TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout, + _ => unreachable!(), + }) + } + + /// Wait for an incoming value from the channel associated with this receiver, returning an + /// error if all senders have been dropped or the timeout has expired. + pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> { + self.recv_deadline(Instant::now().checked_add(dur).unwrap()) + } + + /// Create a blocking iterator over the values received on the channel that finishes iteration + /// when all senders have been dropped. + /// + /// You can also create a self-owned iterator with [`Receiver::into_iter`]. + pub fn iter(&self) -> Iter<T> { + Iter { receiver: &self } + } + + /// A non-blocking iterator over the values received on the channel that finishes iteration + /// when all senders have been dropped or the channel is empty. + pub fn try_iter(&self) -> TryIter<T> { + TryIter { receiver: &self } + } + + /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike + /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once + /// the function has been called. + pub fn drain(&self) -> Drain<T> { + let mut chan = wait_lock(&self.shared.chan); + chan.pull_pending(false); + let queue = std::mem::take(&mut chan.queue); + + Drain { queue, _phantom: PhantomData } + } + + /// Returns true if all senders for this channel have been dropped. + pub fn is_disconnected(&self) -> bool { + self.shared.is_disconnected() + } + + /// Returns true if the channel is empty. + /// Note: Zero-capacity channels are always empty. + pub fn is_empty(&self) -> bool { + self.shared.is_empty() + } + + /// Returns true if the channel is full. + /// Note: Zero-capacity channels are always full. + pub fn is_full(&self) -> bool { + self.shared.is_full() + } + + /// Returns the number of messages in the channel. + pub fn len(&self) -> usize { + self.shared.len() + } + + /// If the channel is bounded, returns its capacity. + pub fn capacity(&self) -> Option<usize> { + self.shared.capacity() + } + + /// Get the number of senders that currently exist. + pub fn sender_count(&self) -> usize { + self.shared.sender_count() + } + + /// Get the number of receivers that currently exist, including this one. + pub fn receiver_count(&self) -> usize { + self.shared.receiver_count() + } + + /// Returns whether the receivers are belong to the same channel. + pub fn same_channel(&self, other: &Receiver<T>) -> bool { + Arc::ptr_eq(&self.shared, &other.shared) + } +} + +impl<T> Clone for Receiver<T> { + /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining + /// channel contents will only be cleaned up when all senders and the receiver have been + /// dropped. + /// + /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel. + /// Each message will only be received by a single receiver. This is useful for + /// implementing work stealing for concurrent programs. + fn clone(&self) -> Self { + self.shared.receiver_count.fetch_add(1, Ordering::Relaxed); + Self { shared: self.shared.clone() } + } +} + +impl<T> fmt::Debug for Receiver<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Receiver").finish() + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + // Notify senders that all receivers have been dropped if the number of receivers drops + // to 0. + if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 { + self.shared.disconnect_all(); + } + } +} + +/// This exists as a shorthand for [`Receiver::iter`]. +impl<'a, T> IntoIterator for &'a Receiver<T> { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + Iter { receiver: self } + } +} + +impl<T> IntoIterator for Receiver<T> { + type Item = T; + type IntoIter = IntoIter<T>; + + /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`]. + fn into_iter(self) -> Self::IntoIter { + IntoIter { receiver: self } + } +} + +/// An iterator over the msgs received from a channel. +pub struct Iter<'a, T> { + receiver: &'a Receiver<T>, +} + +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.receiver.recv().ok() + } +} + +/// An non-blocking iterator over the msgs received from a channel. +pub struct TryIter<'a, T> { + receiver: &'a Receiver<T>, +} + +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.receiver.try_recv().ok() + } +} + +/// An fixed-sized iterator over the msgs drained from a channel. +#[derive(Debug)] +pub struct Drain<'a, T> { + queue: VecDeque<T>, + /// A phantom field used to constrain the lifetime of this iterator. We do this because the + /// implementation may change and we don't want to unintentionally constrain it. Removing this + /// lifetime later is a possibility. + _phantom: PhantomData<&'a ()>, +} + +impl<'a, T> Iterator for Drain<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.queue.pop_front() + } +} + +impl<'a, T> ExactSizeIterator for Drain<'a, T> { + fn len(&self) -> usize { + self.queue.len() + } +} + +/// An owned iterator over the msgs received from a channel. +pub struct IntoIter<T> { + receiver: Receiver<T>, +} + +impl<T> Iterator for IntoIter<T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.receiver.recv().ok() + } +} + +/// Create a channel with no maximum capacity. +/// +/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in +/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and +/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`] +/// may be cloned. +/// +/// # Examples +/// ``` +/// let (tx, rx) = flume::unbounded(); +/// +/// tx.send(42).unwrap(); +/// assert_eq!(rx.recv().unwrap(), 42); +/// ``` +pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) { + let shared = Arc::new(Shared::new(None)); + ( + Sender { shared: shared.clone() }, + Receiver { shared }, + ) +} + +/// Create a channel with a maximum capacity. +/// +/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one +/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and +/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`] +/// may be cloned. +/// +/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to +/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour +/// is not desired, [`Sender::try_send`] may be used. +/// +/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero +/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a +/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers +/// perform a handshake and transfer ownership of a value. +/// +/// # Examples +/// ``` +/// let (tx, rx) = flume::bounded(32); +/// +/// for i in 1..33 { +/// tx.send(i).unwrap(); +/// } +/// assert!(tx.try_send(33).is_err()); +/// +/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum()); +/// ``` +pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { + let shared = Arc::new(Shared::new(Some(cap))); + ( + Sender { shared: shared.clone() }, + Receiver { shared }, + ) +} diff --git a/vendor/flume/src/select.rs b/vendor/flume/src/select.rs new file mode 100644 index 0000000..cef15aa --- /dev/null +++ b/vendor/flume/src/select.rs @@ -0,0 +1,405 @@ +//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface. + +use crate::*; +use spin1::Mutex as Spinlock; +use std::{any::Any, marker::PhantomData}; + +#[cfg(feature = "eventual-fairness")] +use nanorand::Rng; + +// A unique token corresponding to an event in a selector +type Token = usize; + +struct SelectSignal( + thread::Thread, + Token, + AtomicBool, + Arc<Spinlock<VecDeque<Token>>>, +); + +impl Signal for SelectSignal { + fn fire(&self) -> bool { + self.2.store(true, Ordering::SeqCst); + self.3.lock().push_back(self.1); + self.0.unpark(); + false + } + + fn as_any(&self) -> &(dyn Any + 'static) { + self + } + fn as_ptr(&self) -> *const () { + self as *const _ as *const () + } +} + +trait Selection<'a, T> { + fn init(&mut self) -> Option<T>; + fn poll(&mut self) -> Option<T>; + fn deinit(&mut self); +} + +/// An error that may be emitted when attempting to wait for a value on a receiver. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum SelectError { + /// A timeout occurred when waiting on a `Selector`. + Timeout, +} + +impl fmt::Display for SelectError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SelectError::Timeout => "timeout occurred".fmt(f), + } + } +} + +impl std::error::Error for SelectError {} + +/// A type used to wait upon multiple blocking operations at once. +/// +/// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour, +/// allowing a thread to wait upon the result of more than one operation at once. +/// +/// # Examples +/// ``` +/// let (tx0, rx0) = flume::unbounded(); +/// let (tx1, rx1) = flume::unbounded(); +/// +/// std::thread::spawn(move || { +/// tx0.send(true).unwrap(); +/// tx1.send(42).unwrap(); +/// }); +/// +/// flume::Selector::new() +/// .recv(&rx0, |b| println!("Received {:?}", b)) +/// .recv(&rx1, |n| println!("Received {:?}", n)) +/// .wait(); +/// ``` +pub struct Selector<'a, T: 'a> { + selections: Vec<Box<dyn Selection<'a, T> + 'a>>, + next_poll: usize, + signalled: Arc<Spinlock<VecDeque<Token>>>, + #[cfg(feature = "eventual-fairness")] + rng: nanorand::WyRand, + phantom: PhantomData<*const ()>, +} + +impl<'a, T: 'a> Default for Selector<'a, T> { + fn default() -> Self { + Self::new() + } +} + +impl<'a, T: 'a> fmt::Debug for Selector<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Selector").finish() + } +} + +impl<'a, T> Selector<'a, T> { + /// Create a new selector. + pub fn new() -> Self { + Self { + selections: Vec::new(), + next_poll: 0, + signalled: Arc::default(), + phantom: PhantomData::default(), + #[cfg(feature = "eventual-fairness")] + rng: nanorand::WyRand::new(), + } + } + + /// Add a send operation to the selector that sends the provided value. + /// + /// Once added, the selector can be used to run the provided handler function on completion of this operation. + pub fn send<U, F: FnMut(Result<(), SendError<U>>) -> T + 'a>( + mut self, + sender: &'a Sender<U>, + msg: U, + mapper: F, + ) -> Self { + struct SendSelection<'a, T, F, U> { + sender: &'a Sender<U>, + msg: Option<U>, + token: Token, + signalled: Arc<Spinlock<VecDeque<Token>>>, + hook: Option<Arc<Hook<U, SelectSignal>>>, + mapper: F, + phantom: PhantomData<T>, + } + + impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U> + where + F: FnMut(Result<(), SendError<U>>) -> T, + { + fn init(&mut self) -> Option<T> { + let token = self.token; + let signalled = self.signalled.clone(); + let r = self.sender.shared.send( + self.msg.take().unwrap(), + true, + |msg| { + Hook::slot( + Some(msg), + SelectSignal( + thread::current(), + token, + AtomicBool::new(false), + signalled, + ), + ) + }, + // Always runs + |h| { + self.hook = Some(h); + Ok(()) + }, + ); + + if self.hook.is_none() { + Some((self.mapper)(match r { + Ok(()) => Ok(()), + Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)), + _ => unreachable!(), + })) + } else { + None + } + } + + fn poll(&mut self) -> Option<T> { + let res = if self.sender.shared.is_disconnected() { + // Check the hook one last time + if let Some(msg) = self.hook.as_ref()?.try_take() { + Err(SendError(msg)) + } else { + Ok(()) + } + } else if self.hook.as_ref().unwrap().is_empty() { + // The message was sent + Ok(()) + } else { + return None; + }; + + Some((&mut self.mapper)(res)) + } + + fn deinit(&mut self) { + if let Some(hook) = self.hook.take() { + // Remove hook + let hook: Arc<Hook<U, dyn Signal>> = hook; + wait_lock(&self.sender.shared.chan) + .sending + .as_mut() + .unwrap() + .1 + .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); + } + } + } + + let token = self.selections.len(); + self.selections.push(Box::new(SendSelection { + sender, + msg: Some(msg), + token, + signalled: self.signalled.clone(), + hook: None, + mapper, + phantom: Default::default(), + })); + + self + } + + /// Add a receive operation to the selector. + /// + /// Once added, the selector can be used to run the provided handler function on completion of this operation. + pub fn recv<U, F: FnMut(Result<U, RecvError>) -> T + 'a>( + mut self, + receiver: &'a Receiver<U>, + mapper: F, + ) -> Self { + struct RecvSelection<'a, T, F, U> { + receiver: &'a Receiver<U>, + token: Token, + signalled: Arc<Spinlock<VecDeque<Token>>>, + hook: Option<Arc<Hook<U, SelectSignal>>>, + mapper: F, + received: bool, + phantom: PhantomData<T>, + } + + impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U> + where + F: FnMut(Result<U, RecvError>) -> T, + { + fn init(&mut self) -> Option<T> { + let token = self.token; + let signalled = self.signalled.clone(); + let r = self.receiver.shared.recv( + true, + || { + Hook::trigger(SelectSignal( + thread::current(), + token, + AtomicBool::new(false), + signalled, + )) + }, + // Always runs + |h| { + self.hook = Some(h); + Err(TryRecvTimeoutError::Timeout) + }, + ); + + if self.hook.is_none() { + Some((self.mapper)(match r { + Ok(msg) => Ok(msg), + Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected), + _ => unreachable!(), + })) + } else { + None + } + } + + fn poll(&mut self) -> Option<T> { + let res = if let Ok(msg) = self.receiver.try_recv() { + self.received = true; + Ok(msg) + } else if self.receiver.shared.is_disconnected() { + Err(RecvError::Disconnected) + } else { + return None; + }; + + Some((&mut self.mapper)(res)) + } + + fn deinit(&mut self) { + if let Some(hook) = self.hook.take() { + // Remove hook + let hook: Arc<Hook<U, dyn Signal>> = hook; + wait_lock(&self.receiver.shared.chan) + .waiting + .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); + // If we were woken, but never polled, wake up another + if !self.received + && hook + .signal() + .as_any() + .downcast_ref::<SelectSignal>() + .unwrap() + .2 + .load(Ordering::SeqCst) + { + wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending(); + } + } + } + } + + let token = self.selections.len(); + self.selections.push(Box::new(RecvSelection { + receiver, + token, + signalled: self.signalled.clone(), + hook: None, + mapper, + received: false, + phantom: Default::default(), + })); + + self + } + + fn wait_inner(mut self, deadline: Option<Instant>) -> Option<T> { + #[cfg(feature = "eventual-fairness")] + { + self.next_poll = self.rng.generate_range(0..self.selections.len()); + } + + let res = 'outer: loop { + // Init signals + for _ in 0..self.selections.len() { + if let Some(val) = self.selections[self.next_poll].init() { + break 'outer Some(val); + } + self.next_poll = (self.next_poll + 1) % self.selections.len(); + } + + // Speculatively poll + if let Some(msg) = self.poll() { + break 'outer Some(msg); + } + + loop { + if let Some(deadline) = deadline { + if let Some(dur) = deadline.checked_duration_since(Instant::now()) { + thread::park_timeout(dur); + } + } else { + thread::park(); + } + + if deadline.map(|d| Instant::now() >= d).unwrap_or(false) { + break 'outer self.poll(); + } + + let token = if let Some(token) = self.signalled.lock().pop_front() { + token + } else { + // Spurious wakeup, park again + continue; + }; + + // Attempt to receive a message + if let Some(msg) = self.selections[token].poll() { + break 'outer Some(msg); + } + } + }; + + // Deinit signals + for s in &mut self.selections { + s.deinit(); + } + + res + } + + fn poll(&mut self) -> Option<T> { + for _ in 0..self.selections.len() { + if let Some(val) = self.selections[self.next_poll].poll() { + return Some(val); + } + self.next_poll = (self.next_poll + 1) % self.selections.len(); + } + None + } + + /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness` + /// feature flag is enabled, this method is fair and will handle a random event of those that are ready. + pub fn wait(self) -> T { + self.wait_inner(None).unwrap() + } + + /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the + /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that + /// are ready. + pub fn wait_timeout(self, dur: Duration) -> Result<T, SelectError> { + self.wait_inner(Some(Instant::now() + dur)) + .ok_or(SelectError::Timeout) + } + + /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached. + /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those + /// that are ready. + pub fn wait_deadline(self, deadline: Instant) -> Result<T, SelectError> { + self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout) + } +} diff --git a/vendor/flume/src/signal.rs b/vendor/flume/src/signal.rs new file mode 100644 index 0000000..89395a3 --- /dev/null +++ b/vendor/flume/src/signal.rs @@ -0,0 +1,33 @@ +use std::{thread::{self, Thread}, time::Duration, any::Any}; + +pub trait Signal: Send + Sync + 'static { + /// Fire the signal, returning whether it is a stream signal. This is because streams do not + /// acquire a message when woken, so signals must be fired until one that does acquire a message + /// is fired, otherwise a wakeup could be missed, leading to a lost message until one is eagerly + /// grabbed by a receiver. + fn fire(&self) -> bool; + fn as_any(&self) -> &(dyn Any + 'static); + fn as_ptr(&self) -> *const (); +} + +pub struct SyncSignal(Thread); + +impl Default for SyncSignal { + fn default() -> Self { + Self(thread::current()) + } +} + +impl Signal for SyncSignal { + fn fire(&self) -> bool { + self.0.unpark(); + false + } + fn as_any(&self) -> &(dyn Any + 'static) { self } + fn as_ptr(&self) -> *const () { self as *const _ as *const () } +} + +impl SyncSignal { + pub fn wait(&self) { thread::park(); } + pub fn wait_timeout(&self, dur: Duration) { thread::park_timeout(dur); } +} |