diff options
Diffstat (limited to 'vendor/flume/src')
-rw-r--r-- | vendor/flume/src/async.rs | 543 | ||||
-rw-r--r-- | vendor/flume/src/lib.rs | 1142 | ||||
-rw-r--r-- | vendor/flume/src/select.rs | 405 | ||||
-rw-r--r-- | vendor/flume/src/signal.rs | 33 |
4 files changed, 0 insertions, 2123 deletions
diff --git a/vendor/flume/src/async.rs b/vendor/flume/src/async.rs deleted file mode 100644 index fae44d4..0000000 --- a/vendor/flume/src/async.rs +++ /dev/null @@ -1,543 +0,0 @@ -//! Futures and other types that allow asynchronous interaction with channels. - -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll, Waker}, - any::Any, - ops::Deref, -}; -use crate::*; -use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture}; -use futures_sink::Sink; -use spin1::Mutex as Spinlock; - -struct AsyncSignal { - waker: Spinlock<Waker>, - woken: AtomicBool, - stream: bool, -} - -impl AsyncSignal { - fn new(cx: &Context, stream: bool) -> Self { - AsyncSignal { - waker: Spinlock::new(cx.waker().clone()), - woken: AtomicBool::new(false), - stream, - } - } -} - -impl Signal for AsyncSignal { - fn fire(&self) -> bool { - self.woken.store(true, Ordering::SeqCst); - self.waker.lock().wake_by_ref(); - self.stream - } - - fn as_any(&self) -> &(dyn Any + 'static) { self } - fn as_ptr(&self) -> *const () { self as *const _ as *const () } -} - -impl<T> Hook<T, AsyncSignal> { - // Update the hook to point to the given Waker. - // Returns whether the hook has been previously awakened - fn update_waker(&self, cx_waker: &Waker) -> bool { - let mut waker = self.1.waker.lock(); - let woken = self.1.woken.load(Ordering::SeqCst); - if !waker.will_wake(cx_waker) { - *waker = cx_waker.clone(); - - // Avoid the edge case where the waker was woken just before the wakers were - // swapped. - if woken { - cx_waker.wake_by_ref(); - } - } - woken - } -} - -#[derive(Clone)] -enum OwnedOrRef<'a, T> { - Owned(T), - Ref(&'a T), -} - -impl<'a, T> Deref for OwnedOrRef<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - match self { - OwnedOrRef::Owned(arc) => &arc, - OwnedOrRef::Ref(r) => r, - } - } -} - -impl<T> Sender<T> { - /// Asynchronously send a value into the channel, returning an error if all receivers have been - /// dropped. If the channel is bounded and is full, the returned future will yield to the async - /// runtime. - /// - /// In the current implementation, the returned future will not yield to the async runtime if the - /// channel is unbounded. This may change in later versions. - pub fn send_async(&self, item: T) -> SendFut<T> { - SendFut { - sender: OwnedOrRef::Ref(&self), - hook: Some(SendState::NotYetSent(item)), - } - } - - /// Convert this sender into a future that asynchronously sends a single message into the channel, - /// returning an error if all receivers have been dropped. If the channel is bounded and is full, - /// this future will yield to the async runtime. - /// - /// In the current implementation, the returned future will not yield to the async runtime if the - /// channel is unbounded. This may change in later versions. - pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> { - SendFut { - sender: OwnedOrRef::Owned(self), - hook: Some(SendState::NotYetSent(item)), - } - } - - /// Create an asynchronous sink that uses this sender to asynchronously send messages into the - /// channel. The sender will continue to be usable after the sink has been dropped. - /// - /// In the current implementation, the returned sink will not yield to the async runtime if the - /// channel is unbounded. This may change in later versions. - pub fn sink(&self) -> SendSink<'_, T> { - SendSink(SendFut { - sender: OwnedOrRef::Ref(&self), - hook: None, - }) - } - - /// Convert this sender into a sink that allows asynchronously sending messages into the channel. - /// - /// In the current implementation, the returned sink will not yield to the async runtime if the - /// channel is unbounded. This may change in later versions. - pub fn into_sink<'a>(self) -> SendSink<'a, T> { - SendSink(SendFut { - sender: OwnedOrRef::Owned(self), - hook: None, - }) - } -} - -enum SendState<T> { - NotYetSent(T), - QueuedItem(Arc<Hook<T, AsyncSignal>>), -} - -/// A future that sends a value into a channel. -/// -/// Can be created via [`Sender::send_async`] or [`Sender::into_send_async`]. -#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] -pub struct SendFut<'a, T> { - sender: OwnedOrRef<'a, Sender<T>>, - // Only none after dropping - hook: Option<SendState<T>>, -} - -impl<T> std::marker::Unpin for SendFut<'_, T> {} - -impl<'a, T> SendFut<'a, T> { - /// Reset the hook, clearing it and removing it from the waiting sender's queue. This is called - /// on drop and just before `start_send` in the `Sink` implementation. - fn reset_hook(&mut self) { - if let Some(SendState::QueuedItem(hook)) = self.hook.take() { - let hook: Arc<Hook<T, dyn Signal>> = hook; - wait_lock(&self.sender.shared.chan).sending - .as_mut() - .unwrap().1 - .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); - } - } - - /// See [`Sender::is_disconnected`]. - pub fn is_disconnected(&self) -> bool { - self.sender.is_disconnected() - } - - /// See [`Sender::is_empty`]. - pub fn is_empty(&self) -> bool { - self.sender.is_empty() - } - - /// See [`Sender::is_full`]. - pub fn is_full(&self) -> bool { - self.sender.is_full() - } - - /// See [`Sender::len`]. - pub fn len(&self) -> usize { - self.sender.len() - } - - /// See [`Sender::capacity`]. - pub fn capacity(&self) -> Option<usize> { - self.sender.capacity() - } -} - -impl<'a, T> Drop for SendFut<'a, T> { - fn drop(&mut self) { - self.reset_hook() - } -} - - -impl<'a, T> Future for SendFut<'a, T> { - type Output = Result<(), SendError<T>>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() { - if hook.is_empty() { - Poll::Ready(Ok(())) - } else if self.sender.shared.is_disconnected() { - let item = hook.try_take(); - self.hook = None; - match item { - Some(item) => Poll::Ready(Err(SendError(item))), - None => Poll::Ready(Ok(())), - } - } else { - hook.update_waker(cx.waker()); - Poll::Pending - } - } else if let Some(SendState::NotYetSent(item)) = self.hook.take() { - let this = self.get_mut(); - let (shared, this_hook) = (&this.sender.shared, &mut this.hook); - - shared.send( - // item - item, - // should_block - true, - // make_signal - |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)), - // do_block - |hook| { - *this_hook = Some(SendState::QueuedItem(hook)); - Poll::Pending - }, - ) - .map(|r| r.map_err(|err| match err { - TrySendTimeoutError::Disconnected(msg) => SendError(msg), - _ => unreachable!(), - })) - } else { // Nothing to do - Poll::Ready(Ok(())) - } - } -} - -impl<'a, T> FusedFuture for SendFut<'a, T> { - fn is_terminated(&self) -> bool { - self.sender.shared.is_disconnected() - } -} - -/// A sink that allows sending values into a channel. -/// -/// Can be created via [`Sender::sink`] or [`Sender::into_sink`]. -pub struct SendSink<'a, T>(SendFut<'a, T>); - -impl<'a, T> SendSink<'a, T> { - /// Returns a clone of a sending half of the channel of this sink. - pub fn sender(&self) -> &Sender<T> { - &self.0.sender - } - - /// See [`Sender::is_disconnected`]. - pub fn is_disconnected(&self) -> bool { - self.0.is_disconnected() - } - - /// See [`Sender::is_empty`]. - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - /// See [`Sender::is_full`]. - pub fn is_full(&self) -> bool { - self.0.is_full() - } - - /// See [`Sender::len`]. - pub fn len(&self) -> usize { - self.0.len() - } - - /// See [`Sender::capacity`]. - pub fn capacity(&self) -> Option<usize> { - self.0.capacity() - } - - /// Returns whether the SendSinks are belong to the same channel. - pub fn same_channel(&self, other: &Self) -> bool { - self.sender().same_channel(other.sender()) - } -} - -impl<'a, T> Sink<T> for SendSink<'a, T> { - type Error = SendError<T>; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { - Pin::new(&mut self.0).poll(cx) - } - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.0.reset_hook(); - self.0.hook = Some(SendState::NotYetSent(item)); - - Ok(()) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { - Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here? - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { - Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here? - } -} - -impl<'a, T> Clone for SendSink<'a, T> { - fn clone(&self) -> SendSink<'a, T> { - SendSink(SendFut { - sender: self.0.sender.clone(), - hook: None, - }) - } -} - -impl<T> Receiver<T> { - /// Asynchronously receive a value from the channel, returning an error if all senders have been - /// dropped. If the channel is empty, the returned future will yield to the async runtime. - pub fn recv_async(&self) -> RecvFut<'_, T> { - RecvFut::new(OwnedOrRef::Ref(self)) - } - - /// Convert this receiver into a future that asynchronously receives a single message from the - /// channel, returning an error if all senders have been dropped. If the channel is empty, this - /// future will yield to the async runtime. - pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> { - RecvFut::new(OwnedOrRef::Owned(self)) - } - - /// Create an asynchronous stream that uses this receiver to asynchronously receive messages - /// from the channel. The receiver will continue to be usable after the stream has been dropped. - pub fn stream(&self) -> RecvStream<'_, T> { - RecvStream(RecvFut::new(OwnedOrRef::Ref(self))) - } - - /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel. - pub fn into_stream<'a>(self) -> RecvStream<'a, T> { - RecvStream(RecvFut::new(OwnedOrRef::Owned(self))) - } -} - -/// A future which allows asynchronously receiving a message. -/// -/// Can be created via [`Receiver::recv_async`] or [`Receiver::into_recv_async`]. -#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] -pub struct RecvFut<'a, T> { - receiver: OwnedOrRef<'a, Receiver<T>>, - hook: Option<Arc<Hook<T, AsyncSignal>>>, -} - -impl<'a, T> RecvFut<'a, T> { - fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self { - Self { - receiver, - hook: None, - } - } - - /// Reset the hook, clearing it and removing it from the waiting receivers queue and waking - /// another receiver if this receiver has been woken, so as not to cause any missed wakeups. - /// This is called on drop and after a new item is received in `Stream::poll_next`. - fn reset_hook(&mut self) { - if let Some(hook) = self.hook.take() { - let hook: Arc<Hook<T, dyn Signal>> = hook; - let mut chan = wait_lock(&self.receiver.shared.chan); - // We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers? - chan.waiting.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); - if hook.signal().as_any().downcast_ref::<AsyncSignal>().unwrap().woken.load(Ordering::SeqCst) { - // If this signal has been fired, but we're being dropped (and so not listening to it), - // pass the signal on to another receiver - chan.try_wake_receiver_if_pending(); - } - } - } - - fn poll_inner( - self: Pin<&mut Self>, - cx: &mut Context, - stream: bool, - ) -> Poll<Result<T, RecvError>> { - if self.hook.is_some() { - match self.receiver.shared.recv_sync(None) { - Ok(msg) => return Poll::Ready(Ok(msg)), - Err(TryRecvTimeoutError::Disconnected) => { - return Poll::Ready(Err(RecvError::Disconnected)) - } - _ => (), - } - - let hook = self.hook.as_ref().map(Arc::clone).unwrap(); - if hook.update_waker(cx.waker()) { - // If the previous hook was awakened, we need to insert it back to the - // queue, otherwise, it remains valid. - wait_lock(&self.receiver.shared.chan) - .waiting - .push_back(hook); - } - // To avoid a missed wakeup, re-check disconnect status here because the channel might have - // gotten shut down before we had a chance to push our hook - if self.receiver.shared.is_disconnected() { - // And now, to avoid a race condition between the first recv attempt and the disconnect check we - // just performed, attempt to recv again just in case we missed something. - Poll::Ready( - self.receiver - .shared - .recv_sync(None) - .map(Ok) - .unwrap_or(Err(RecvError::Disconnected)), - ) - } else { - Poll::Pending - } - } else { - let mut_self = self.get_mut(); - let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook); - - shared.recv( - // should_block - true, - // make_signal - || Hook::trigger(AsyncSignal::new(cx, stream)), - // do_block - |hook| { - *this_hook = Some(hook); - Poll::Pending - }, - ) - .map(|r| r.map_err(|err| match err { - TryRecvTimeoutError::Disconnected => RecvError::Disconnected, - _ => unreachable!(), - })) - } - } - - /// See [`Receiver::is_disconnected`]. - pub fn is_disconnected(&self) -> bool { - self.receiver.is_disconnected() - } - - /// See [`Receiver::is_empty`]. - pub fn is_empty(&self) -> bool { - self.receiver.is_empty() - } - - /// See [`Receiver::is_full`]. - pub fn is_full(&self) -> bool { - self.receiver.is_full() - } - - /// See [`Receiver::len`]. - pub fn len(&self) -> usize { - self.receiver.len() - } - - /// See [`Receiver::capacity`]. - pub fn capacity(&self) -> Option<usize> { - self.receiver.capacity() - } -} - -impl<'a, T> Drop for RecvFut<'a, T> { - fn drop(&mut self) { - self.reset_hook(); - } -} - -impl<'a, T> Future for RecvFut<'a, T> { - type Output = Result<T, RecvError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - self.poll_inner(cx, false) // stream = false - } -} - -impl<'a, T> FusedFuture for RecvFut<'a, T> { - fn is_terminated(&self) -> bool { - self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty() - } -} - -/// A stream which allows asynchronously receiving messages. -/// -/// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`]. -pub struct RecvStream<'a, T>(RecvFut<'a, T>); - -impl<'a, T> RecvStream<'a, T> { - /// See [`Receiver::is_disconnected`]. - pub fn is_disconnected(&self) -> bool { - self.0.is_disconnected() - } - - /// See [`Receiver::is_empty`]. - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - /// See [`Receiver::is_full`]. - pub fn is_full(&self) -> bool { - self.0.is_full() - } - - /// See [`Receiver::len`]. - pub fn len(&self) -> usize { - self.0.len() - } - - /// See [`Receiver::capacity`]. - pub fn capacity(&self) -> Option<usize> { - self.0.capacity() - } - - /// Returns whether the SendSinks are belong to the same channel. - pub fn same_channel(&self, other: &Self) -> bool { - self.0.receiver.same_channel(&*other.0.receiver) - } -} - -impl<'a, T> Clone for RecvStream<'a, T> { - fn clone(&self) -> RecvStream<'a, T> { - RecvStream(RecvFut::new(self.0.receiver.clone())) - } -} - -impl<'a, T> Stream for RecvStream<'a, T> { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - match Pin::new(&mut self.0).poll_inner(cx, true) { // stream = true - Poll::Pending => Poll::Pending, - Poll::Ready(item) => { - self.0.reset_hook(); - Poll::Ready(item.ok()) - } - } - } -} - -impl<'a, T> FusedStream for RecvStream<'a, T> { - fn is_terminated(&self) -> bool { - self.0.is_terminated() - } -} diff --git a/vendor/flume/src/lib.rs b/vendor/flume/src/lib.rs deleted file mode 100644 index c9bb3ee..0000000 --- a/vendor/flume/src/lib.rs +++ /dev/null @@ -1,1142 +0,0 @@ -//! # Flume -//! -//! A blazingly fast multi-producer, multi-consumer channel. -//! -//! *"Do not communicate by sharing memory; instead, share memory by communicating."* -//! -//! ## Why Flume? -//! -//! - **Featureful**: Unbounded, bounded and rendezvous queues -//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel` -//! - **Safe**: No `unsafe` code anywhere in the codebase! -//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone` -//! - **Familiar**: Drop-in replacement for `std::sync::mpsc` -//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines -//! - **Simple**: Few dependencies, minimal codebase, fast to compile -//! - **Asynchronous**: `async` support, including mix 'n match with sync code -//! - **Ergonomic**: Powerful `select`-like interface -//! -//! ## Example -//! -//! ``` -//! let (tx, rx) = flume::unbounded(); -//! -//! tx.send(42).unwrap(); -//! assert_eq!(rx.recv().unwrap(), 42); -//! ``` - -#![deny(missing_docs)] - -#[cfg(feature = "select")] -pub mod select; -#[cfg(feature = "async")] -pub mod r#async; - -mod signal; - -// Reexports -#[cfg(feature = "select")] -pub use select::Selector; - -use std::{ - collections::VecDeque, - sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak}, - time::{Duration, Instant}, - marker::PhantomData, - thread, - fmt, -}; - -#[cfg(feature = "spin")] -use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard}; -use crate::signal::{Signal, SyncSignal}; - -/// An error that may be emitted when attempting to send a value into a channel on a sender when -/// all receivers are dropped. -#[derive(Copy, Clone, PartialEq, Eq)] -pub struct SendError<T>(pub T); - -impl<T> SendError<T> { - /// Consume the error, yielding the message that failed to send. - pub fn into_inner(self) -> T { self.0 } -} - -impl<T> fmt::Debug for SendError<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "SendError(..)".fmt(f) - } -} - -impl<T> fmt::Display for SendError<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "sending on a closed channel".fmt(f) - } -} - -impl<T> std::error::Error for SendError<T> {} - -/// An error that may be emitted when attempting to send a value into a channel on a sender when -/// the channel is full or all receivers are dropped. -#[derive(Copy, Clone, PartialEq, Eq)] -pub enum TrySendError<T> { - /// The channel the message is sent on has a finite capacity and was full when the send was attempted. - Full(T), - /// All channel receivers were dropped and so the message has nobody to receive it. - Disconnected(T), -} - -impl<T> TrySendError<T> { - /// Consume the error, yielding the message that failed to send. - pub fn into_inner(self) -> T { - match self { - Self::Full(msg) | Self::Disconnected(msg) => msg, - } - } -} - -impl<T> fmt::Debug for TrySendError<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - TrySendError::Full(..) => "Full(..)".fmt(f), - TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), - } - } -} - -impl<T> fmt::Display for TrySendError<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TrySendError::Full(..) => "sending on a full channel".fmt(f), - TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), - } - } -} - -impl<T> std::error::Error for TrySendError<T> {} - -impl<T> From<SendError<T>> for TrySendError<T> { - fn from(err: SendError<T>) -> Self { - match err { - SendError(item) => Self::Disconnected(item), - } - } -} - -/// An error that may be emitted when sending a value into a channel on a sender with a timeout when -/// the send operation times out or all receivers are dropped. -#[derive(Copy, Clone, PartialEq, Eq)] -pub enum SendTimeoutError<T> { - /// A timeout occurred when attempting to send the message. - Timeout(T), - /// All channel receivers were dropped and so the message has nobody to receive it. - Disconnected(T), -} - -impl<T> SendTimeoutError<T> { - /// Consume the error, yielding the message that failed to send. - pub fn into_inner(self) -> T { - match self { - Self::Timeout(msg) | Self::Disconnected(msg) => msg, - } - } -} - -impl<T> fmt::Debug for SendTimeoutError<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "SendTimeoutError(..)".fmt(f) - } -} - -impl<T> fmt::Display for SendTimeoutError<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f), - SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f), - } - } -} - -impl<T> std::error::Error for SendTimeoutError<T> {} - -impl<T> From<SendError<T>> for SendTimeoutError<T> { - fn from(err: SendError<T>) -> Self { - match err { - SendError(item) => Self::Disconnected(item), - } - } -} - -enum TrySendTimeoutError<T> { - Full(T), - Disconnected(T), - Timeout(T), -} - -/// An error that may be emitted when attempting to wait for a value on a receiver when all senders -/// are dropped and there are no more messages in the channel. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum RecvError { - /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received. - Disconnected, -} - -impl fmt::Display for RecvError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RecvError::Disconnected => "receiving on a closed channel".fmt(f), - } - } -} - -impl std::error::Error for RecvError {} - -/// An error that may be emitted when attempting to fetch a value on a receiver when there are no -/// messages in the channel. If there are no messages in the channel and all senders are dropped, -/// then `TryRecvError::Disconnected` will be returned. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum TryRecvError { - /// The channel was empty when the receive was attempted. - Empty, - /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received. - Disconnected, -} - -impl fmt::Display for TryRecvError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TryRecvError::Empty => "receiving on an empty channel".fmt(f), - TryRecvError::Disconnected => "channel is empty and closed".fmt(f), - } - } -} - -impl std::error::Error for TryRecvError {} - -impl From<RecvError> for TryRecvError { - fn from(err: RecvError) -> Self { - match err { - RecvError::Disconnected => Self::Disconnected, - } - } -} - -/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout -/// when the receive operation times out or all senders are dropped and there are no values left -/// in the channel. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum RecvTimeoutError { - /// A timeout occurred when attempting to receive a message. - Timeout, - /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received. - Disconnected, -} - -impl fmt::Display for RecvTimeoutError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f), - RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f), - } - } -} - -impl std::error::Error for RecvTimeoutError {} - -impl From<RecvError> for RecvTimeoutError { - fn from(err: RecvError) -> Self { - match err { - RecvError::Disconnected => Self::Disconnected, - } - } -} - -enum TryRecvTimeoutError { - Empty, - Timeout, - Disconnected, -} - -// TODO: Investigate some sort of invalidation flag for timeouts -#[cfg(feature = "spin")] -struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S); - -#[cfg(not(feature = "spin"))] -struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S); - -#[cfg(feature = "spin")] -impl<T, S: ?Sized + Signal> Hook<T, S> { - pub fn slot(msg: Option<T>, signal: S) -> Arc<Self> - where - S: Sized, - { - Arc::new(Self(Some(Spinlock::new(msg)), signal)) - } - - fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> { - self.0.as_ref().map(|s| s.lock()) - } -} - -#[cfg(not(feature = "spin"))] -impl<T, S: ?Sized + Signal> Hook<T, S> { - pub fn slot(msg: Option<T>, signal: S) -> Arc<Self> - where - S: Sized, - { - Arc::new(Self(Some(Mutex::new(msg)), signal)) - } - - fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> { - self.0.as_ref().map(|s| s.lock().unwrap()) - } -} - -impl<T, S: ?Sized + Signal> Hook<T, S> { - pub fn fire_recv(&self) -> (T, &S) { - let msg = self.lock().unwrap().take().unwrap(); - (msg, self.signal()) - } - - pub fn fire_send(&self, msg: T) -> (Option<T>, &S) { - let ret = match self.lock() { - Some(mut lock) => { - *lock = Some(msg); - None - } - None => Some(msg), - }; - (ret, self.signal()) - } - - pub fn is_empty(&self) -> bool { - self.lock().map(|s| s.is_none()).unwrap_or(true) - } - - pub fn try_take(&self) -> Option<T> { - self.lock().unwrap().take() - } - - pub fn trigger(signal: S) -> Arc<Self> - where - S: Sized, - { - Arc::new(Self(None, signal)) - } - - pub fn signal(&self) -> &S { - &self.1 - } - - pub fn fire_nothing(&self) -> bool { - self.signal().fire() - } -} - -impl<T> Hook<T, SyncSignal> { - pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> { - loop { - let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg - let msg = self.lock().unwrap().take(); - if let Some(msg) = msg { - break Some(msg); - } else if disconnected { - break None; - } else { - self.signal().wait() - } - } - } - - // Err(true) if timeout - pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> { - loop { - let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg - let msg = self.lock().unwrap().take(); - if let Some(msg) = msg { - break Ok(msg); - } else if disconnected { - break Err(false); - } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - self.signal().wait_timeout(dur); - } else { - break Err(true); - } - } - } - - pub fn wait_send(&self, abort: &AtomicBool) { - loop { - let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg - if disconnected || self.lock().unwrap().is_none() { - break; - } - - self.signal().wait(); - } - } - - // Err(true) if timeout - pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> { - loop { - let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg - if self.lock().unwrap().is_none() { - break Ok(()); - } else if disconnected { - break Err(false); - } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - self.signal().wait_timeout(dur); - } else { - break Err(true); - } - } - } -} - -#[cfg(feature = "spin")] -#[inline] -fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> { - let mut i = 4; - loop { - for _ in 0..10 { - if let Some(guard) = lock.try_lock() { - return guard; - } - thread::yield_now(); - } - // Sleep for at most ~1 ms - thread::sleep(Duration::from_nanos(1 << i.min(20))); - i += 1; - } -} - -#[cfg(not(feature = "spin"))] -#[inline] -fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> { - lock.lock().unwrap() -} - -#[cfg(not(feature = "spin"))] -use std::sync::{Mutex, MutexGuard}; - -#[cfg(feature = "spin")] -type ChanLock<T> = Spinlock<T>; -#[cfg(not(feature = "spin"))] -type ChanLock<T> = Mutex<T>; - - -type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>; -struct Chan<T> { - sending: Option<(usize, SignalVec<T>)>, - queue: VecDeque<T>, - waiting: SignalVec<T>, -} - -impl<T> Chan<T> { - fn pull_pending(&mut self, pull_extra: bool) { - if let Some((cap, sending)) = &mut self.sending { - let effective_cap = *cap + pull_extra as usize; - - while self.queue.len() < effective_cap { - if let Some(s) = sending.pop_front() { - let (msg, signal) = s.fire_recv(); - signal.fire(); - self.queue.push_back(msg); - } else { - break; - } - } - } - } - - fn try_wake_receiver_if_pending(&mut self) { - if !self.queue.is_empty() { - while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {} - } - } -} - -struct Shared<T> { - chan: ChanLock<Chan<T>>, - disconnected: AtomicBool, - sender_count: AtomicUsize, - receiver_count: AtomicUsize, -} - -impl<T> Shared<T> { - fn new(cap: Option<usize>) -> Self { - Self { - chan: ChanLock::new(Chan { - sending: cap.map(|cap| (cap, VecDeque::new())), - queue: VecDeque::new(), - waiting: VecDeque::new(), - }), - disconnected: AtomicBool::new(false), - sender_count: AtomicUsize::new(1), - receiver_count: AtomicUsize::new(1), - } - } - - fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>( - &self, - msg: T, - should_block: bool, - make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>, - do_block: impl FnOnce(Arc<Hook<T, S>>) -> R, - ) -> R { - let mut chan = wait_lock(&self.chan); - - if self.is_disconnected() { - Err(TrySendTimeoutError::Disconnected(msg)).into() - } else if !chan.waiting.is_empty() { - let mut msg = Some(msg); - - loop { - let slot = chan.waiting.pop_front(); - match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) { - // No more waiting receivers and msg in queue, so break out of the loop - None if msg.is_none() => break, - // No more waiting receivers, so add msg to queue and break out of the loop - None => { - chan.queue.push_back(msg.unwrap()); - break; - } - Some((Some(m), signal)) => { - if signal.fire() { - // Was async and a stream, so didn't acquire the message. Wake another - // receiver, and do not yet push the message. - msg.replace(m); - continue; - } else { - // Was async and not a stream, so it did acquire the message. Push the - // message to the queue for it to be received. - chan.queue.push_back(m); - drop(chan); - break; - } - }, - Some((None, signal)) => { - drop(chan); - signal.fire(); - break; // Was sync, so it has acquired the message - }, - } - } - - Ok(()).into() - } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) { - chan.queue.push_back(msg); - Ok(()).into() - } else if should_block { // Only bounded from here on - let hook = make_signal(msg); - chan.sending.as_mut().unwrap().1.push_back(hook.clone()); - drop(chan); - - do_block(hook) - } else { - Err(TrySendTimeoutError::Full(msg)).into() - } - } - - fn send_sync( - &self, - msg: T, - block: Option<Option<Instant>>, - ) -> Result<(), TrySendTimeoutError<T>> { - self.send( - // msg - msg, - // should_block - block.is_some(), - // make_signal - |msg| Hook::slot(Some(msg), SyncSignal::default()), - // do_block - |hook| if let Some(deadline) = block.unwrap() { - hook.wait_deadline_send(&self.disconnected, deadline) - .or_else(|timed_out| { - if timed_out { // Remove our signal - let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone(); - wait_lock(&self.chan).sending - .as_mut() - .unwrap().1 - .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); - } - hook.try_take().map(|msg| if self.is_disconnected() { - Err(TrySendTimeoutError::Disconnected(msg)) - } else { - Err(TrySendTimeoutError::Timeout(msg)) - }) - .unwrap_or(Ok(())) - }) - } else { - hook.wait_send(&self.disconnected); - - match hook.try_take() { - Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)), - None => Ok(()), - } - }, - ) - } - - fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>( - &self, - should_block: bool, - make_signal: impl FnOnce() -> Arc<Hook<T, S>>, - do_block: impl FnOnce(Arc<Hook<T, S>>) -> R, - ) -> R { - let mut chan = wait_lock(&self.chan); - chan.pull_pending(true); - - if let Some(msg) = chan.queue.pop_front() { - drop(chan); - Ok(msg).into() - } else if self.is_disconnected() { - drop(chan); - Err(TryRecvTimeoutError::Disconnected).into() - } else if should_block { - let hook = make_signal(); - chan.waiting.push_back(hook.clone()); - drop(chan); - - do_block(hook) - } else { - drop(chan); - Err(TryRecvTimeoutError::Empty).into() - } - } - - fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> { - self.recv( - // should_block - block.is_some(), - // make_signal - || Hook::slot(None, SyncSignal::default()), - // do_block - |hook| if let Some(deadline) = block.unwrap() { - hook.wait_deadline_recv(&self.disconnected, deadline) - .or_else(|timed_out| { - if timed_out { // Remove our signal - let hook: Arc<Hook<T, dyn Signal>> = hook.clone(); - wait_lock(&self.chan).waiting - .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); - } - match hook.try_take() { - Some(msg) => Ok(msg), - None => { - let disconnected = self.is_disconnected(); // Check disconnect *before* msg - if let Some(msg) = wait_lock(&self.chan).queue.pop_front() { - Ok(msg) - } else if disconnected { - Err(TryRecvTimeoutError::Disconnected) - } else { - Err(TryRecvTimeoutError::Timeout) - } - }, - } - }) - } else { - hook.wait_recv(&self.disconnected) - .or_else(|| wait_lock(&self.chan).queue.pop_front()) - .ok_or(TryRecvTimeoutError::Disconnected) - }, - ) - } - - /// Disconnect anything listening on this channel (this will not prevent receivers receiving - /// msgs that have already been sent) - fn disconnect_all(&self) { - self.disconnected.store(true, Ordering::Relaxed); - - let mut chan = wait_lock(&self.chan); - chan.pull_pending(false); - if let Some((_, sending)) = chan.sending.as_ref() { - sending.iter().for_each(|hook| { - hook.signal().fire(); - }) - } - chan.waiting.iter().for_each(|hook| { - hook.signal().fire(); - }); - } - - fn is_disconnected(&self) -> bool { - self.disconnected.load(Ordering::SeqCst) - } - - fn is_empty(&self) -> bool { - self.len() == 0 - } - - fn is_full(&self) -> bool { - self.capacity().map(|cap| cap == self.len()).unwrap_or(false) - } - - fn len(&self) -> usize { - let mut chan = wait_lock(&self.chan); - chan.pull_pending(false); - chan.queue.len() - } - - fn capacity(&self) -> Option<usize> { - wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap) - } - - fn sender_count(&self) -> usize { - self.sender_count.load(Ordering::Relaxed) - } - - fn receiver_count(&self) -> usize { - self.receiver_count.load(Ordering::Relaxed) - } -} - -/// A transmitting end of a channel. -pub struct Sender<T> { - shared: Arc<Shared<T>>, -} - -impl<T> Sender<T> { - /// Attempt to send a value into the channel. If the channel is bounded and full, or all - /// receivers have been dropped, an error is returned. If the channel associated with this - /// sender is unbounded, this method has the same behaviour as [`Sender::send`]. - pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { - self.shared.send_sync(msg, None).map_err(|err| match err { - TrySendTimeoutError::Full(msg) => TrySendError::Full(msg), - TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), - _ => unreachable!(), - }) - } - - /// Send a value into the channel, returning an error if all receivers have been dropped. - /// If the channel is bounded and is full, this method will block until space is available - /// or all receivers have been dropped. If the channel is unbounded, this method will not - /// block. - pub fn send(&self, msg: T) -> Result<(), SendError<T>> { - self.shared.send_sync(msg, Some(None)).map_err(|err| match err { - TrySendTimeoutError::Disconnected(msg) => SendError(msg), - _ => unreachable!(), - }) - } - - /// Send a value into the channel, returning an error if all receivers have been dropped - /// or the deadline has passed. If the channel is bounded and is full, this method will - /// block until space is available, the deadline is reached, or all receivers have been - /// dropped. - pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { - self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err { - TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg), - TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg), - _ => unreachable!(), - }) - } - - /// Send a value into the channel, returning an error if all receivers have been dropped - /// or the timeout has expired. If the channel is bounded and is full, this method will - /// block until space is available, the timeout has expired, or all receivers have been - /// dropped. - pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> { - self.send_deadline(msg, Instant::now().checked_add(dur).unwrap()) - } - - /// Returns true if all receivers for this channel have been dropped. - pub fn is_disconnected(&self) -> bool { - self.shared.is_disconnected() - } - - /// Returns true if the channel is empty. - /// Note: Zero-capacity channels are always empty. - pub fn is_empty(&self) -> bool { - self.shared.is_empty() - } - - /// Returns true if the channel is full. - /// Note: Zero-capacity channels are always full. - pub fn is_full(&self) -> bool { - self.shared.is_full() - } - - /// Returns the number of messages in the channel - pub fn len(&self) -> usize { - self.shared.len() - } - - /// If the channel is bounded, returns its capacity. - pub fn capacity(&self) -> Option<usize> { - self.shared.capacity() - } - - /// Get the number of senders that currently exist, including this one. - pub fn sender_count(&self) -> usize { - self.shared.sender_count() - } - - /// Get the number of receivers that currently exist. - /// - /// Note that this method makes no guarantees that a subsequent send will succeed; it's - /// possible that between `receiver_count()` being called and a `send()`, all open receivers - /// could drop. - pub fn receiver_count(&self) -> usize { - self.shared.receiver_count() - } - - /// Creates a [`WeakSender`] that does not keep the channel open. - /// - /// The channel is closed once all `Sender`s are dropped, even if there - /// are still active `WeakSender`s. - pub fn downgrade(&self) -> WeakSender<T> { - WeakSender { - shared: Arc::downgrade(&self.shared), - } - } - - /// Returns whether the senders are belong to the same channel. - pub fn same_channel(&self, other: &Sender<T>) -> bool { - Arc::ptr_eq(&self.shared, &other.shared) - } -} - -impl<T> Clone for Sender<T> { - /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel - /// contents will only be cleaned up when all senders and the receiver have been dropped. - fn clone(&self) -> Self { - self.shared.sender_count.fetch_add(1, Ordering::Relaxed); - Self { shared: self.shared.clone() } - } -} - -impl<T> fmt::Debug for Sender<T> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Sender").finish() - } -} - -impl<T> Drop for Sender<T> { - fn drop(&mut self) { - // Notify receivers that all senders have been dropped if the number of senders drops to 0. - if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 { - self.shared.disconnect_all(); - } - } -} - -/// A sender that does not prevent the channel from being closed. -/// -/// Weak senders do not count towards the number of active senders on the channel. As soon as -/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a -/// `WeakSender`. -/// -/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`] -/// method. -pub struct WeakSender<T> { - shared: Weak<Shared<T>>, -} - -impl<T> WeakSender<T> { - /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages. - /// - /// Returns `None` if the channel was closed already. Note that a `Some` return value - /// does not guarantee that the channel is still open. - pub fn upgrade(&self) -> Option<Sender<T>> { - self.shared - .upgrade() - // check that there are still live senders - .filter(|shared| { - shared - .sender_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| { - if count == 0 { - // all senders are closed already -> don't increase the sender count - None - } else { - // there is still at least one active sender - Some(count + 1) - } - }) - .is_ok() - }) - .map(|shared| Sender { shared }) - } -} - -/// The receiving end of a channel. -/// -/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel. -/// Each message will only be received by a single receiver. This is useful for -/// implementing work stealing for concurrent programs. -pub struct Receiver<T> { - shared: Arc<Shared<T>>, -} - -impl<T> Receiver<T> { - /// Attempt to fetch an incoming value from the channel associated with this receiver, - /// returning an error if the channel is empty or if all senders have been dropped. - pub fn try_recv(&self) -> Result<T, TryRecvError> { - self.shared.recv_sync(None).map_err(|err| match err { - TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected, - TryRecvTimeoutError::Empty => TryRecvError::Empty, - _ => unreachable!(), - }) - } - - /// Wait for an incoming value from the channel associated with this receiver, returning an - /// error if all senders have been dropped. - pub fn recv(&self) -> Result<T, RecvError> { - self.shared.recv_sync(Some(None)).map_err(|err| match err { - TryRecvTimeoutError::Disconnected => RecvError::Disconnected, - _ => unreachable!(), - }) - } - - /// Wait for an incoming value from the channel associated with this receiver, returning an - /// error if all senders have been dropped or the deadline has passed. - pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { - self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err { - TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, - TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout, - _ => unreachable!(), - }) - } - - /// Wait for an incoming value from the channel associated with this receiver, returning an - /// error if all senders have been dropped or the timeout has expired. - pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> { - self.recv_deadline(Instant::now().checked_add(dur).unwrap()) - } - - /// Create a blocking iterator over the values received on the channel that finishes iteration - /// when all senders have been dropped. - /// - /// You can also create a self-owned iterator with [`Receiver::into_iter`]. - pub fn iter(&self) -> Iter<T> { - Iter { receiver: &self } - } - - /// A non-blocking iterator over the values received on the channel that finishes iteration - /// when all senders have been dropped or the channel is empty. - pub fn try_iter(&self) -> TryIter<T> { - TryIter { receiver: &self } - } - - /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike - /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once - /// the function has been called. - pub fn drain(&self) -> Drain<T> { - let mut chan = wait_lock(&self.shared.chan); - chan.pull_pending(false); - let queue = std::mem::take(&mut chan.queue); - - Drain { queue, _phantom: PhantomData } - } - - /// Returns true if all senders for this channel have been dropped. - pub fn is_disconnected(&self) -> bool { - self.shared.is_disconnected() - } - - /// Returns true if the channel is empty. - /// Note: Zero-capacity channels are always empty. - pub fn is_empty(&self) -> bool { - self.shared.is_empty() - } - - /// Returns true if the channel is full. - /// Note: Zero-capacity channels are always full. - pub fn is_full(&self) -> bool { - self.shared.is_full() - } - - /// Returns the number of messages in the channel. - pub fn len(&self) -> usize { - self.shared.len() - } - - /// If the channel is bounded, returns its capacity. - pub fn capacity(&self) -> Option<usize> { - self.shared.capacity() - } - - /// Get the number of senders that currently exist. - pub fn sender_count(&self) -> usize { - self.shared.sender_count() - } - - /// Get the number of receivers that currently exist, including this one. - pub fn receiver_count(&self) -> usize { - self.shared.receiver_count() - } - - /// Returns whether the receivers are belong to the same channel. - pub fn same_channel(&self, other: &Receiver<T>) -> bool { - Arc::ptr_eq(&self.shared, &other.shared) - } -} - -impl<T> Clone for Receiver<T> { - /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining - /// channel contents will only be cleaned up when all senders and the receiver have been - /// dropped. - /// - /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel. - /// Each message will only be received by a single receiver. This is useful for - /// implementing work stealing for concurrent programs. - fn clone(&self) -> Self { - self.shared.receiver_count.fetch_add(1, Ordering::Relaxed); - Self { shared: self.shared.clone() } - } -} - -impl<T> fmt::Debug for Receiver<T> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Receiver").finish() - } -} - -impl<T> Drop for Receiver<T> { - fn drop(&mut self) { - // Notify senders that all receivers have been dropped if the number of receivers drops - // to 0. - if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 { - self.shared.disconnect_all(); - } - } -} - -/// This exists as a shorthand for [`Receiver::iter`]. -impl<'a, T> IntoIterator for &'a Receiver<T> { - type Item = T; - type IntoIter = Iter<'a, T>; - - fn into_iter(self) -> Self::IntoIter { - Iter { receiver: self } - } -} - -impl<T> IntoIterator for Receiver<T> { - type Item = T; - type IntoIter = IntoIter<T>; - - /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`]. - fn into_iter(self) -> Self::IntoIter { - IntoIter { receiver: self } - } -} - -/// An iterator over the msgs received from a channel. -pub struct Iter<'a, T> { - receiver: &'a Receiver<T>, -} - -impl<'a, T> Iterator for Iter<'a, T> { - type Item = T; - - fn next(&mut self) -> Option<Self::Item> { - self.receiver.recv().ok() - } -} - -/// An non-blocking iterator over the msgs received from a channel. -pub struct TryIter<'a, T> { - receiver: &'a Receiver<T>, -} - -impl<'a, T> Iterator for TryIter<'a, T> { - type Item = T; - - fn next(&mut self) -> Option<Self::Item> { - self.receiver.try_recv().ok() - } -} - -/// An fixed-sized iterator over the msgs drained from a channel. -#[derive(Debug)] -pub struct Drain<'a, T> { - queue: VecDeque<T>, - /// A phantom field used to constrain the lifetime of this iterator. We do this because the - /// implementation may change and we don't want to unintentionally constrain it. Removing this - /// lifetime later is a possibility. - _phantom: PhantomData<&'a ()>, -} - -impl<'a, T> Iterator for Drain<'a, T> { - type Item = T; - - fn next(&mut self) -> Option<Self::Item> { - self.queue.pop_front() - } -} - -impl<'a, T> ExactSizeIterator for Drain<'a, T> { - fn len(&self) -> usize { - self.queue.len() - } -} - -/// An owned iterator over the msgs received from a channel. -pub struct IntoIter<T> { - receiver: Receiver<T>, -} - -impl<T> Iterator for IntoIter<T> { - type Item = T; - - fn next(&mut self) -> Option<Self::Item> { - self.receiver.recv().ok() - } -} - -/// Create a channel with no maximum capacity. -/// -/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in -/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and -/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`] -/// may be cloned. -/// -/// # Examples -/// ``` -/// let (tx, rx) = flume::unbounded(); -/// -/// tx.send(42).unwrap(); -/// assert_eq!(rx.recv().unwrap(), 42); -/// ``` -pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) { - let shared = Arc::new(Shared::new(None)); - ( - Sender { shared: shared.clone() }, - Receiver { shared }, - ) -} - -/// Create a channel with a maximum capacity. -/// -/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one -/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and -/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`] -/// may be cloned. -/// -/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to -/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour -/// is not desired, [`Sender::try_send`] may be used. -/// -/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero -/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a -/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers -/// perform a handshake and transfer ownership of a value. -/// -/// # Examples -/// ``` -/// let (tx, rx) = flume::bounded(32); -/// -/// for i in 1..33 { -/// tx.send(i).unwrap(); -/// } -/// assert!(tx.try_send(33).is_err()); -/// -/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum()); -/// ``` -pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { - let shared = Arc::new(Shared::new(Some(cap))); - ( - Sender { shared: shared.clone() }, - Receiver { shared }, - ) -} diff --git a/vendor/flume/src/select.rs b/vendor/flume/src/select.rs deleted file mode 100644 index cef15aa..0000000 --- a/vendor/flume/src/select.rs +++ /dev/null @@ -1,405 +0,0 @@ -//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface. - -use crate::*; -use spin1::Mutex as Spinlock; -use std::{any::Any, marker::PhantomData}; - -#[cfg(feature = "eventual-fairness")] -use nanorand::Rng; - -// A unique token corresponding to an event in a selector -type Token = usize; - -struct SelectSignal( - thread::Thread, - Token, - AtomicBool, - Arc<Spinlock<VecDeque<Token>>>, -); - -impl Signal for SelectSignal { - fn fire(&self) -> bool { - self.2.store(true, Ordering::SeqCst); - self.3.lock().push_back(self.1); - self.0.unpark(); - false - } - - fn as_any(&self) -> &(dyn Any + 'static) { - self - } - fn as_ptr(&self) -> *const () { - self as *const _ as *const () - } -} - -trait Selection<'a, T> { - fn init(&mut self) -> Option<T>; - fn poll(&mut self) -> Option<T>; - fn deinit(&mut self); -} - -/// An error that may be emitted when attempting to wait for a value on a receiver. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum SelectError { - /// A timeout occurred when waiting on a `Selector`. - Timeout, -} - -impl fmt::Display for SelectError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SelectError::Timeout => "timeout occurred".fmt(f), - } - } -} - -impl std::error::Error for SelectError {} - -/// A type used to wait upon multiple blocking operations at once. -/// -/// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour, -/// allowing a thread to wait upon the result of more than one operation at once. -/// -/// # Examples -/// ``` -/// let (tx0, rx0) = flume::unbounded(); -/// let (tx1, rx1) = flume::unbounded(); -/// -/// std::thread::spawn(move || { -/// tx0.send(true).unwrap(); -/// tx1.send(42).unwrap(); -/// }); -/// -/// flume::Selector::new() -/// .recv(&rx0, |b| println!("Received {:?}", b)) -/// .recv(&rx1, |n| println!("Received {:?}", n)) -/// .wait(); -/// ``` -pub struct Selector<'a, T: 'a> { - selections: Vec<Box<dyn Selection<'a, T> + 'a>>, - next_poll: usize, - signalled: Arc<Spinlock<VecDeque<Token>>>, - #[cfg(feature = "eventual-fairness")] - rng: nanorand::WyRand, - phantom: PhantomData<*const ()>, -} - -impl<'a, T: 'a> Default for Selector<'a, T> { - fn default() -> Self { - Self::new() - } -} - -impl<'a, T: 'a> fmt::Debug for Selector<'a, T> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Selector").finish() - } -} - -impl<'a, T> Selector<'a, T> { - /// Create a new selector. - pub fn new() -> Self { - Self { - selections: Vec::new(), - next_poll: 0, - signalled: Arc::default(), - phantom: PhantomData::default(), - #[cfg(feature = "eventual-fairness")] - rng: nanorand::WyRand::new(), - } - } - - /// Add a send operation to the selector that sends the provided value. - /// - /// Once added, the selector can be used to run the provided handler function on completion of this operation. - pub fn send<U, F: FnMut(Result<(), SendError<U>>) -> T + 'a>( - mut self, - sender: &'a Sender<U>, - msg: U, - mapper: F, - ) -> Self { - struct SendSelection<'a, T, F, U> { - sender: &'a Sender<U>, - msg: Option<U>, - token: Token, - signalled: Arc<Spinlock<VecDeque<Token>>>, - hook: Option<Arc<Hook<U, SelectSignal>>>, - mapper: F, - phantom: PhantomData<T>, - } - - impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U> - where - F: FnMut(Result<(), SendError<U>>) -> T, - { - fn init(&mut self) -> Option<T> { - let token = self.token; - let signalled = self.signalled.clone(); - let r = self.sender.shared.send( - self.msg.take().unwrap(), - true, - |msg| { - Hook::slot( - Some(msg), - SelectSignal( - thread::current(), - token, - AtomicBool::new(false), - signalled, - ), - ) - }, - // Always runs - |h| { - self.hook = Some(h); - Ok(()) - }, - ); - - if self.hook.is_none() { - Some((self.mapper)(match r { - Ok(()) => Ok(()), - Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)), - _ => unreachable!(), - })) - } else { - None - } - } - - fn poll(&mut self) -> Option<T> { - let res = if self.sender.shared.is_disconnected() { - // Check the hook one last time - if let Some(msg) = self.hook.as_ref()?.try_take() { - Err(SendError(msg)) - } else { - Ok(()) - } - } else if self.hook.as_ref().unwrap().is_empty() { - // The message was sent - Ok(()) - } else { - return None; - }; - - Some((&mut self.mapper)(res)) - } - - fn deinit(&mut self) { - if let Some(hook) = self.hook.take() { - // Remove hook - let hook: Arc<Hook<U, dyn Signal>> = hook; - wait_lock(&self.sender.shared.chan) - .sending - .as_mut() - .unwrap() - .1 - .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); - } - } - } - - let token = self.selections.len(); - self.selections.push(Box::new(SendSelection { - sender, - msg: Some(msg), - token, - signalled: self.signalled.clone(), - hook: None, - mapper, - phantom: Default::default(), - })); - - self - } - - /// Add a receive operation to the selector. - /// - /// Once added, the selector can be used to run the provided handler function on completion of this operation. - pub fn recv<U, F: FnMut(Result<U, RecvError>) -> T + 'a>( - mut self, - receiver: &'a Receiver<U>, - mapper: F, - ) -> Self { - struct RecvSelection<'a, T, F, U> { - receiver: &'a Receiver<U>, - token: Token, - signalled: Arc<Spinlock<VecDeque<Token>>>, - hook: Option<Arc<Hook<U, SelectSignal>>>, - mapper: F, - received: bool, - phantom: PhantomData<T>, - } - - impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U> - where - F: FnMut(Result<U, RecvError>) -> T, - { - fn init(&mut self) -> Option<T> { - let token = self.token; - let signalled = self.signalled.clone(); - let r = self.receiver.shared.recv( - true, - || { - Hook::trigger(SelectSignal( - thread::current(), - token, - AtomicBool::new(false), - signalled, - )) - }, - // Always runs - |h| { - self.hook = Some(h); - Err(TryRecvTimeoutError::Timeout) - }, - ); - - if self.hook.is_none() { - Some((self.mapper)(match r { - Ok(msg) => Ok(msg), - Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected), - _ => unreachable!(), - })) - } else { - None - } - } - - fn poll(&mut self) -> Option<T> { - let res = if let Ok(msg) = self.receiver.try_recv() { - self.received = true; - Ok(msg) - } else if self.receiver.shared.is_disconnected() { - Err(RecvError::Disconnected) - } else { - return None; - }; - - Some((&mut self.mapper)(res)) - } - - fn deinit(&mut self) { - if let Some(hook) = self.hook.take() { - // Remove hook - let hook: Arc<Hook<U, dyn Signal>> = hook; - wait_lock(&self.receiver.shared.chan) - .waiting - .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); - // If we were woken, but never polled, wake up another - if !self.received - && hook - .signal() - .as_any() - .downcast_ref::<SelectSignal>() - .unwrap() - .2 - .load(Ordering::SeqCst) - { - wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending(); - } - } - } - } - - let token = self.selections.len(); - self.selections.push(Box::new(RecvSelection { - receiver, - token, - signalled: self.signalled.clone(), - hook: None, - mapper, - received: false, - phantom: Default::default(), - })); - - self - } - - fn wait_inner(mut self, deadline: Option<Instant>) -> Option<T> { - #[cfg(feature = "eventual-fairness")] - { - self.next_poll = self.rng.generate_range(0..self.selections.len()); - } - - let res = 'outer: loop { - // Init signals - for _ in 0..self.selections.len() { - if let Some(val) = self.selections[self.next_poll].init() { - break 'outer Some(val); - } - self.next_poll = (self.next_poll + 1) % self.selections.len(); - } - - // Speculatively poll - if let Some(msg) = self.poll() { - break 'outer Some(msg); - } - - loop { - if let Some(deadline) = deadline { - if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - thread::park_timeout(dur); - } - } else { - thread::park(); - } - - if deadline.map(|d| Instant::now() >= d).unwrap_or(false) { - break 'outer self.poll(); - } - - let token = if let Some(token) = self.signalled.lock().pop_front() { - token - } else { - // Spurious wakeup, park again - continue; - }; - - // Attempt to receive a message - if let Some(msg) = self.selections[token].poll() { - break 'outer Some(msg); - } - } - }; - - // Deinit signals - for s in &mut self.selections { - s.deinit(); - } - - res - } - - fn poll(&mut self) -> Option<T> { - for _ in 0..self.selections.len() { - if let Some(val) = self.selections[self.next_poll].poll() { - return Some(val); - } - self.next_poll = (self.next_poll + 1) % self.selections.len(); - } - None - } - - /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness` - /// feature flag is enabled, this method is fair and will handle a random event of those that are ready. - pub fn wait(self) -> T { - self.wait_inner(None).unwrap() - } - - /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the - /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that - /// are ready. - pub fn wait_timeout(self, dur: Duration) -> Result<T, SelectError> { - self.wait_inner(Some(Instant::now() + dur)) - .ok_or(SelectError::Timeout) - } - - /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached. - /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those - /// that are ready. - pub fn wait_deadline(self, deadline: Instant) -> Result<T, SelectError> { - self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout) - } -} diff --git a/vendor/flume/src/signal.rs b/vendor/flume/src/signal.rs deleted file mode 100644 index 89395a3..0000000 --- a/vendor/flume/src/signal.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::{thread::{self, Thread}, time::Duration, any::Any}; - -pub trait Signal: Send + Sync + 'static { - /// Fire the signal, returning whether it is a stream signal. This is because streams do not - /// acquire a message when woken, so signals must be fired until one that does acquire a message - /// is fired, otherwise a wakeup could be missed, leading to a lost message until one is eagerly - /// grabbed by a receiver. - fn fire(&self) -> bool; - fn as_any(&self) -> &(dyn Any + 'static); - fn as_ptr(&self) -> *const (); -} - -pub struct SyncSignal(Thread); - -impl Default for SyncSignal { - fn default() -> Self { - Self(thread::current()) - } -} - -impl Signal for SyncSignal { - fn fire(&self) -> bool { - self.0.unpark(); - false - } - fn as_any(&self) -> &(dyn Any + 'static) { self } - fn as_ptr(&self) -> *const () { self as *const _ as *const () } -} - -impl SyncSignal { - pub fn wait(&self) { thread::park(); } - pub fn wait_timeout(&self, dur: Duration) { thread::park_timeout(dur); } -} |