author:    Valentin Popov <valentin@popov.link>  2024-01-08 00:21:28 +0300
committer: Valentin Popov <valentin@popov.link>  2024-01-08 00:21:28 +0300
commit:    1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree:      7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/flume/src
parent:    5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/flume/src')
-rw-r--r--  vendor/flume/src/async.rs    543
-rw-r--r--  vendor/flume/src/lib.rs     1142
-rw-r--r--  vendor/flume/src/select.rs   405
-rw-r--r--  vendor/flume/src/signal.rs    33
4 files changed, 2123 insertions, 0 deletions
diff --git a/vendor/flume/src/async.rs b/vendor/flume/src/async.rs
new file mode 100644
index 0000000..fae44d4
--- /dev/null
+++ b/vendor/flume/src/async.rs
@@ -0,0 +1,543 @@
+//! Futures and other types that allow asynchronous interaction with channels.
+
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll, Waker},
+ any::Any,
+ ops::Deref,
+};
+use crate::*;
+use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture};
+use futures_sink::Sink;
+use spin1::Mutex as Spinlock;
+
+struct AsyncSignal {
+ waker: Spinlock<Waker>,
+ woken: AtomicBool,
+ stream: bool,
+}
+
+impl AsyncSignal {
+ fn new(cx: &Context, stream: bool) -> Self {
+ AsyncSignal {
+ waker: Spinlock::new(cx.waker().clone()),
+ woken: AtomicBool::new(false),
+ stream,
+ }
+ }
+}
+
+impl Signal for AsyncSignal {
+ fn fire(&self) -> bool {
+ self.woken.store(true, Ordering::SeqCst);
+ self.waker.lock().wake_by_ref();
+ self.stream
+ }
+
+ fn as_any(&self) -> &(dyn Any + 'static) { self }
+ fn as_ptr(&self) -> *const () { self as *const _ as *const () }
+}
+
+impl<T> Hook<T, AsyncSignal> {
+ // Update the hook to point to the given Waker.
+ // Returns whether the hook has been previously awakened
+ fn update_waker(&self, cx_waker: &Waker) -> bool {
+ let mut waker = self.1.waker.lock();
+ let woken = self.1.woken.load(Ordering::SeqCst);
+ if !waker.will_wake(cx_waker) {
+ *waker = cx_waker.clone();
+
+ // Avoid the edge case where the waker was woken just before the wakers were
+ // swapped.
+ if woken {
+ cx_waker.wake_by_ref();
+ }
+ }
+ woken
+ }
+}
+
+#[derive(Clone)]
+enum OwnedOrRef<'a, T> {
+ Owned(T),
+ Ref(&'a T),
+}
+
+impl<'a, T> Deref for OwnedOrRef<'a, T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ match self {
+ OwnedOrRef::Owned(arc) => arc,
+ OwnedOrRef::Ref(r) => r,
+ }
+ }
+}
+
+impl<T> Sender<T> {
+ /// Asynchronously send a value into the channel, returning an error if all receivers have been
+ /// dropped. If the channel is bounded and is full, the returned future will yield to the async
+ /// runtime.
+ ///
+ /// In the current implementation, the returned future will not yield to the async runtime if the
+ /// channel is unbounded. This may change in later versions.
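+ ///
+ /// # Examples
+ /// A minimal sketch (assumes an executor such as `futures::executor::block_on` is available):
+ /// ```
+ /// let (tx, rx) = flume::bounded(1);
+ /// futures::executor::block_on(async move {
+ ///     // Sends immediately here; would yield to the runtime if the channel were full
+ ///     tx.send_async(42).await.unwrap();
+ ///     assert_eq!(rx.recv_async().await.unwrap(), 42);
+ /// });
+ /// ```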
+ pub fn send_async(&self, item: T) -> SendFut<T> {
+ SendFut {
+ sender: OwnedOrRef::Ref(&self),
+ hook: Some(SendState::NotYetSent(item)),
+ }
+ }
+
+ /// Convert this sender into a future that asynchronously sends a single message into the channel,
+ /// returning an error if all receivers have been dropped. If the channel is bounded and is full,
+ /// this future will yield to the async runtime.
+ ///
+ /// In the current implementation, the returned future will not yield to the async runtime if the
+ /// channel is unbounded. This may change in later versions.
+ pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> {
+ SendFut {
+ sender: OwnedOrRef::Owned(self),
+ hook: Some(SendState::NotYetSent(item)),
+ }
+ }
+
+ /// Create an asynchronous sink that uses this sender to asynchronously send messages into the
+ /// channel. The sender will continue to be usable after the sink has been dropped.
+ ///
+ /// In the current implementation, the returned sink will not yield to the async runtime if the
+ /// channel is unbounded. This may change in later versions.
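+ ///
+ /// # Examples
+ /// A sketch using `futures::SinkExt` (assumes the `futures` crate is available):
+ /// ```
+ /// use futures::{executor::block_on, SinkExt};
+ ///
+ /// let (tx, rx) = flume::unbounded();
+ /// block_on(async {
+ ///     let mut sink = tx.sink();
+ ///     sink.send(1).await.unwrap();
+ ///     sink.send(2).await.unwrap();
+ /// });
+ /// assert_eq!(rx.drain().count(), 2);
+ /// ```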
+ pub fn sink(&self) -> SendSink<'_, T> {
+ SendSink(SendFut {
+ sender: OwnedOrRef::Ref(&self),
+ hook: None,
+ })
+ }
+
+ /// Convert this sender into a sink that allows asynchronously sending messages into the channel.
+ ///
+ /// In the current implementation, the returned sink will not yield to the async runtime if the
+ /// channel is unbounded. This may change in later versions.
+ pub fn into_sink<'a>(self) -> SendSink<'a, T> {
+ SendSink(SendFut {
+ sender: OwnedOrRef::Owned(self),
+ hook: None,
+ })
+ }
+}
+
+enum SendState<T> {
+ NotYetSent(T),
+ QueuedItem(Arc<Hook<T, AsyncSignal>>),
+}
+
+/// A future that sends a value into a channel.
+///
+/// Can be created via [`Sender::send_async`] or [`Sender::into_send_async`].
+#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
+pub struct SendFut<'a, T> {
+ sender: OwnedOrRef<'a, Sender<T>>,
+ // Only `None` after dropping
+ hook: Option<SendState<T>>,
+}
+
+impl<T> std::marker::Unpin for SendFut<'_, T> {}
+
+impl<'a, T> SendFut<'a, T> {
+ /// Reset the hook, clearing it and removing it from the waiting sender's queue. This is called
+ /// on drop and just before `start_send` in the `Sink` implementation.
+ fn reset_hook(&mut self) {
+ if let Some(SendState::QueuedItem(hook)) = self.hook.take() {
+ let hook: Arc<Hook<T, dyn Signal>> = hook;
+ wait_lock(&self.sender.shared.chan).sending
+ .as_mut()
+ .unwrap().1
+ .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
+ }
+ }
+
+ /// See [`Sender::is_disconnected`].
+ pub fn is_disconnected(&self) -> bool {
+ self.sender.is_disconnected()
+ }
+
+ /// See [`Sender::is_empty`].
+ pub fn is_empty(&self) -> bool {
+ self.sender.is_empty()
+ }
+
+ /// See [`Sender::is_full`].
+ pub fn is_full(&self) -> bool {
+ self.sender.is_full()
+ }
+
+ /// See [`Sender::len`].
+ pub fn len(&self) -> usize {
+ self.sender.len()
+ }
+
+ /// See [`Sender::capacity`].
+ pub fn capacity(&self) -> Option<usize> {
+ self.sender.capacity()
+ }
+}
+
+impl<'a, T> Drop for SendFut<'a, T> {
+ fn drop(&mut self) {
+ self.reset_hook()
+ }
+}
+
+
+impl<'a, T> Future for SendFut<'a, T> {
+ type Output = Result<(), SendError<T>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() {
+ if hook.is_empty() {
+ Poll::Ready(Ok(()))
+ } else if self.sender.shared.is_disconnected() {
+ let item = hook.try_take();
+ self.hook = None;
+ match item {
+ Some(item) => Poll::Ready(Err(SendError(item))),
+ None => Poll::Ready(Ok(())),
+ }
+ } else {
+ hook.update_waker(cx.waker());
+ Poll::Pending
+ }
+ } else if let Some(SendState::NotYetSent(item)) = self.hook.take() {
+ let this = self.get_mut();
+ let (shared, this_hook) = (&this.sender.shared, &mut this.hook);
+
+ shared.send(
+ // item
+ item,
+ // should_block
+ true,
+ // make_signal
+ |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)),
+ // do_block
+ |hook| {
+ *this_hook = Some(SendState::QueuedItem(hook));
+ Poll::Pending
+ },
+ )
+ .map(|r| r.map_err(|err| match err {
+ TrySendTimeoutError::Disconnected(msg) => SendError(msg),
+ _ => unreachable!(),
+ }))
+ } else { // Nothing to do
+ Poll::Ready(Ok(()))
+ }
+ }
+}
+
+impl<'a, T> FusedFuture for SendFut<'a, T> {
+ fn is_terminated(&self) -> bool {
+ self.sender.shared.is_disconnected()
+ }
+}
+
+/// A sink that allows sending values into a channel.
+///
+/// Can be created via [`Sender::sink`] or [`Sender::into_sink`].
+pub struct SendSink<'a, T>(SendFut<'a, T>);
+
+impl<'a, T> SendSink<'a, T> {
+ /// Returns a reference to the sending half of the channel used by this sink.
+ pub fn sender(&self) -> &Sender<T> {
+ &self.0.sender
+ }
+
+ /// See [`Sender::is_disconnected`].
+ pub fn is_disconnected(&self) -> bool {
+ self.0.is_disconnected()
+ }
+
+ /// See [`Sender::is_empty`].
+ pub fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+
+ /// See [`Sender::is_full`].
+ pub fn is_full(&self) -> bool {
+ self.0.is_full()
+ }
+
+ /// See [`Sender::len`].
+ pub fn len(&self) -> usize {
+ self.0.len()
+ }
+
+ /// See [`Sender::capacity`].
+ pub fn capacity(&self) -> Option<usize> {
+ self.0.capacity()
+ }
+
+ /// Returns whether the two [`SendSink`]s belong to the same channel.
+ pub fn same_channel(&self, other: &Self) -> bool {
+ self.sender().same_channel(other.sender())
+ }
+}
+
+impl<'a, T> Sink<T> for SendSink<'a, T> {
+ type Error = SendError<T>;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Pin::new(&mut self.0).poll(cx)
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+ self.0.reset_hook();
+ self.0.hook = Some(SendState::NotYetSent(item));
+
+ Ok(())
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here?
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here?
+ }
+}
+
+impl<'a, T> Clone for SendSink<'a, T> {
+ fn clone(&self) -> SendSink<'a, T> {
+ SendSink(SendFut {
+ sender: self.0.sender.clone(),
+ hook: None,
+ })
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Asynchronously receive a value from the channel, returning an error if all senders have been
+ /// dropped. If the channel is empty, the returned future will yield to the async runtime.
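+ ///
+ /// # Examples
+ /// A minimal sketch (assumes an executor such as `futures::executor::block_on` is available):
+ /// ```
+ /// let (tx, rx) = flume::unbounded();
+ /// tx.send(7).unwrap();
+ /// let msg = futures::executor::block_on(rx.recv_async()).unwrap();
+ /// assert_eq!(msg, 7);
+ /// ```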
+ pub fn recv_async(&self) -> RecvFut<'_, T> {
+ RecvFut::new(OwnedOrRef::Ref(self))
+ }
+
+ /// Convert this receiver into a future that asynchronously receives a single message from the
+ /// channel, returning an error if all senders have been dropped. If the channel is empty, this
+ /// future will yield to the async runtime.
+ pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
+ RecvFut::new(OwnedOrRef::Owned(self))
+ }
+
+ /// Create an asynchronous stream that uses this receiver to asynchronously receive messages
+ /// from the channel. The receiver will continue to be usable after the stream has been dropped.
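+ ///
+ /// # Examples
+ /// A sketch using `futures::StreamExt` (assumes the `futures` crate is available):
+ /// ```
+ /// use futures::{executor::block_on, StreamExt};
+ ///
+ /// let (tx, rx) = flume::unbounded();
+ /// tx.send(1).unwrap();
+ /// drop(tx); // Closing the channel terminates the stream
+ /// block_on(async {
+ ///     let mut stream = rx.stream();
+ ///     assert_eq!(stream.next().await, Some(1));
+ ///     assert_eq!(stream.next().await, None);
+ /// });
+ /// ```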
+ pub fn stream(&self) -> RecvStream<'_, T> {
+ RecvStream(RecvFut::new(OwnedOrRef::Ref(self)))
+ }
+
+ /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel.
+ pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
+ RecvStream(RecvFut::new(OwnedOrRef::Owned(self)))
+ }
+}
+
+/// A future which allows asynchronously receiving a message.
+///
+/// Can be created via [`Receiver::recv_async`] or [`Receiver::into_recv_async`].
+#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
+pub struct RecvFut<'a, T> {
+ receiver: OwnedOrRef<'a, Receiver<T>>,
+ hook: Option<Arc<Hook<T, AsyncSignal>>>,
+}
+
+impl<'a, T> RecvFut<'a, T> {
+ fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self {
+ Self {
+ receiver,
+ hook: None,
+ }
+ }
+
+ /// Reset the hook, clearing it and removing it from the waiting receivers queue and waking
+ /// another receiver if this receiver has been woken, so as not to cause any missed wakeups.
+ /// This is called on drop and after a new item is received in `Stream::poll_next`.
+ fn reset_hook(&mut self) {
+ if let Some(hook) = self.hook.take() {
+ let hook: Arc<Hook<T, dyn Signal>> = hook;
+ let mut chan = wait_lock(&self.receiver.shared.chan);
+ // We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers?
+ chan.waiting.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
+ if hook.signal().as_any().downcast_ref::<AsyncSignal>().unwrap().woken.load(Ordering::SeqCst) {
+ // If this signal has been fired, but we're being dropped (and so not listening to it),
+ // pass the signal on to another receiver
+ chan.try_wake_receiver_if_pending();
+ }
+ }
+ }
+
+ fn poll_inner(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ stream: bool,
+ ) -> Poll<Result<T, RecvError>> {
+ if self.hook.is_some() {
+ match self.receiver.shared.recv_sync(None) {
+ Ok(msg) => return Poll::Ready(Ok(msg)),
+ Err(TryRecvTimeoutError::Disconnected) => {
+ return Poll::Ready(Err(RecvError::Disconnected))
+ }
+ _ => (),
+ }
+
+ let hook = self.hook.as_ref().map(Arc::clone).unwrap();
+ if hook.update_waker(cx.waker()) {
+ // If the previous hook was awakened, we need to insert it back to the
+ // queue, otherwise, it remains valid.
+ wait_lock(&self.receiver.shared.chan)
+ .waiting
+ .push_back(hook);
+ }
+ // To avoid a missed wakeup, re-check disconnect status here because the channel might have
+ // gotten shut down before we had a chance to push our hook
+ if self.receiver.shared.is_disconnected() {
+ // And now, to avoid a race condition between the first recv attempt and the disconnect check we
+ // just performed, attempt to recv again just in case we missed something.
+ Poll::Ready(
+ self.receiver
+ .shared
+ .recv_sync(None)
+ .map(Ok)
+ .unwrap_or(Err(RecvError::Disconnected)),
+ )
+ } else {
+ Poll::Pending
+ }
+ } else {
+ let mut_self = self.get_mut();
+ let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook);
+
+ shared.recv(
+ // should_block
+ true,
+ // make_signal
+ || Hook::trigger(AsyncSignal::new(cx, stream)),
+ // do_block
+ |hook| {
+ *this_hook = Some(hook);
+ Poll::Pending
+ },
+ )
+ .map(|r| r.map_err(|err| match err {
+ TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
+ _ => unreachable!(),
+ }))
+ }
+ }
+
+ /// See [`Receiver::is_disconnected`].
+ pub fn is_disconnected(&self) -> bool {
+ self.receiver.is_disconnected()
+ }
+
+ /// See [`Receiver::is_empty`].
+ pub fn is_empty(&self) -> bool {
+ self.receiver.is_empty()
+ }
+
+ /// See [`Receiver::is_full`].
+ pub fn is_full(&self) -> bool {
+ self.receiver.is_full()
+ }
+
+ /// See [`Receiver::len`].
+ pub fn len(&self) -> usize {
+ self.receiver.len()
+ }
+
+ /// See [`Receiver::capacity`].
+ pub fn capacity(&self) -> Option<usize> {
+ self.receiver.capacity()
+ }
+}
+
+impl<'a, T> Drop for RecvFut<'a, T> {
+ fn drop(&mut self) {
+ self.reset_hook();
+ }
+}
+
+impl<'a, T> Future for RecvFut<'a, T> {
+ type Output = Result<T, RecvError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.poll_inner(cx, false) // stream = false
+ }
+}
+
+impl<'a, T> FusedFuture for RecvFut<'a, T> {
+ fn is_terminated(&self) -> bool {
+ self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty()
+ }
+}
+
+/// A stream which allows asynchronously receiving messages.
+///
+/// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`].
+pub struct RecvStream<'a, T>(RecvFut<'a, T>);
+
+impl<'a, T> RecvStream<'a, T> {
+ /// See [`Receiver::is_disconnected`].
+ pub fn is_disconnected(&self) -> bool {
+ self.0.is_disconnected()
+ }
+
+ /// See [`Receiver::is_empty`].
+ pub fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+
+ /// See [`Receiver::is_full`].
+ pub fn is_full(&self) -> bool {
+ self.0.is_full()
+ }
+
+ /// See [`Receiver::len`].
+ pub fn len(&self) -> usize {
+ self.0.len()
+ }
+
+ /// See [`Receiver::capacity`].
+ pub fn capacity(&self) -> Option<usize> {
+ self.0.capacity()
+ }
+
+ /// Returns whether the two [`RecvStream`]s belong to the same channel.
+ pub fn same_channel(&self, other: &Self) -> bool {
+ self.0.receiver.same_channel(&*other.0.receiver)
+ }
+}
+
+impl<'a, T> Clone for RecvStream<'a, T> {
+ fn clone(&self) -> RecvStream<'a, T> {
+ RecvStream(RecvFut::new(self.0.receiver.clone()))
+ }
+}
+
+impl<'a, T> Stream for RecvStream<'a, T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match Pin::new(&mut self.0).poll_inner(cx, true) { // stream = true
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(item) => {
+ self.0.reset_hook();
+ Poll::Ready(item.ok())
+ }
+ }
+ }
+}
+
+impl<'a, T> FusedStream for RecvStream<'a, T> {
+ fn is_terminated(&self) -> bool {
+ self.0.is_terminated()
+ }
+}
diff --git a/vendor/flume/src/lib.rs b/vendor/flume/src/lib.rs
new file mode 100644
index 0000000..c9bb3ee
--- /dev/null
+++ b/vendor/flume/src/lib.rs
@@ -0,0 +1,1142 @@
+//! # Flume
+//!
+//! A blazingly fast multi-producer, multi-consumer channel.
+//!
+//! *"Do not communicate by sharing memory; instead, share memory by communicating."*
+//!
+//! ## Why Flume?
+//!
+//! - **Featureful**: Unbounded, bounded and rendezvous queues
+//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes faster than `crossbeam-channel`
+//! - **Safe**: No `unsafe` code anywhere in the codebase!
+//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone`
+//! - **Familiar**: Drop-in replacement for `std::sync::mpsc`
+//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines
+//! - **Simple**: Few dependencies, minimal codebase, fast to compile
+//! - **Asynchronous**: `async` support, including mix 'n match with sync code
+//! - **Ergonomic**: Powerful `select`-like interface
+//!
+//! ## Example
+//!
+//! ```
+//! let (tx, rx) = flume::unbounded();
+//!
+//! tx.send(42).unwrap();
+//! assert_eq!(rx.recv().unwrap(), 42);
+//! ```
+
+#![deny(missing_docs)]
+
+#[cfg(feature = "select")]
+pub mod select;
+#[cfg(feature = "async")]
+pub mod r#async;
+
+mod signal;
+
+// Reexports
+#[cfg(feature = "select")]
+pub use select::Selector;
+
+use std::{
+ collections::VecDeque,
+ sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak},
+ time::{Duration, Instant},
+ marker::PhantomData,
+ thread,
+ fmt,
+};
+
+#[cfg(feature = "spin")]
+use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
+use crate::signal::{Signal, SyncSignal};
+
+/// An error that may be emitted when attempting to send a value into a channel on a sender when
+/// all receivers are dropped.
+#[derive(Copy, Clone, PartialEq, Eq)]
+pub struct SendError<T>(pub T);
+
+impl<T> SendError<T> {
+ /// Consume the error, yielding the message that failed to send.
+ pub fn into_inner(self) -> T { self.0 }
+}
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "SendError(..)".fmt(f)
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "sending on a closed channel".fmt(f)
+ }
+}
+
+impl<T> std::error::Error for SendError<T> {}
+
+/// An error that may be emitted when attempting to send a value into a channel on a sender when
+/// the channel is full or all receivers are dropped.
+#[derive(Copy, Clone, PartialEq, Eq)]
+pub enum TrySendError<T> {
+ /// The channel the message is sent on has a finite capacity and was full when the send was attempted.
+ Full(T),
+ /// All channel receivers were dropped and so the message has nobody to receive it.
+ Disconnected(T),
+}
+
+impl<T> TrySendError<T> {
+ /// Consume the error, yielding the message that failed to send.
+ pub fn into_inner(self) -> T {
+ match self {
+ Self::Full(msg) | Self::Disconnected(msg) => msg,
+ }
+ }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ TrySendError::Full(..) => "Full(..)".fmt(f),
+ TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
+ }
+ }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ TrySendError::Full(..) => "sending on a full channel".fmt(f),
+ TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
+ }
+ }
+}
+
+impl<T> std::error::Error for TrySendError<T> {}
+
+impl<T> From<SendError<T>> for TrySendError<T> {
+ fn from(err: SendError<T>) -> Self {
+ match err {
+ SendError(item) => Self::Disconnected(item),
+ }
+ }
+}
+
+/// An error that may be emitted when sending a value into a channel on a sender with a timeout when
+/// the send operation times out or all receivers are dropped.
+#[derive(Copy, Clone, PartialEq, Eq)]
+pub enum SendTimeoutError<T> {
+ /// A timeout occurred when attempting to send the message.
+ Timeout(T),
+ /// All channel receivers were dropped and so the message has nobody to receive it.
+ Disconnected(T),
+}
+
+impl<T> SendTimeoutError<T> {
+ /// Consume the error, yielding the message that failed to send.
+ pub fn into_inner(self) -> T {
+ match self {
+ Self::Timeout(msg) | Self::Disconnected(msg) => msg,
+ }
+ }
+}
+
+impl<T> fmt::Debug for SendTimeoutError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "SendTimeoutError(..)".fmt(f)
+ }
+}
+
+impl<T> fmt::Display for SendTimeoutError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f),
+ SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f),
+ }
+ }
+}
+
+impl<T> std::error::Error for SendTimeoutError<T> {}
+
+impl<T> From<SendError<T>> for SendTimeoutError<T> {
+ fn from(err: SendError<T>) -> Self {
+ match err {
+ SendError(item) => Self::Disconnected(item),
+ }
+ }
+}
+
+enum TrySendTimeoutError<T> {
+ Full(T),
+ Disconnected(T),
+ Timeout(T),
+}
+
+/// An error that may be emitted when attempting to wait for a value on a receiver when all senders
+/// are dropped and there are no more messages in the channel.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum RecvError {
+ /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
+ Disconnected,
+}
+
+impl fmt::Display for RecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ RecvError::Disconnected => "receiving on a closed channel".fmt(f),
+ }
+ }
+}
+
+impl std::error::Error for RecvError {}
+
+/// An error that may be emitted when attempting to fetch a value on a receiver when there are no
+/// messages in the channel. If there are no messages in the channel and all senders are dropped,
+/// then `TryRecvError::Disconnected` will be returned.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum TryRecvError {
+ /// The channel was empty when the receive was attempted.
+ Empty,
+ /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
+ Disconnected,
+}
+
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ TryRecvError::Empty => "receiving on an empty channel".fmt(f),
+ TryRecvError::Disconnected => "channel is empty and closed".fmt(f),
+ }
+ }
+}
+
+impl std::error::Error for TryRecvError {}
+
+impl From<RecvError> for TryRecvError {
+ fn from(err: RecvError) -> Self {
+ match err {
+ RecvError::Disconnected => Self::Disconnected,
+ }
+ }
+}
+
+/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout
+/// when the receive operation times out or all senders are dropped and there are no values left
+/// in the channel.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum RecvTimeoutError {
+ /// A timeout occurred when attempting to receive a message.
+ Timeout,
+ /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
+ Disconnected,
+}
+
+impl fmt::Display for RecvTimeoutError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f),
+ RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f),
+ }
+ }
+}
+
+impl std::error::Error for RecvTimeoutError {}
+
+impl From<RecvError> for RecvTimeoutError {
+ fn from(err: RecvError) -> Self {
+ match err {
+ RecvError::Disconnected => Self::Disconnected,
+ }
+ }
+}
+
+enum TryRecvTimeoutError {
+ Empty,
+ Timeout,
+ Disconnected,
+}
+
+// TODO: Investigate some sort of invalidation flag for timeouts
+#[cfg(feature = "spin")]
+struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
+
+#[cfg(not(feature = "spin"))]
+struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
+
+#[cfg(feature = "spin")]
+impl<T, S: ?Sized + Signal> Hook<T, S> {
+ pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
+ where
+ S: Sized,
+ {
+ Arc::new(Self(Some(Spinlock::new(msg)), signal))
+ }
+
+ fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
+ self.0.as_ref().map(|s| s.lock())
+ }
+}
+
+#[cfg(not(feature = "spin"))]
+impl<T, S: ?Sized + Signal> Hook<T, S> {
+ pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
+ where
+ S: Sized,
+ {
+ Arc::new(Self(Some(Mutex::new(msg)), signal))
+ }
+
+ fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
+ self.0.as_ref().map(|s| s.lock().unwrap())
+ }
+}
+
+impl<T, S: ?Sized + Signal> Hook<T, S> {
+ pub fn fire_recv(&self) -> (T, &S) {
+ let msg = self.lock().unwrap().take().unwrap();
+ (msg, self.signal())
+ }
+
+ pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
+ let ret = match self.lock() {
+ Some(mut lock) => {
+ *lock = Some(msg);
+ None
+ }
+ None => Some(msg),
+ };
+ (ret, self.signal())
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.lock().map(|s| s.is_none()).unwrap_or(true)
+ }
+
+ pub fn try_take(&self) -> Option<T> {
+ self.lock().unwrap().take()
+ }
+
+ pub fn trigger(signal: S) -> Arc<Self>
+ where
+ S: Sized,
+ {
+ Arc::new(Self(None, signal))
+ }
+
+ pub fn signal(&self) -> &S {
+ &self.1
+ }
+
+ pub fn fire_nothing(&self) -> bool {
+ self.signal().fire()
+ }
+}
+
+impl<T> Hook<T, SyncSignal> {
+ pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
+ loop {
+ let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
+ let msg = self.lock().unwrap().take();
+ if let Some(msg) = msg {
+ break Some(msg);
+ } else if disconnected {
+ break None;
+ } else {
+ self.signal().wait()
+ }
+ }
+ }
+
+ // Err(true) if timeout
+ pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
+ loop {
+ let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
+ let msg = self.lock().unwrap().take();
+ if let Some(msg) = msg {
+ break Ok(msg);
+ } else if disconnected {
+ break Err(false);
+ } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
+ self.signal().wait_timeout(dur);
+ } else {
+ break Err(true);
+ }
+ }
+ }
+
+ pub fn wait_send(&self, abort: &AtomicBool) {
+ loop {
+ let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
+ if disconnected || self.lock().unwrap().is_none() {
+ break;
+ }
+
+ self.signal().wait();
+ }
+ }
+
+ // Err(true) if timeout
+ pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
+ loop {
+ let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
+ if self.lock().unwrap().is_none() {
+ break Ok(());
+ } else if disconnected {
+ break Err(false);
+ } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
+ self.signal().wait_timeout(dur);
+ } else {
+ break Err(true);
+ }
+ }
+ }
+}
+
+#[cfg(feature = "spin")]
+#[inline]
+fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
+ let mut i = 4;
+ loop {
+ for _ in 0..10 {
+ if let Some(guard) = lock.try_lock() {
+ return guard;
+ }
+ thread::yield_now();
+ }
+ // Sleep for at most ~1 ms
+ thread::sleep(Duration::from_nanos(1 << i.min(20)));
+ i += 1;
+ }
+}
+
+#[cfg(not(feature = "spin"))]
+#[inline]
+fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
+ lock.lock().unwrap()
+}
+
+#[cfg(not(feature = "spin"))]
+use std::sync::{Mutex, MutexGuard};
+
+#[cfg(feature = "spin")]
+type ChanLock<T> = Spinlock<T>;
+#[cfg(not(feature = "spin"))]
+type ChanLock<T> = Mutex<T>;
+
+
+type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
+struct Chan<T> {
+ sending: Option<(usize, SignalVec<T>)>,
+ queue: VecDeque<T>,
+ waiting: SignalVec<T>,
+}
+
+impl<T> Chan<T> {
+ fn pull_pending(&mut self, pull_extra: bool) {
+ if let Some((cap, sending)) = &mut self.sending {
+ let effective_cap = *cap + pull_extra as usize;
+
+ while self.queue.len() < effective_cap {
+ if let Some(s) = sending.pop_front() {
+ let (msg, signal) = s.fire_recv();
+ signal.fire();
+ self.queue.push_back(msg);
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ fn try_wake_receiver_if_pending(&mut self) {
+ if !self.queue.is_empty() {
+ while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
+ }
+ }
+}
+
+struct Shared<T> {
+ chan: ChanLock<Chan<T>>,
+ disconnected: AtomicBool,
+ sender_count: AtomicUsize,
+ receiver_count: AtomicUsize,
+}
+
+impl<T> Shared<T> {
+ fn new(cap: Option<usize>) -> Self {
+ Self {
+ chan: ChanLock::new(Chan {
+ sending: cap.map(|cap| (cap, VecDeque::new())),
+ queue: VecDeque::new(),
+ waiting: VecDeque::new(),
+ }),
+ disconnected: AtomicBool::new(false),
+ sender_count: AtomicUsize::new(1),
+ receiver_count: AtomicUsize::new(1),
+ }
+ }
+
+ fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
+ &self,
+ msg: T,
+ should_block: bool,
+ make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
+ do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
+ ) -> R {
+ let mut chan = wait_lock(&self.chan);
+
+ if self.is_disconnected() {
+ Err(TrySendTimeoutError::Disconnected(msg)).into()
+ } else if !chan.waiting.is_empty() {
+ let mut msg = Some(msg);
+
+ loop {
+ let slot = chan.waiting.pop_front();
+ match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
+ // No more waiting receivers and msg in queue, so break out of the loop
+ None if msg.is_none() => break,
+ // No more waiting receivers, so add msg to queue and break out of the loop
+ None => {
+ chan.queue.push_back(msg.unwrap());
+ break;
+ }
+ Some((Some(m), signal)) => {
+ if signal.fire() {
+ // Was async and a stream, so didn't acquire the message. Wake another
+ // receiver, and do not yet push the message.
+ msg.replace(m);
+ continue;
+ } else {
+ // Was async and not a stream, so it did acquire the message. Push the
+ // message to the queue for it to be received.
+ chan.queue.push_back(m);
+ drop(chan);
+ break;
+ }
+ },
+ Some((None, signal)) => {
+ drop(chan);
+ signal.fire();
+ break; // Was sync, so it has acquired the message
+ },
+ }
+ }
+
+ Ok(()).into()
+ } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
+ chan.queue.push_back(msg);
+ Ok(()).into()
+ } else if should_block { // Only bounded from here on
+ let hook = make_signal(msg);
+ chan.sending.as_mut().unwrap().1.push_back(hook.clone());
+ drop(chan);
+
+ do_block(hook)
+ } else {
+ Err(TrySendTimeoutError::Full(msg)).into()
+ }
+ }
+
+ fn send_sync(
+ &self,
+ msg: T,
+ block: Option<Option<Instant>>,
+ ) -> Result<(), TrySendTimeoutError<T>> {
+ self.send(
+ // msg
+ msg,
+ // should_block
+ block.is_some(),
+ // make_signal
+ |msg| Hook::slot(Some(msg), SyncSignal::default()),
+ // do_block
+ |hook| if let Some(deadline) = block.unwrap() {
+ hook.wait_deadline_send(&self.disconnected, deadline)
+ .or_else(|timed_out| {
+ if timed_out { // Remove our signal
+ let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
+ wait_lock(&self.chan).sending
+ .as_mut()
+ .unwrap().1
+ .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
+ }
+ hook.try_take().map(|msg| if self.is_disconnected() {
+ Err(TrySendTimeoutError::Disconnected(msg))
+ } else {
+ Err(TrySendTimeoutError::Timeout(msg))
+ })
+ .unwrap_or(Ok(()))
+ })
+ } else {
+ hook.wait_send(&self.disconnected);
+
+ match hook.try_take() {
+ Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
+ None => Ok(()),
+ }
+ },
+ )
+ }
+
+ fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
+ &self,
+ should_block: bool,
+ make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
+ do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
+ ) -> R {
+ let mut chan = wait_lock(&self.chan);
+ chan.pull_pending(true);
+
+ if let Some(msg) = chan.queue.pop_front() {
+ drop(chan);
+ Ok(msg).into()
+ } else if self.is_disconnected() {
+ drop(chan);
+ Err(TryRecvTimeoutError::Disconnected).into()
+ } else if should_block {
+ let hook = make_signal();
+ chan.waiting.push_back(hook.clone());
+ drop(chan);
+
+ do_block(hook)
+ } else {
+ drop(chan);
+ Err(TryRecvTimeoutError::Empty).into()
+ }
+ }
+
+ fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
+ self.recv(
+ // should_block
+ block.is_some(),
+ // make_signal
+ || Hook::slot(None, SyncSignal::default()),
+ // do_block
+ |hook| if let Some(deadline) = block.unwrap() {
+ hook.wait_deadline_recv(&self.disconnected, deadline)
+ .or_else(|timed_out| {
+ if timed_out { // Remove our signal
+ let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
+ wait_lock(&self.chan).waiting
+ .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
+ }
+ match hook.try_take() {
+ Some(msg) => Ok(msg),
+ None => {
+ let disconnected = self.is_disconnected(); // Check disconnect *before* msg
+ if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
+ Ok(msg)
+ } else if disconnected {
+ Err(TryRecvTimeoutError::Disconnected)
+ } else {
+ Err(TryRecvTimeoutError::Timeout)
+ }
+ },
+ }
+ })
+ } else {
+ hook.wait_recv(&self.disconnected)
+ .or_else(|| wait_lock(&self.chan).queue.pop_front())
+ .ok_or(TryRecvTimeoutError::Disconnected)
+ },
+ )
+ }
+
+ /// Disconnect anything listening on this channel (this will not prevent receivers receiving
+ /// msgs that have already been sent)
+ fn disconnect_all(&self) {
+ self.disconnected.store(true, Ordering::Relaxed);
+
+ let mut chan = wait_lock(&self.chan);
+ chan.pull_pending(false);
+ if let Some((_, sending)) = chan.sending.as_ref() {
+ sending.iter().for_each(|hook| {
+ hook.signal().fire();
+ })
+ }
+ chan.waiting.iter().for_each(|hook| {
+ hook.signal().fire();
+ });
+ }
+
+ fn is_disconnected(&self) -> bool {
+ self.disconnected.load(Ordering::SeqCst)
+ }
+
+ fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ fn is_full(&self) -> bool {
+ self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
+ }
+
+ fn len(&self) -> usize {
+ let mut chan = wait_lock(&self.chan);
+ chan.pull_pending(false);
+ chan.queue.len()
+ }
+
+ fn capacity(&self) -> Option<usize> {
+ wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
+ }
+
+ fn sender_count(&self) -> usize {
+ self.sender_count.load(Ordering::Relaxed)
+ }
+
+ fn receiver_count(&self) -> usize {
+ self.receiver_count.load(Ordering::Relaxed)
+ }
+}
+
+/// A transmitting end of a channel.
+pub struct Sender<T> {
+ shared: Arc<Shared<T>>,
+}
+
+impl<T> Sender<T> {
+ /// Attempt to send a value into the channel. If the channel is bounded and full, or all
+ /// receivers have been dropped, an error is returned. If the channel associated with this
+ /// sender is unbounded, this method has the same behaviour as [`Sender::send`].
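+ ///
+ /// # Examples
+ /// A minimal sketch with a full bounded channel:
+ /// ```
+ /// let (tx, rx) = flume::bounded(1);
+ /// assert!(tx.try_send(1).is_ok());
+ /// assert!(tx.try_send(2).is_err()); // Full, so this fails without blocking
+ /// assert_eq!(rx.try_recv(), Ok(1));
+ /// ```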
+ pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ self.shared.send_sync(msg, None).map_err(|err| match err {
+ TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
+ TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
+ _ => unreachable!(),
+ })
+ }
+
+ /// Send a value into the channel, returning an error if all receivers have been dropped.
+ /// If the channel is bounded and is full, this method will block until space is available
+ /// or all receivers have been dropped. If the channel is unbounded, this method will not
+ /// block.
+ pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+ self.shared.send_sync(msg, Some(None)).map_err(|err| match err {
+ TrySendTimeoutError::Disconnected(msg) => SendError(msg),
+ _ => unreachable!(),
+ })
+ }
+
+ /// Send a value into the channel, returning an error if all receivers have been dropped
+ /// or the deadline has passed. If the channel is bounded and is full, this method will
+ /// block until space is available, the deadline is reached, or all receivers have been
+ /// dropped.
+ pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
+ self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err {
+ TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
+ TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
+ _ => unreachable!(),
+ })
+ }
+
+ /// Send a value into the channel, returning an error if all receivers have been dropped
+ /// or the timeout has expired. If the channel is bounded and is full, this method will
+ /// block until space is available, the timeout has expired, or all receivers have been
+ /// dropped.
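+ ///
+ /// # Examples
+ /// A sketch of a send timing out on a rendezvous channel with no waiting receiver:
+ /// ```
+ /// use std::time::Duration;
+ ///
+ /// let (tx, _rx) = flume::bounded(0);
+ /// assert_eq!(
+ ///     tx.send_timeout(5, Duration::from_millis(10)),
+ ///     Err(flume::SendTimeoutError::Timeout(5)),
+ /// );
+ /// ```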
+ pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
+ self.send_deadline(msg, Instant::now().checked_add(dur).unwrap())
+ }
+
+ /// Returns true if all receivers for this channel have been dropped.
+ pub fn is_disconnected(&self) -> bool {
+ self.shared.is_disconnected()
+ }
+
+ /// Returns true if the channel is empty.
+ /// Note: Zero-capacity channels are always empty.
+ pub fn is_empty(&self) -> bool {
+ self.shared.is_empty()
+ }
+
+ /// Returns true if the channel is full.
+ /// Note: Zero-capacity channels are always full.
+ pub fn is_full(&self) -> bool {
+ self.shared.is_full()
+ }
+
+ /// Returns the number of messages in the channel.
+ pub fn len(&self) -> usize {
+ self.shared.len()
+ }
+
+ /// If the channel is bounded, returns its capacity.
+ pub fn capacity(&self) -> Option<usize> {
+ self.shared.capacity()
+ }
+
+ /// Get the number of senders that currently exist, including this one.
+ pub fn sender_count(&self) -> usize {
+ self.shared.sender_count()
+ }
+
+ /// Get the number of receivers that currently exist.
+ ///
+ /// Note that this method makes no guarantees that a subsequent send will succeed; it's
+ /// possible that between `receiver_count()` being called and a `send()`, all open receivers
+ /// could drop.
+ pub fn receiver_count(&self) -> usize {
+ self.shared.receiver_count()
+ }
+
+ /// Creates a [`WeakSender`] that does not keep the channel open.
+ ///
+ /// The channel is closed once all `Sender`s are dropped, even if there
+ /// are still active `WeakSender`s.
+ pub fn downgrade(&self) -> WeakSender<T> {
+ WeakSender {
+ shared: Arc::downgrade(&self.shared),
+ }
+ }
+
+ /// Returns whether the two senders belong to the same channel.
+ pub fn same_channel(&self, other: &Sender<T>) -> bool {
+ Arc::ptr_eq(&self.shared, &other.shared)
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ /// Clone this sender. [`Sender`] acts as a handle to the sending end of a channel. Remaining channel
+ /// contents will only be cleaned up when all senders and the receiver have been dropped.
+ fn clone(&self) -> Self {
+ self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
+ Self { shared: self.shared.clone() }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Sender").finish()
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ // Notify receivers that all senders have been dropped if the number of senders drops to 0.
+ if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
+ self.shared.disconnect_all();
+ }
+ }
+}
+
+/// A sender that does not prevent the channel from being closed.
+///
+/// Weak senders do not count towards the number of active senders on the channel. As soon as
+/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
+/// `WeakSender`.
+///
+/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`]
+/// method.
+pub struct WeakSender<T> {
+ shared: Weak<Shared<T>>,
+}
+
+impl<T> WeakSender<T> {
+ /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages.
+ ///
+ /// Returns `None` if the channel was closed already. Note that a `Some` return value
+ /// does not guarantee that the channel is still open.
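+ ///
+ /// # Examples
+ /// A minimal sketch:
+ /// ```
+ /// let (tx, _rx) = flume::unbounded::<i32>();
+ /// let weak = tx.downgrade();
+ /// assert!(weak.upgrade().is_some()); // A strong sender still exists
+ ///
+ /// drop(tx);
+ /// assert!(weak.upgrade().is_none()); // All strong senders dropped, so the channel is closed
+ /// ```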
+ pub fn upgrade(&self) -> Option<Sender<T>> {
+ self.shared
+ .upgrade()
+ // check that there are still live senders
+ .filter(|shared| {
+ shared
+ .sender_count
+ .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
+ if count == 0 {
+ // all senders are closed already -> don't increase the sender count
+ None
+ } else {
+ // there is still at least one active sender
+ Some(count + 1)
+ }
+ })
+ .is_ok()
+ })
+ .map(|shared| Sender { shared })
+ }
+}
+
+/// The receiving end of a channel.
+///
+/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
+/// Each message will only be received by a single receiver. This is useful for
+/// implementing work stealing for concurrent programs.
+pub struct Receiver<T> {
+ shared: Arc<Shared<T>>,
+}
+
+impl<T> Receiver<T> {
+ /// Attempt to fetch an incoming value from the channel associated with this receiver,
+ /// returning an error if the channel is empty or if all senders have been dropped.
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ self.shared.recv_sync(None).map_err(|err| match err {
+ TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
+ TryRecvTimeoutError::Empty => TryRecvError::Empty,
+ _ => unreachable!(),
+ })
+ }
+
+ /// Wait for an incoming value from the channel associated with this receiver, returning an
+ /// error if all senders have been dropped.
+ pub fn recv(&self) -> Result<T, RecvError> {
+ self.shared.recv_sync(Some(None)).map_err(|err| match err {
+ TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
+ _ => unreachable!(),
+ })
+ }
+
+ /// Wait for an incoming value from the channel associated with this receiver, returning an
+ /// error if all senders have been dropped or the deadline has passed.
+ pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err {
+ TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
+ TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
+ _ => unreachable!(),
+ })
+ }
+
+ /// Wait for an incoming value from the channel associated with this receiver, returning an
+ /// error if all senders have been dropped or the timeout has expired.
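+ ///
+ /// # Examples
+ /// A sketch of a receive timing out on an empty channel:
+ /// ```
+ /// use std::time::Duration;
+ ///
+ /// let (_tx, rx) = flume::unbounded::<i32>();
+ /// let res = rx.recv_timeout(Duration::from_millis(10));
+ /// assert_eq!(res, Err(flume::RecvTimeoutError::Timeout));
+ /// ```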
+ pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
+ self.recv_deadline(Instant::now().checked_add(dur).unwrap())
+ }
+
+ /// Create a blocking iterator over the values received on the channel that finishes iteration
+ /// when all senders have been dropped.
+ ///
+ /// You can also create a self-owned iterator with [`Receiver::into_iter`].
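+ ///
+ /// # Examples
+ /// A minimal sketch:
+ /// ```
+ /// let (tx, rx) = flume::unbounded();
+ /// std::thread::spawn(move || {
+ ///     (0..3).for_each(|i| tx.send(i).unwrap());
+ ///     // `tx` is dropped here, which ends the iteration
+ /// });
+ /// let received: Vec<i32> = rx.iter().collect();
+ /// assert_eq!(received, vec![0, 1, 2]);
+ /// ```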
+ pub fn iter(&self) -> Iter<T> {
+ Iter { receiver: &self }
+ }
+
+ /// A non-blocking iterator over the values received on the channel that finishes iteration
+ /// when all senders have been dropped or the channel is empty.
+ pub fn try_iter(&self) -> TryIter<T> {
+ TryIter { receiver: &self }
+ }
+
+ /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
+ /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
+ /// the function has been called.
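+ ///
+ /// # Examples
+ /// A minimal sketch:
+ /// ```
+ /// let (tx, rx) = flume::unbounded();
+ /// (0..3).for_each(|i| tx.send(i).unwrap());
+ ///
+ /// assert_eq!(rx.drain().collect::<Vec<_>>(), vec![0, 1, 2]);
+ ///
+ /// // Messages sent after the call to `drain` are not included
+ /// tx.send(3).unwrap();
+ /// assert_eq!(rx.drain().len(), 1);
+ /// ```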
+ pub fn drain(&self) -> Drain<T> {
+ let mut chan = wait_lock(&self.shared.chan);
+ chan.pull_pending(false);
+ let queue = std::mem::take(&mut chan.queue);
+
+ Drain { queue, _phantom: PhantomData }
+ }
+
+ /// Returns true if all senders for this channel have been dropped.
+ pub fn is_disconnected(&self) -> bool {
+ self.shared.is_disconnected()
+ }
+
+ /// Returns true if the channel is empty.
+ /// Note: Zero-capacity channels are always empty.
+ pub fn is_empty(&self) -> bool {
+ self.shared.is_empty()
+ }
+
+ /// Returns true if the channel is full.
+ /// Note: Zero-capacity channels are always full.
+ pub fn is_full(&self) -> bool {
+ self.shared.is_full()
+ }
+
+ /// Returns the number of messages in the channel.
+ pub fn len(&self) -> usize {
+ self.shared.len()
+ }
+
+ /// If the channel is bounded, returns its capacity.
+ pub fn capacity(&self) -> Option<usize> {
+ self.shared.capacity()
+ }
+
+ /// Get the number of senders that currently exist.
+ pub fn sender_count(&self) -> usize {
+ self.shared.sender_count()
+ }
+
+ /// Get the number of receivers that currently exist, including this one.
+ pub fn receiver_count(&self) -> usize {
+ self.shared.receiver_count()
+ }
+
+ /// Returns whether the two receivers belong to the same channel.
+ pub fn same_channel(&self, other: &Receiver<T>) -> bool {
+ Arc::ptr_eq(&self.shared, &other.shared)
+ }
+}
+
+impl<T> Clone for Receiver<T> {
+ /// Clone this receiver. [`Receiver`] acts as a handle to the receiving end of a channel. Remaining
+ /// channel contents will only be cleaned up when all senders and the receiver have been
+ /// dropped.
+ ///
+ /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
+ /// Each message will only be received by a single receiver. This is useful for
+ /// implementing work stealing for concurrent programs.
+ fn clone(&self) -> Self {
+ self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
+ Self { shared: self.shared.clone() }
+ }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Receiver").finish()
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ // Notify senders that all receivers have been dropped if the number of receivers drops
+ // to 0.
+ if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
+ self.shared.disconnect_all();
+ }
+ }
+}
+
+/// This exists as a shorthand for [`Receiver::iter`].
+impl<'a, T> IntoIterator for &'a Receiver<T> {
+ type Item = T;
+ type IntoIter = Iter<'a, T>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ Iter { receiver: self }
+ }
+}
+
+impl<T> IntoIterator for Receiver<T> {
+ type Item = T;
+ type IntoIter = IntoIter<T>;
+
+ /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter { receiver: self }
+ }
+}
+
+/// An iterator over the msgs received from a channel.
+pub struct Iter<'a, T> {
+ receiver: &'a Receiver<T>,
+}
+
+impl<'a, T> Iterator for Iter<'a, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.receiver.recv().ok()
+ }
+}
+
+/// A non-blocking iterator over the msgs received from a channel.
+pub struct TryIter<'a, T> {
+ receiver: &'a Receiver<T>,
+}
+
+impl<'a, T> Iterator for TryIter<'a, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.receiver.try_recv().ok()
+ }
+}
+
+/// A fixed-size iterator over the msgs drained from a channel.
+#[derive(Debug)]
+pub struct Drain<'a, T> {
+ queue: VecDeque<T>,
+ /// A phantom field used to constrain the lifetime of this iterator. We do this because the
+ /// implementation may change and we don't want to unintentionally constrain it. Removing this
+ /// lifetime later is a possibility.
+ _phantom: PhantomData<&'a ()>,
+}
+
+impl<'a, T> Iterator for Drain<'a, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.queue.pop_front()
+ }
+}
+
+impl<'a, T> ExactSizeIterator for Drain<'a, T> {
+ fn len(&self) -> usize {
+ self.queue.len()
+ }
+}
+
+/// An owned iterator over the msgs received from a channel.
+pub struct IntoIter<T> {
+ receiver: Receiver<T>,
+}
+
+impl<T> Iterator for IntoIter<T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.receiver.recv().ok()
+ }
+}
+
+/// Create a channel with no maximum capacity.
+///
+/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in
+/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
+/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
+/// may be cloned.
+///
+/// # Examples
+/// ```
+/// let (tx, rx) = flume::unbounded();
+///
+/// tx.send(42).unwrap();
+/// assert_eq!(rx.recv().unwrap(), 42);
+/// ```
+pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
+ let shared = Arc::new(Shared::new(None));
+ (
+ Sender { shared: shared.clone() },
+ Receiver { shared },
+ )
+}
+
+/// Create a channel with a maximum capacity.
+///
+/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one
+/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
+/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
+/// may be cloned.
+///
+/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
+/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
+/// is not desired, [`Sender::try_send`] may be used.
+///
+/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero
+/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a
+/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers
+/// perform a handshake and transfer ownership of a value.
+///
+/// # Examples
+/// ```
+/// let (tx, rx) = flume::bounded(32);
+///
+/// for i in 1..33 {
+/// tx.send(i).unwrap();
+/// }
+/// assert!(tx.try_send(33).is_err());
+///
+/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum());
+/// ```
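+///
+/// A rendezvous sketch, where a zero-capacity channel blocks the sender until a receiver arrives:
+/// ```
+/// let (tx, rx) = flume::bounded(0);
+///
+/// let handle = std::thread::spawn(move || {
+///     tx.send(42).unwrap(); // Blocks until the `recv` below takes the value
+/// });
+/// assert_eq!(rx.recv().unwrap(), 42);
+/// handle.join().unwrap();
+/// ```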
+pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
+ let shared = Arc::new(Shared::new(Some(cap)));
+ (
+ Sender { shared: shared.clone() },
+ Receiver { shared },
+ )
+}
diff --git a/vendor/flume/src/select.rs b/vendor/flume/src/select.rs
new file mode 100644
index 0000000..cef15aa
--- /dev/null
+++ b/vendor/flume/src/select.rs
@@ -0,0 +1,405 @@
+//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface.
+
+use crate::*;
+use spin1::Mutex as Spinlock;
+use std::{any::Any, marker::PhantomData};
+
+#[cfg(feature = "eventual-fairness")]
+use nanorand::Rng;
+
+// A unique token corresponding to an event in a selector
+type Token = usize;
+
+struct SelectSignal(
+ thread::Thread,
+ Token,
+ AtomicBool,
+ Arc<Spinlock<VecDeque<Token>>>,
+);
+
+impl Signal for SelectSignal {
+ fn fire(&self) -> bool {
+ self.2.store(true, Ordering::SeqCst);
+ self.3.lock().push_back(self.1);
+ self.0.unpark();
+ false
+ }
+
+ fn as_any(&self) -> &(dyn Any + 'static) {
+ self
+ }
+ fn as_ptr(&self) -> *const () {
+ self as *const _ as *const ()
+ }
+}
+
+trait Selection<'a, T> {
+ fn init(&mut self) -> Option<T>;
+ fn poll(&mut self) -> Option<T>;
+ fn deinit(&mut self);
+}
+
+/// An error that may be emitted when attempting to wait for a value on a receiver.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum SelectError {
+ /// A timeout occurred when waiting on a `Selector`.
+ Timeout,
+}
+
+impl fmt::Display for SelectError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ SelectError::Timeout => "timeout occurred".fmt(f),
+ }
+ }
+}
+
+impl std::error::Error for SelectError {}
+
+/// A type used to wait upon multiple blocking operations at once.
+///
+/// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour,
+/// allowing a thread to wait upon the result of more than one operation at once.
+///
+/// # Examples
+/// ```
+/// let (tx0, rx0) = flume::unbounded();
+/// let (tx1, rx1) = flume::unbounded();
+///
+/// std::thread::spawn(move || {
+/// tx0.send(true).unwrap();
+/// tx1.send(42).unwrap();
+/// });
+///
+/// flume::Selector::new()
+/// .recv(&rx0, |b| println!("Received {:?}", b))
+/// .recv(&rx1, |n| println!("Received {:?}", n))
+/// .wait();
+/// ```
+pub struct Selector<'a, T: 'a> {
+ selections: Vec<Box<dyn Selection<'a, T> + 'a>>,
+ next_poll: usize,
+ signalled: Arc<Spinlock<VecDeque<Token>>>,
+ #[cfg(feature = "eventual-fairness")]
+ rng: nanorand::WyRand,
+ phantom: PhantomData<*const ()>,
+}
+
+impl<'a, T: 'a> Default for Selector<'a, T> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl<'a, T: 'a> fmt::Debug for Selector<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Selector").finish()
+ }
+}
+
+impl<'a, T> Selector<'a, T> {
+ /// Create a new selector.
+ pub fn new() -> Self {
+ Self {
+ selections: Vec::new(),
+ next_poll: 0,
+ signalled: Arc::default(),
+ phantom: PhantomData::default(),
+ #[cfg(feature = "eventual-fairness")]
+ rng: nanorand::WyRand::new(),
+ }
+ }
+
+ /// Add a send operation to the selector that sends the provided value.
+ ///
+ /// Once added, the selector can be used to run the provided handler function on completion of this operation.
+ pub fn send<U, F: FnMut(Result<(), SendError<U>>) -> T + 'a>(
+ mut self,
+ sender: &'a Sender<U>,
+ msg: U,
+ mapper: F,
+ ) -> Self {
+ struct SendSelection<'a, T, F, U> {
+ sender: &'a Sender<U>,
+ msg: Option<U>,
+ token: Token,
+ signalled: Arc<Spinlock<VecDeque<Token>>>,
+ hook: Option<Arc<Hook<U, SelectSignal>>>,
+ mapper: F,
+ phantom: PhantomData<T>,
+ }
+
+ impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U>
+ where
+ F: FnMut(Result<(), SendError<U>>) -> T,
+ {
+ fn init(&mut self) -> Option<T> {
+ let token = self.token;
+ let signalled = self.signalled.clone();
+ let r = self.sender.shared.send(
+ self.msg.take().unwrap(),
+ true,
+ |msg| {
+ Hook::slot(
+ Some(msg),
+ SelectSignal(
+ thread::current(),
+ token,
+ AtomicBool::new(false),
+ signalled,
+ ),
+ )
+ },
+ // Always runs
+ |h| {
+ self.hook = Some(h);
+ Ok(())
+ },
+ );
+
+ if self.hook.is_none() {
+ Some((self.mapper)(match r {
+ Ok(()) => Ok(()),
+ Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)),
+ _ => unreachable!(),
+ }))
+ } else {
+ None
+ }
+ }
+
+ fn poll(&mut self) -> Option<T> {
+ let res = if self.sender.shared.is_disconnected() {
+ // Check the hook one last time
+ if let Some(msg) = self.hook.as_ref()?.try_take() {
+ Err(SendError(msg))
+ } else {
+ Ok(())
+ }
+ } else if self.hook.as_ref().unwrap().is_empty() {
+ // The message was sent
+ Ok(())
+ } else {
+ return None;
+ };
+
+ Some((&mut self.mapper)(res))
+ }
+
+ fn deinit(&mut self) {
+ if let Some(hook) = self.hook.take() {
+ // Remove hook
+ let hook: Arc<Hook<U, dyn Signal>> = hook;
+ wait_lock(&self.sender.shared.chan)
+ .sending
+ .as_mut()
+ .unwrap()
+ .1
+ .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
+ }
+ }
+ }
+
+ let token = self.selections.len();
+ self.selections.push(Box::new(SendSelection {
+ sender,
+ msg: Some(msg),
+ token,
+ signalled: self.signalled.clone(),
+ hook: None,
+ mapper,
+ phantom: Default::default(),
+ }));
+
+ self
+ }
+
+ /// Add a receive operation to the selector.
+ ///
+ /// Once added, the selector can be used to run the provided handler function on completion of this operation.
+ pub fn recv<U, F: FnMut(Result<U, RecvError>) -> T + 'a>(
+ mut self,
+ receiver: &'a Receiver<U>,
+ mapper: F,
+ ) -> Self {
+ struct RecvSelection<'a, T, F, U> {
+ receiver: &'a Receiver<U>,
+ token: Token,
+ signalled: Arc<Spinlock<VecDeque<Token>>>,
+ hook: Option<Arc<Hook<U, SelectSignal>>>,
+ mapper: F,
+ received: bool,
+ phantom: PhantomData<T>,
+ }
+
+ impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U>
+ where
+ F: FnMut(Result<U, RecvError>) -> T,
+ {
+ fn init(&mut self) -> Option<T> {
+ let token = self.token;
+ let signalled = self.signalled.clone();
+ let r = self.receiver.shared.recv(
+ true,
+ || {
+ Hook::trigger(SelectSignal(
+ thread::current(),
+ token,
+ AtomicBool::new(false),
+ signalled,
+ ))
+ },
+ // Always runs
+ |h| {
+ self.hook = Some(h);
+ Err(TryRecvTimeoutError::Timeout)
+ },
+ );
+
+ if self.hook.is_none() {
+ Some((self.mapper)(match r {
+ Ok(msg) => Ok(msg),
+ Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected),
+ _ => unreachable!(),
+ }))
+ } else {
+ None
+ }
+ }
+
+ fn poll(&mut self) -> Option<T> {
+ let res = if let Ok(msg) = self.receiver.try_recv() {
+ self.received = true;
+ Ok(msg)
+ } else if self.receiver.shared.is_disconnected() {
+ Err(RecvError::Disconnected)
+ } else {
+ return None;
+ };
+
+ Some((&mut self.mapper)(res))
+ }
+
+ fn deinit(&mut self) {
+ if let Some(hook) = self.hook.take() {
+ // Remove hook
+ let hook: Arc<Hook<U, dyn Signal>> = hook;
+ wait_lock(&self.receiver.shared.chan)
+ .waiting
+ .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
+ // If we were woken, but never polled, wake up another
+ if !self.received
+ && hook
+ .signal()
+ .as_any()
+ .downcast_ref::<SelectSignal>()
+ .unwrap()
+ .2
+ .load(Ordering::SeqCst)
+ {
+ wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending();
+ }
+ }
+ }
+ }
+
+ let token = self.selections.len();
+ self.selections.push(Box::new(RecvSelection {
+ receiver,
+ token,
+ signalled: self.signalled.clone(),
+ hook: None,
+ mapper,
+ received: false,
+ phantom: Default::default(),
+ }));
+
+ self
+ }
+
+ fn wait_inner(mut self, deadline: Option<Instant>) -> Option<T> {
+ #[cfg(feature = "eventual-fairness")]
+ {
+ self.next_poll = self.rng.generate_range(0..self.selections.len());
+ }
+
+ let res = 'outer: loop {
+ // Init signals
+ for _ in 0..self.selections.len() {
+ if let Some(val) = self.selections[self.next_poll].init() {
+ break 'outer Some(val);
+ }
+ self.next_poll = (self.next_poll + 1) % self.selections.len();
+ }
+
+ // Speculatively poll
+ if let Some(msg) = self.poll() {
+ break 'outer Some(msg);
+ }
+
+ loop {
+ if let Some(deadline) = deadline {
+ if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
+ thread::park_timeout(dur);
+ }
+ } else {
+ thread::park();
+ }
+
+ if deadline.map(|d| Instant::now() >= d).unwrap_or(false) {
+ break 'outer self.poll();
+ }
+
+ let token = if let Some(token) = self.signalled.lock().pop_front() {
+ token
+ } else {
+ // Spurious wakeup, park again
+ continue;
+ };
+
+ // Attempt to receive a message
+ if let Some(msg) = self.selections[token].poll() {
+ break 'outer Some(msg);
+ }
+ }
+ };
+
+ // Deinit signals
+ for s in &mut self.selections {
+ s.deinit();
+ }
+
+ res
+ }
+
+ fn poll(&mut self) -> Option<T> {
+ for _ in 0..self.selections.len() {
+ if let Some(val) = self.selections[self.next_poll].poll() {
+ return Some(val);
+ }
+ self.next_poll = (self.next_poll + 1) % self.selections.len();
+ }
+ None
+ }
+
+ /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness`
+ /// feature flag is enabled, this method is fair and will handle a random event of those that are ready.
+ pub fn wait(self) -> T {
+ self.wait_inner(None).unwrap()
+ }
+
+ /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the
+ /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that
+ /// are ready.
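+ ///
+ /// # Examples
+ /// A sketch where no event becomes ready before the timeout (assumes the `select` feature is enabled):
+ /// ```
+ /// use std::time::Duration;
+ ///
+ /// let (_tx, rx) = flume::unbounded::<i32>();
+ /// let res = flume::Selector::new()
+ ///     .recv(&rx, |msg| msg)
+ ///     .wait_timeout(Duration::from_millis(10));
+ /// assert!(res.is_err()); // Timed out
+ /// ```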
+ pub fn wait_timeout(self, dur: Duration) -> Result<T, SelectError> {
+ self.wait_inner(Some(Instant::now() + dur))
+ .ok_or(SelectError::Timeout)
+ }
+
+ /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached.
+ /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those
+ /// that are ready.
+ pub fn wait_deadline(self, deadline: Instant) -> Result<T, SelectError> {
+ self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout)
+ }
+}
diff --git a/vendor/flume/src/signal.rs b/vendor/flume/src/signal.rs
new file mode 100644
index 0000000..89395a3
--- /dev/null
+++ b/vendor/flume/src/signal.rs
@@ -0,0 +1,33 @@
+use std::{thread::{self, Thread}, time::Duration, any::Any};
+
+pub trait Signal: Send + Sync + 'static {
+ /// Fire the signal, returning whether it is a stream signal. This is because streams do not
+ /// acquire a message when woken, so signals must be fired until one that does acquire a message
+ /// is fired, otherwise a wakeup could be missed, leading to a lost message until one is eagerly
+ /// grabbed by a receiver.
+ fn fire(&self) -> bool;
+ fn as_any(&self) -> &(dyn Any + 'static);
+ fn as_ptr(&self) -> *const ();
+}
+
+pub struct SyncSignal(Thread);
+
+impl Default for SyncSignal {
+ fn default() -> Self {
+ Self(thread::current())
+ }
+}
+
+impl Signal for SyncSignal {
+ fn fire(&self) -> bool {
+ self.0.unpark();
+ false
+ }
+ fn as_any(&self) -> &(dyn Any + 'static) { self }
+ fn as_ptr(&self) -> *const () { self as *const _ as *const () }
+}
+
+impl SyncSignal {
+ pub fn wait(&self) { thread::park(); }
+ pub fn wait_timeout(&self, dur: Duration) { thread::park_timeout(dur); }
+}