aboutsummaryrefslogtreecommitdiff
path: root/vendor/flume/src
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-07-19 15:37:58 +0300
committerValentin Popov <valentin@popov.link>2024-07-19 15:37:58 +0300
commita990de90fe41456a23e58bd087d2f107d321f3a1 (patch)
tree15afc392522a9e85dc3332235e311b7d39352ea9 /vendor/flume/src
parent3d48cd3f81164bbfc1a755dc1d4a9a02f98c8ddd (diff)
downloadfparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.tar.xz
fparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.zip
Deleted vendor folder
Diffstat (limited to 'vendor/flume/src')
-rw-r--r--vendor/flume/src/async.rs543
-rw-r--r--vendor/flume/src/lib.rs1142
-rw-r--r--vendor/flume/src/select.rs405
-rw-r--r--vendor/flume/src/signal.rs33
4 files changed, 0 insertions, 2123 deletions
diff --git a/vendor/flume/src/async.rs b/vendor/flume/src/async.rs
deleted file mode 100644
index fae44d4..0000000
--- a/vendor/flume/src/async.rs
+++ /dev/null
@@ -1,543 +0,0 @@
-//! Futures and other types that allow asynchronous interaction with channels.
-
-use std::{
- future::Future,
- pin::Pin,
- task::{Context, Poll, Waker},
- any::Any,
- ops::Deref,
-};
-use crate::*;
-use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture};
-use futures_sink::Sink;
-use spin1::Mutex as Spinlock;
-
-struct AsyncSignal {
- waker: Spinlock<Waker>,
- woken: AtomicBool,
- stream: bool,
-}
-
-impl AsyncSignal {
- fn new(cx: &Context, stream: bool) -> Self {
- AsyncSignal {
- waker: Spinlock::new(cx.waker().clone()),
- woken: AtomicBool::new(false),
- stream,
- }
- }
-}
-
-impl Signal for AsyncSignal {
- fn fire(&self) -> bool {
- self.woken.store(true, Ordering::SeqCst);
- self.waker.lock().wake_by_ref();
- self.stream
- }
-
- fn as_any(&self) -> &(dyn Any + 'static) { self }
- fn as_ptr(&self) -> *const () { self as *const _ as *const () }
-}
-
-impl<T> Hook<T, AsyncSignal> {
- // Update the hook to point to the given Waker.
- // Returns whether the hook has been previously awakened
- fn update_waker(&self, cx_waker: &Waker) -> bool {
- let mut waker = self.1.waker.lock();
- let woken = self.1.woken.load(Ordering::SeqCst);
- if !waker.will_wake(cx_waker) {
- *waker = cx_waker.clone();
-
- // Avoid the edge case where the waker was woken just before the wakers were
- // swapped.
- if woken {
- cx_waker.wake_by_ref();
- }
- }
- woken
- }
-}
-
-#[derive(Clone)]
-enum OwnedOrRef<'a, T> {
- Owned(T),
- Ref(&'a T),
-}
-
-impl<'a, T> Deref for OwnedOrRef<'a, T> {
- type Target = T;
-
- fn deref(&self) -> &T {
- match self {
- OwnedOrRef::Owned(arc) => &arc,
- OwnedOrRef::Ref(r) => r,
- }
- }
-}
-
-impl<T> Sender<T> {
- /// Asynchronously send a value into the channel, returning an error if all receivers have been
- /// dropped. If the channel is bounded and is full, the returned future will yield to the async
- /// runtime.
- ///
- /// In the current implementation, the returned future will not yield to the async runtime if the
- /// channel is unbounded. This may change in later versions.
- pub fn send_async(&self, item: T) -> SendFut<T> {
- SendFut {
- sender: OwnedOrRef::Ref(&self),
- hook: Some(SendState::NotYetSent(item)),
- }
- }
-
- /// Convert this sender into a future that asynchronously sends a single message into the channel,
- /// returning an error if all receivers have been dropped. If the channel is bounded and is full,
- /// this future will yield to the async runtime.
- ///
- /// In the current implementation, the returned future will not yield to the async runtime if the
- /// channel is unbounded. This may change in later versions.
- pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> {
- SendFut {
- sender: OwnedOrRef::Owned(self),
- hook: Some(SendState::NotYetSent(item)),
- }
- }
-
- /// Create an asynchronous sink that uses this sender to asynchronously send messages into the
- /// channel. The sender will continue to be usable after the sink has been dropped.
- ///
- /// In the current implementation, the returned sink will not yield to the async runtime if the
- /// channel is unbounded. This may change in later versions.
- pub fn sink(&self) -> SendSink<'_, T> {
- SendSink(SendFut {
- sender: OwnedOrRef::Ref(&self),
- hook: None,
- })
- }
-
- /// Convert this sender into a sink that allows asynchronously sending messages into the channel.
- ///
- /// In the current implementation, the returned sink will not yield to the async runtime if the
- /// channel is unbounded. This may change in later versions.
- pub fn into_sink<'a>(self) -> SendSink<'a, T> {
- SendSink(SendFut {
- sender: OwnedOrRef::Owned(self),
- hook: None,
- })
- }
-}
-
-enum SendState<T> {
- NotYetSent(T),
- QueuedItem(Arc<Hook<T, AsyncSignal>>),
-}
-
-/// A future that sends a value into a channel.
-///
-/// Can be created via [`Sender::send_async`] or [`Sender::into_send_async`].
-#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
-pub struct SendFut<'a, T> {
- sender: OwnedOrRef<'a, Sender<T>>,
- // Only none after dropping
- hook: Option<SendState<T>>,
-}
-
-impl<T> std::marker::Unpin for SendFut<'_, T> {}
-
-impl<'a, T> SendFut<'a, T> {
- /// Reset the hook, clearing it and removing it from the waiting sender's queue. This is called
- /// on drop and just before `start_send` in the `Sink` implementation.
- fn reset_hook(&mut self) {
- if let Some(SendState::QueuedItem(hook)) = self.hook.take() {
- let hook: Arc<Hook<T, dyn Signal>> = hook;
- wait_lock(&self.sender.shared.chan).sending
- .as_mut()
- .unwrap().1
- .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
- }
- }
-
- /// See [`Sender::is_disconnected`].
- pub fn is_disconnected(&self) -> bool {
- self.sender.is_disconnected()
- }
-
- /// See [`Sender::is_empty`].
- pub fn is_empty(&self) -> bool {
- self.sender.is_empty()
- }
-
- /// See [`Sender::is_full`].
- pub fn is_full(&self) -> bool {
- self.sender.is_full()
- }
-
- /// See [`Sender::len`].
- pub fn len(&self) -> usize {
- self.sender.len()
- }
-
- /// See [`Sender::capacity`].
- pub fn capacity(&self) -> Option<usize> {
- self.sender.capacity()
- }
-}
-
-impl<'a, T> Drop for SendFut<'a, T> {
- fn drop(&mut self) {
- self.reset_hook()
- }
-}
-
-
-impl<'a, T> Future for SendFut<'a, T> {
- type Output = Result<(), SendError<T>>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() {
- if hook.is_empty() {
- Poll::Ready(Ok(()))
- } else if self.sender.shared.is_disconnected() {
- let item = hook.try_take();
- self.hook = None;
- match item {
- Some(item) => Poll::Ready(Err(SendError(item))),
- None => Poll::Ready(Ok(())),
- }
- } else {
- hook.update_waker(cx.waker());
- Poll::Pending
- }
- } else if let Some(SendState::NotYetSent(item)) = self.hook.take() {
- let this = self.get_mut();
- let (shared, this_hook) = (&this.sender.shared, &mut this.hook);
-
- shared.send(
- // item
- item,
- // should_block
- true,
- // make_signal
- |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)),
- // do_block
- |hook| {
- *this_hook = Some(SendState::QueuedItem(hook));
- Poll::Pending
- },
- )
- .map(|r| r.map_err(|err| match err {
- TrySendTimeoutError::Disconnected(msg) => SendError(msg),
- _ => unreachable!(),
- }))
- } else { // Nothing to do
- Poll::Ready(Ok(()))
- }
- }
-}
-
-impl<'a, T> FusedFuture for SendFut<'a, T> {
- fn is_terminated(&self) -> bool {
- self.sender.shared.is_disconnected()
- }
-}
-
-/// A sink that allows sending values into a channel.
-///
-/// Can be created via [`Sender::sink`] or [`Sender::into_sink`].
-pub struct SendSink<'a, T>(SendFut<'a, T>);
-
-impl<'a, T> SendSink<'a, T> {
- /// Returns a clone of a sending half of the channel of this sink.
- pub fn sender(&self) -> &Sender<T> {
- &self.0.sender
- }
-
- /// See [`Sender::is_disconnected`].
- pub fn is_disconnected(&self) -> bool {
- self.0.is_disconnected()
- }
-
- /// See [`Sender::is_empty`].
- pub fn is_empty(&self) -> bool {
- self.0.is_empty()
- }
-
- /// See [`Sender::is_full`].
- pub fn is_full(&self) -> bool {
- self.0.is_full()
- }
-
- /// See [`Sender::len`].
- pub fn len(&self) -> usize {
- self.0.len()
- }
-
- /// See [`Sender::capacity`].
- pub fn capacity(&self) -> Option<usize> {
- self.0.capacity()
- }
-
- /// Returns whether the SendSinks are belong to the same channel.
- pub fn same_channel(&self, other: &Self) -> bool {
- self.sender().same_channel(other.sender())
- }
-}
-
-impl<'a, T> Sink<T> for SendSink<'a, T> {
- type Error = SendError<T>;
-
- fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
- Pin::new(&mut self.0).poll(cx)
- }
-
- fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
- self.0.reset_hook();
- self.0.hook = Some(SendState::NotYetSent(item));
-
- Ok(())
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
- Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here?
- }
-
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
- Pin::new(&mut self.0).poll(cx) // TODO: A different strategy here?
- }
-}
-
-impl<'a, T> Clone for SendSink<'a, T> {
- fn clone(&self) -> SendSink<'a, T> {
- SendSink(SendFut {
- sender: self.0.sender.clone(),
- hook: None,
- })
- }
-}
-
-impl<T> Receiver<T> {
- /// Asynchronously receive a value from the channel, returning an error if all senders have been
- /// dropped. If the channel is empty, the returned future will yield to the async runtime.
- pub fn recv_async(&self) -> RecvFut<'_, T> {
- RecvFut::new(OwnedOrRef::Ref(self))
- }
-
- /// Convert this receiver into a future that asynchronously receives a single message from the
- /// channel, returning an error if all senders have been dropped. If the channel is empty, this
- /// future will yield to the async runtime.
- pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
- RecvFut::new(OwnedOrRef::Owned(self))
- }
-
- /// Create an asynchronous stream that uses this receiver to asynchronously receive messages
- /// from the channel. The receiver will continue to be usable after the stream has been dropped.
- pub fn stream(&self) -> RecvStream<'_, T> {
- RecvStream(RecvFut::new(OwnedOrRef::Ref(self)))
- }
-
- /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel.
- pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
- RecvStream(RecvFut::new(OwnedOrRef::Owned(self)))
- }
-}
-
-/// A future which allows asynchronously receiving a message.
-///
-/// Can be created via [`Receiver::recv_async`] or [`Receiver::into_recv_async`].
-#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
-pub struct RecvFut<'a, T> {
- receiver: OwnedOrRef<'a, Receiver<T>>,
- hook: Option<Arc<Hook<T, AsyncSignal>>>,
-}
-
-impl<'a, T> RecvFut<'a, T> {
- fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self {
- Self {
- receiver,
- hook: None,
- }
- }
-
- /// Reset the hook, clearing it and removing it from the waiting receivers queue and waking
- /// another receiver if this receiver has been woken, so as not to cause any missed wakeups.
- /// This is called on drop and after a new item is received in `Stream::poll_next`.
- fn reset_hook(&mut self) {
- if let Some(hook) = self.hook.take() {
- let hook: Arc<Hook<T, dyn Signal>> = hook;
- let mut chan = wait_lock(&self.receiver.shared.chan);
- // We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers?
- chan.waiting.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
- if hook.signal().as_any().downcast_ref::<AsyncSignal>().unwrap().woken.load(Ordering::SeqCst) {
- // If this signal has been fired, but we're being dropped (and so not listening to it),
- // pass the signal on to another receiver
- chan.try_wake_receiver_if_pending();
- }
- }
- }
-
- fn poll_inner(
- self: Pin<&mut Self>,
- cx: &mut Context,
- stream: bool,
- ) -> Poll<Result<T, RecvError>> {
- if self.hook.is_some() {
- match self.receiver.shared.recv_sync(None) {
- Ok(msg) => return Poll::Ready(Ok(msg)),
- Err(TryRecvTimeoutError::Disconnected) => {
- return Poll::Ready(Err(RecvError::Disconnected))
- }
- _ => (),
- }
-
- let hook = self.hook.as_ref().map(Arc::clone).unwrap();
- if hook.update_waker(cx.waker()) {
- // If the previous hook was awakened, we need to insert it back to the
- // queue, otherwise, it remains valid.
- wait_lock(&self.receiver.shared.chan)
- .waiting
- .push_back(hook);
- }
- // To avoid a missed wakeup, re-check disconnect status here because the channel might have
- // gotten shut down before we had a chance to push our hook
- if self.receiver.shared.is_disconnected() {
- // And now, to avoid a race condition between the first recv attempt and the disconnect check we
- // just performed, attempt to recv again just in case we missed something.
- Poll::Ready(
- self.receiver
- .shared
- .recv_sync(None)
- .map(Ok)
- .unwrap_or(Err(RecvError::Disconnected)),
- )
- } else {
- Poll::Pending
- }
- } else {
- let mut_self = self.get_mut();
- let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook);
-
- shared.recv(
- // should_block
- true,
- // make_signal
- || Hook::trigger(AsyncSignal::new(cx, stream)),
- // do_block
- |hook| {
- *this_hook = Some(hook);
- Poll::Pending
- },
- )
- .map(|r| r.map_err(|err| match err {
- TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
- _ => unreachable!(),
- }))
- }
- }
-
- /// See [`Receiver::is_disconnected`].
- pub fn is_disconnected(&self) -> bool {
- self.receiver.is_disconnected()
- }
-
- /// See [`Receiver::is_empty`].
- pub fn is_empty(&self) -> bool {
- self.receiver.is_empty()
- }
-
- /// See [`Receiver::is_full`].
- pub fn is_full(&self) -> bool {
- self.receiver.is_full()
- }
-
- /// See [`Receiver::len`].
- pub fn len(&self) -> usize {
- self.receiver.len()
- }
-
- /// See [`Receiver::capacity`].
- pub fn capacity(&self) -> Option<usize> {
- self.receiver.capacity()
- }
-}
-
-impl<'a, T> Drop for RecvFut<'a, T> {
- fn drop(&mut self) {
- self.reset_hook();
- }
-}
-
-impl<'a, T> Future for RecvFut<'a, T> {
- type Output = Result<T, RecvError>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- self.poll_inner(cx, false) // stream = false
- }
-}
-
-impl<'a, T> FusedFuture for RecvFut<'a, T> {
- fn is_terminated(&self) -> bool {
- self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty()
- }
-}
-
-/// A stream which allows asynchronously receiving messages.
-///
-/// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`].
-pub struct RecvStream<'a, T>(RecvFut<'a, T>);
-
-impl<'a, T> RecvStream<'a, T> {
- /// See [`Receiver::is_disconnected`].
- pub fn is_disconnected(&self) -> bool {
- self.0.is_disconnected()
- }
-
- /// See [`Receiver::is_empty`].
- pub fn is_empty(&self) -> bool {
- self.0.is_empty()
- }
-
- /// See [`Receiver::is_full`].
- pub fn is_full(&self) -> bool {
- self.0.is_full()
- }
-
- /// See [`Receiver::len`].
- pub fn len(&self) -> usize {
- self.0.len()
- }
-
- /// See [`Receiver::capacity`].
- pub fn capacity(&self) -> Option<usize> {
- self.0.capacity()
- }
-
- /// Returns whether the SendSinks are belong to the same channel.
- pub fn same_channel(&self, other: &Self) -> bool {
- self.0.receiver.same_channel(&*other.0.receiver)
- }
-}
-
-impl<'a, T> Clone for RecvStream<'a, T> {
- fn clone(&self) -> RecvStream<'a, T> {
- RecvStream(RecvFut::new(self.0.receiver.clone()))
- }
-}
-
-impl<'a, T> Stream for RecvStream<'a, T> {
- type Item = T;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- match Pin::new(&mut self.0).poll_inner(cx, true) { // stream = true
- Poll::Pending => Poll::Pending,
- Poll::Ready(item) => {
- self.0.reset_hook();
- Poll::Ready(item.ok())
- }
- }
- }
-}
-
-impl<'a, T> FusedStream for RecvStream<'a, T> {
- fn is_terminated(&self) -> bool {
- self.0.is_terminated()
- }
-}
diff --git a/vendor/flume/src/lib.rs b/vendor/flume/src/lib.rs
deleted file mode 100644
index c9bb3ee..0000000
--- a/vendor/flume/src/lib.rs
+++ /dev/null
@@ -1,1142 +0,0 @@
-//! # Flume
-//!
-//! A blazingly fast multi-producer, multi-consumer channel.
-//!
-//! *"Do not communicate by sharing memory; instead, share memory by communicating."*
-//!
-//! ## Why Flume?
-//!
-//! - **Featureful**: Unbounded, bounded and rendezvous queues
-//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel`
-//! - **Safe**: No `unsafe` code anywhere in the codebase!
-//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone`
-//! - **Familiar**: Drop-in replacement for `std::sync::mpsc`
-//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines
-//! - **Simple**: Few dependencies, minimal codebase, fast to compile
-//! - **Asynchronous**: `async` support, including mix 'n match with sync code
-//! - **Ergonomic**: Powerful `select`-like interface
-//!
-//! ## Example
-//!
-//! ```
-//! let (tx, rx) = flume::unbounded();
-//!
-//! tx.send(42).unwrap();
-//! assert_eq!(rx.recv().unwrap(), 42);
-//! ```
-
-#![deny(missing_docs)]
-
-#[cfg(feature = "select")]
-pub mod select;
-#[cfg(feature = "async")]
-pub mod r#async;
-
-mod signal;
-
-// Reexports
-#[cfg(feature = "select")]
-pub use select::Selector;
-
-use std::{
- collections::VecDeque,
- sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak},
- time::{Duration, Instant},
- marker::PhantomData,
- thread,
- fmt,
-};
-
-#[cfg(feature = "spin")]
-use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
-use crate::signal::{Signal, SyncSignal};
-
-/// An error that may be emitted when attempting to send a value into a channel on a sender when
-/// all receivers are dropped.
-#[derive(Copy, Clone, PartialEq, Eq)]
-pub struct SendError<T>(pub T);
-
-impl<T> SendError<T> {
- /// Consume the error, yielding the message that failed to send.
- pub fn into_inner(self) -> T { self.0 }
-}
-
-impl<T> fmt::Debug for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "SendError(..)".fmt(f)
- }
-}
-
-impl<T> fmt::Display for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "sending on a closed channel".fmt(f)
- }
-}
-
-impl<T> std::error::Error for SendError<T> {}
-
-/// An error that may be emitted when attempting to send a value into a channel on a sender when
-/// the channel is full or all receivers are dropped.
-#[derive(Copy, Clone, PartialEq, Eq)]
-pub enum TrySendError<T> {
- /// The channel the message is sent on has a finite capacity and was full when the send was attempted.
- Full(T),
- /// All channel receivers were dropped and so the message has nobody to receive it.
- Disconnected(T),
-}
-
-impl<T> TrySendError<T> {
- /// Consume the error, yielding the message that failed to send.
- pub fn into_inner(self) -> T {
- match self {
- Self::Full(msg) | Self::Disconnected(msg) => msg,
- }
- }
-}
-
-impl<T> fmt::Debug for TrySendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match *self {
- TrySendError::Full(..) => "Full(..)".fmt(f),
- TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
- }
- }
-}
-
-impl<T> fmt::Display for TrySendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- TrySendError::Full(..) => "sending on a full channel".fmt(f),
- TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
- }
- }
-}
-
-impl<T> std::error::Error for TrySendError<T> {}
-
-impl<T> From<SendError<T>> for TrySendError<T> {
- fn from(err: SendError<T>) -> Self {
- match err {
- SendError(item) => Self::Disconnected(item),
- }
- }
-}
-
-/// An error that may be emitted when sending a value into a channel on a sender with a timeout when
-/// the send operation times out or all receivers are dropped.
-#[derive(Copy, Clone, PartialEq, Eq)]
-pub enum SendTimeoutError<T> {
- /// A timeout occurred when attempting to send the message.
- Timeout(T),
- /// All channel receivers were dropped and so the message has nobody to receive it.
- Disconnected(T),
-}
-
-impl<T> SendTimeoutError<T> {
- /// Consume the error, yielding the message that failed to send.
- pub fn into_inner(self) -> T {
- match self {
- Self::Timeout(msg) | Self::Disconnected(msg) => msg,
- }
- }
-}
-
-impl<T> fmt::Debug for SendTimeoutError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "SendTimeoutError(..)".fmt(f)
- }
-}
-
-impl<T> fmt::Display for SendTimeoutError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f),
- SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f),
- }
- }
-}
-
-impl<T> std::error::Error for SendTimeoutError<T> {}
-
-impl<T> From<SendError<T>> for SendTimeoutError<T> {
- fn from(err: SendError<T>) -> Self {
- match err {
- SendError(item) => Self::Disconnected(item),
- }
- }
-}
-
-enum TrySendTimeoutError<T> {
- Full(T),
- Disconnected(T),
- Timeout(T),
-}
-
-/// An error that may be emitted when attempting to wait for a value on a receiver when all senders
-/// are dropped and there are no more messages in the channel.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum RecvError {
- /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
- Disconnected,
-}
-
-impl fmt::Display for RecvError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- RecvError::Disconnected => "receiving on a closed channel".fmt(f),
- }
- }
-}
-
-impl std::error::Error for RecvError {}
-
-/// An error that may be emitted when attempting to fetch a value on a receiver when there are no
-/// messages in the channel. If there are no messages in the channel and all senders are dropped,
-/// then `TryRecvError::Disconnected` will be returned.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum TryRecvError {
- /// The channel was empty when the receive was attempted.
- Empty,
- /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
- Disconnected,
-}
-
-impl fmt::Display for TryRecvError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- TryRecvError::Empty => "receiving on an empty channel".fmt(f),
- TryRecvError::Disconnected => "channel is empty and closed".fmt(f),
- }
- }
-}
-
-impl std::error::Error for TryRecvError {}
-
-impl From<RecvError> for TryRecvError {
- fn from(err: RecvError) -> Self {
- match err {
- RecvError::Disconnected => Self::Disconnected,
- }
- }
-}
-
-/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout
-/// when the receive operation times out or all senders are dropped and there are no values left
-/// in the channel.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum RecvTimeoutError {
- /// A timeout occurred when attempting to receive a message.
- Timeout,
- /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
- Disconnected,
-}
-
-impl fmt::Display for RecvTimeoutError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f),
- RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f),
- }
- }
-}
-
-impl std::error::Error for RecvTimeoutError {}
-
-impl From<RecvError> for RecvTimeoutError {
- fn from(err: RecvError) -> Self {
- match err {
- RecvError::Disconnected => Self::Disconnected,
- }
- }
-}
-
-enum TryRecvTimeoutError {
- Empty,
- Timeout,
- Disconnected,
-}
-
-// TODO: Investigate some sort of invalidation flag for timeouts
-#[cfg(feature = "spin")]
-struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
-
-#[cfg(not(feature = "spin"))]
-struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
-
-#[cfg(feature = "spin")]
-impl<T, S: ?Sized + Signal> Hook<T, S> {
- pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
- where
- S: Sized,
- {
- Arc::new(Self(Some(Spinlock::new(msg)), signal))
- }
-
- fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
- self.0.as_ref().map(|s| s.lock())
- }
-}
-
-#[cfg(not(feature = "spin"))]
-impl<T, S: ?Sized + Signal> Hook<T, S> {
- pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
- where
- S: Sized,
- {
- Arc::new(Self(Some(Mutex::new(msg)), signal))
- }
-
- fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
- self.0.as_ref().map(|s| s.lock().unwrap())
- }
-}
-
-impl<T, S: ?Sized + Signal> Hook<T, S> {
- pub fn fire_recv(&self) -> (T, &S) {
- let msg = self.lock().unwrap().take().unwrap();
- (msg, self.signal())
- }
-
- pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
- let ret = match self.lock() {
- Some(mut lock) => {
- *lock = Some(msg);
- None
- }
- None => Some(msg),
- };
- (ret, self.signal())
- }
-
- pub fn is_empty(&self) -> bool {
- self.lock().map(|s| s.is_none()).unwrap_or(true)
- }
-
- pub fn try_take(&self) -> Option<T> {
- self.lock().unwrap().take()
- }
-
- pub fn trigger(signal: S) -> Arc<Self>
- where
- S: Sized,
- {
- Arc::new(Self(None, signal))
- }
-
- pub fn signal(&self) -> &S {
- &self.1
- }
-
- pub fn fire_nothing(&self) -> bool {
- self.signal().fire()
- }
-}
-
-impl<T> Hook<T, SyncSignal> {
- pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
- loop {
- let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
- let msg = self.lock().unwrap().take();
- if let Some(msg) = msg {
- break Some(msg);
- } else if disconnected {
- break None;
- } else {
- self.signal().wait()
- }
- }
- }
-
- // Err(true) if timeout
- pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
- loop {
- let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
- let msg = self.lock().unwrap().take();
- if let Some(msg) = msg {
- break Ok(msg);
- } else if disconnected {
- break Err(false);
- } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
- self.signal().wait_timeout(dur);
- } else {
- break Err(true);
- }
- }
- }
-
- pub fn wait_send(&self, abort: &AtomicBool) {
- loop {
- let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
- if disconnected || self.lock().unwrap().is_none() {
- break;
- }
-
- self.signal().wait();
- }
- }
-
- // Err(true) if timeout
- pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
- loop {
- let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
- if self.lock().unwrap().is_none() {
- break Ok(());
- } else if disconnected {
- break Err(false);
- } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
- self.signal().wait_timeout(dur);
- } else {
- break Err(true);
- }
- }
- }
-}
-
-#[cfg(feature = "spin")]
-#[inline]
-fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
- let mut i = 4;
- loop {
- for _ in 0..10 {
- if let Some(guard) = lock.try_lock() {
- return guard;
- }
- thread::yield_now();
- }
- // Sleep for at most ~1 ms
- thread::sleep(Duration::from_nanos(1 << i.min(20)));
- i += 1;
- }
-}
-
-#[cfg(not(feature = "spin"))]
-#[inline]
-fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
- lock.lock().unwrap()
-}
-
-#[cfg(not(feature = "spin"))]
-use std::sync::{Mutex, MutexGuard};
-
-#[cfg(feature = "spin")]
-type ChanLock<T> = Spinlock<T>;
-#[cfg(not(feature = "spin"))]
-type ChanLock<T> = Mutex<T>;
-
-
-type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
-struct Chan<T> {
- sending: Option<(usize, SignalVec<T>)>,
- queue: VecDeque<T>,
- waiting: SignalVec<T>,
-}
-
-impl<T> Chan<T> {
- fn pull_pending(&mut self, pull_extra: bool) {
- if let Some((cap, sending)) = &mut self.sending {
- let effective_cap = *cap + pull_extra as usize;
-
- while self.queue.len() < effective_cap {
- if let Some(s) = sending.pop_front() {
- let (msg, signal) = s.fire_recv();
- signal.fire();
- self.queue.push_back(msg);
- } else {
- break;
- }
- }
- }
- }
-
- fn try_wake_receiver_if_pending(&mut self) {
- if !self.queue.is_empty() {
- while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
- }
- }
-}
-
-struct Shared<T> {
- chan: ChanLock<Chan<T>>,
- disconnected: AtomicBool,
- sender_count: AtomicUsize,
- receiver_count: AtomicUsize,
-}
-
-impl<T> Shared<T> {
- fn new(cap: Option<usize>) -> Self {
- Self {
- chan: ChanLock::new(Chan {
- sending: cap.map(|cap| (cap, VecDeque::new())),
- queue: VecDeque::new(),
- waiting: VecDeque::new(),
- }),
- disconnected: AtomicBool::new(false),
- sender_count: AtomicUsize::new(1),
- receiver_count: AtomicUsize::new(1),
- }
- }
-
- fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
- &self,
- msg: T,
- should_block: bool,
- make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
- do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
- ) -> R {
- let mut chan = wait_lock(&self.chan);
-
- if self.is_disconnected() {
- Err(TrySendTimeoutError::Disconnected(msg)).into()
- } else if !chan.waiting.is_empty() {
- let mut msg = Some(msg);
-
- loop {
- let slot = chan.waiting.pop_front();
- match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
- // No more waiting receivers and msg in queue, so break out of the loop
- None if msg.is_none() => break,
- // No more waiting receivers, so add msg to queue and break out of the loop
- None => {
- chan.queue.push_back(msg.unwrap());
- break;
- }
- Some((Some(m), signal)) => {
- if signal.fire() {
- // Was async and a stream, so didn't acquire the message. Wake another
- // receiver, and do not yet push the message.
- msg.replace(m);
- continue;
- } else {
- // Was async and not a stream, so it did acquire the message. Push the
- // message to the queue for it to be received.
- chan.queue.push_back(m);
- drop(chan);
- break;
- }
- },
- Some((None, signal)) => {
- drop(chan);
- signal.fire();
- break; // Was sync, so it has acquired the message
- },
- }
- }
-
- Ok(()).into()
- } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
- chan.queue.push_back(msg);
- Ok(()).into()
- } else if should_block { // Only bounded from here on
- let hook = make_signal(msg);
- chan.sending.as_mut().unwrap().1.push_back(hook.clone());
- drop(chan);
-
- do_block(hook)
- } else {
- Err(TrySendTimeoutError::Full(msg)).into()
- }
- }
-
- fn send_sync(
- &self,
- msg: T,
- block: Option<Option<Instant>>,
- ) -> Result<(), TrySendTimeoutError<T>> {
- self.send(
- // msg
- msg,
- // should_block
- block.is_some(),
- // make_signal
- |msg| Hook::slot(Some(msg), SyncSignal::default()),
- // do_block
- |hook| if let Some(deadline) = block.unwrap() {
- hook.wait_deadline_send(&self.disconnected, deadline)
- .or_else(|timed_out| {
- if timed_out { // Remove our signal
- let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
- wait_lock(&self.chan).sending
- .as_mut()
- .unwrap().1
- .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
- }
- hook.try_take().map(|msg| if self.is_disconnected() {
- Err(TrySendTimeoutError::Disconnected(msg))
- } else {
- Err(TrySendTimeoutError::Timeout(msg))
- })
- .unwrap_or(Ok(()))
- })
- } else {
- hook.wait_send(&self.disconnected);
-
- match hook.try_take() {
- Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
- None => Ok(()),
- }
- },
- )
- }
-
- fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
- &self,
- should_block: bool,
- make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
- do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
- ) -> R {
- let mut chan = wait_lock(&self.chan);
- chan.pull_pending(true);
-
- if let Some(msg) = chan.queue.pop_front() {
- drop(chan);
- Ok(msg).into()
- } else if self.is_disconnected() {
- drop(chan);
- Err(TryRecvTimeoutError::Disconnected).into()
- } else if should_block {
- let hook = make_signal();
- chan.waiting.push_back(hook.clone());
- drop(chan);
-
- do_block(hook)
- } else {
- drop(chan);
- Err(TryRecvTimeoutError::Empty).into()
- }
- }
-
- fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
- self.recv(
- // should_block
- block.is_some(),
- // make_signal
- || Hook::slot(None, SyncSignal::default()),
- // do_block
- |hook| if let Some(deadline) = block.unwrap() {
- hook.wait_deadline_recv(&self.disconnected, deadline)
- .or_else(|timed_out| {
- if timed_out { // Remove our signal
- let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
- wait_lock(&self.chan).waiting
- .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
- }
- match hook.try_take() {
- Some(msg) => Ok(msg),
- None => {
- let disconnected = self.is_disconnected(); // Check disconnect *before* msg
- if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
- Ok(msg)
- } else if disconnected {
- Err(TryRecvTimeoutError::Disconnected)
- } else {
- Err(TryRecvTimeoutError::Timeout)
- }
- },
- }
- })
- } else {
- hook.wait_recv(&self.disconnected)
- .or_else(|| wait_lock(&self.chan).queue.pop_front())
- .ok_or(TryRecvTimeoutError::Disconnected)
- },
- )
- }
-
- /// Disconnect anything listening on this channel (this will not prevent receivers receiving
- /// msgs that have already been sent)
- fn disconnect_all(&self) {
- self.disconnected.store(true, Ordering::Relaxed);
-
- let mut chan = wait_lock(&self.chan);
- chan.pull_pending(false);
- if let Some((_, sending)) = chan.sending.as_ref() {
- sending.iter().for_each(|hook| {
- hook.signal().fire();
- })
- }
- chan.waiting.iter().for_each(|hook| {
- hook.signal().fire();
- });
- }
-
- fn is_disconnected(&self) -> bool {
- self.disconnected.load(Ordering::SeqCst)
- }
-
- fn is_empty(&self) -> bool {
- self.len() == 0
- }
-
- fn is_full(&self) -> bool {
- self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
- }
-
- fn len(&self) -> usize {
- let mut chan = wait_lock(&self.chan);
- chan.pull_pending(false);
- chan.queue.len()
- }
-
- fn capacity(&self) -> Option<usize> {
- wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
- }
-
- fn sender_count(&self) -> usize {
- self.sender_count.load(Ordering::Relaxed)
- }
-
- fn receiver_count(&self) -> usize {
- self.receiver_count.load(Ordering::Relaxed)
- }
-}
-
-/// A transmitting end of a channel.
-pub struct Sender<T> {
- shared: Arc<Shared<T>>,
-}
-
-impl<T> Sender<T> {
- /// Attempt to send a value into the channel. If the channel is bounded and full, or all
- /// receivers have been dropped, an error is returned. If the channel associated with this
- /// sender is unbounded, this method has the same behaviour as [`Sender::send`].
- pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
- self.shared.send_sync(msg, None).map_err(|err| match err {
- TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
- TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
- _ => unreachable!(),
- })
- }
-
- /// Send a value into the channel, returning an error if all receivers have been dropped.
- /// If the channel is bounded and is full, this method will block until space is available
- /// or all receivers have been dropped. If the channel is unbounded, this method will not
- /// block.
- pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
- self.shared.send_sync(msg, Some(None)).map_err(|err| match err {
- TrySendTimeoutError::Disconnected(msg) => SendError(msg),
- _ => unreachable!(),
- })
- }
-
- /// Send a value into the channel, returning an error if all receivers have been dropped
- /// or the deadline has passed. If the channel is bounded and is full, this method will
- /// block until space is available, the deadline is reached, or all receivers have been
- /// dropped.
- pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
- self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err {
- TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
- TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
- _ => unreachable!(),
- })
- }
-
- /// Send a value into the channel, returning an error if all receivers have been dropped
- /// or the timeout has expired. If the channel is bounded and is full, this method will
- /// block until space is available, the timeout has expired, or all receivers have been
- /// dropped.
- pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
- self.send_deadline(msg, Instant::now().checked_add(dur).unwrap())
- }
-
- /// Returns true if all receivers for this channel have been dropped.
- pub fn is_disconnected(&self) -> bool {
- self.shared.is_disconnected()
- }
-
- /// Returns true if the channel is empty.
- /// Note: Zero-capacity channels are always empty.
- pub fn is_empty(&self) -> bool {
- self.shared.is_empty()
- }
-
- /// Returns true if the channel is full.
- /// Note: Zero-capacity channels are always full.
- pub fn is_full(&self) -> bool {
- self.shared.is_full()
- }
-
- /// Returns the number of messages in the channel
- pub fn len(&self) -> usize {
- self.shared.len()
- }
-
- /// If the channel is bounded, returns its capacity.
- pub fn capacity(&self) -> Option<usize> {
- self.shared.capacity()
- }
-
- /// Get the number of senders that currently exist, including this one.
- pub fn sender_count(&self) -> usize {
- self.shared.sender_count()
- }
-
- /// Get the number of receivers that currently exist.
- ///
- /// Note that this method makes no guarantees that a subsequent send will succeed; it's
- /// possible that between `receiver_count()` being called and a `send()`, all open receivers
- /// could drop.
- pub fn receiver_count(&self) -> usize {
- self.shared.receiver_count()
- }
-
- /// Creates a [`WeakSender`] that does not keep the channel open.
- ///
- /// The channel is closed once all `Sender`s are dropped, even if there
- /// are still active `WeakSender`s.
- pub fn downgrade(&self) -> WeakSender<T> {
- WeakSender {
- shared: Arc::downgrade(&self.shared),
- }
- }
-
- /// Returns whether the senders are belong to the same channel.
- pub fn same_channel(&self, other: &Sender<T>) -> bool {
- Arc::ptr_eq(&self.shared, &other.shared)
- }
-}
-
-impl<T> Clone for Sender<T> {
- /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel
- /// contents will only be cleaned up when all senders and the receiver have been dropped.
- fn clone(&self) -> Self {
- self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
- Self { shared: self.shared.clone() }
- }
-}
-
-impl<T> fmt::Debug for Sender<T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("Sender").finish()
- }
-}
-
-impl<T> Drop for Sender<T> {
- fn drop(&mut self) {
- // Notify receivers that all senders have been dropped if the number of senders drops to 0.
- if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
- self.shared.disconnect_all();
- }
- }
-}
-
-/// A sender that does not prevent the channel from being closed.
-///
-/// Weak senders do not count towards the number of active senders on the channel. As soon as
-/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
-/// `WeakSender`.
-///
-/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`]
-/// method.
-pub struct WeakSender<T> {
- shared: Weak<Shared<T>>,
-}
-
-impl<T> WeakSender<T> {
- /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages.
- ///
- /// Returns `None` if the channel was closed already. Note that a `Some` return value
- /// does not guarantee that the channel is still open.
- pub fn upgrade(&self) -> Option<Sender<T>> {
- self.shared
- .upgrade()
- // check that there are still live senders
- .filter(|shared| {
- shared
- .sender_count
- .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
- if count == 0 {
- // all senders are closed already -> don't increase the sender count
- None
- } else {
- // there is still at least one active sender
- Some(count + 1)
- }
- })
- .is_ok()
- })
- .map(|shared| Sender { shared })
- }
-}
-
-/// The receiving end of a channel.
-///
-/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
-/// Each message will only be received by a single receiver. This is useful for
-/// implementing work stealing for concurrent programs.
-pub struct Receiver<T> {
- shared: Arc<Shared<T>>,
-}
-
-impl<T> Receiver<T> {
- /// Attempt to fetch an incoming value from the channel associated with this receiver,
- /// returning an error if the channel is empty or if all senders have been dropped.
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
- self.shared.recv_sync(None).map_err(|err| match err {
- TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
- TryRecvTimeoutError::Empty => TryRecvError::Empty,
- _ => unreachable!(),
- })
- }
-
- /// Wait for an incoming value from the channel associated with this receiver, returning an
- /// error if all senders have been dropped.
- pub fn recv(&self) -> Result<T, RecvError> {
- self.shared.recv_sync(Some(None)).map_err(|err| match err {
- TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
- _ => unreachable!(),
- })
- }
-
- /// Wait for an incoming value from the channel associated with this receiver, returning an
- /// error if all senders have been dropped or the deadline has passed.
- pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
- self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err {
- TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
- TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
- _ => unreachable!(),
- })
- }
-
- /// Wait for an incoming value from the channel associated with this receiver, returning an
- /// error if all senders have been dropped or the timeout has expired.
- pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
- self.recv_deadline(Instant::now().checked_add(dur).unwrap())
- }
-
- /// Create a blocking iterator over the values received on the channel that finishes iteration
- /// when all senders have been dropped.
- ///
- /// You can also create a self-owned iterator with [`Receiver::into_iter`].
- pub fn iter(&self) -> Iter<T> {
- Iter { receiver: &self }
- }
-
- /// A non-blocking iterator over the values received on the channel that finishes iteration
- /// when all senders have been dropped or the channel is empty.
- pub fn try_iter(&self) -> TryIter<T> {
- TryIter { receiver: &self }
- }
-
- /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
- /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
- /// the function has been called.
- pub fn drain(&self) -> Drain<T> {
- let mut chan = wait_lock(&self.shared.chan);
- chan.pull_pending(false);
- let queue = std::mem::take(&mut chan.queue);
-
- Drain { queue, _phantom: PhantomData }
- }
-
- /// Returns true if all senders for this channel have been dropped.
- pub fn is_disconnected(&self) -> bool {
- self.shared.is_disconnected()
- }
-
- /// Returns true if the channel is empty.
- /// Note: Zero-capacity channels are always empty.
- pub fn is_empty(&self) -> bool {
- self.shared.is_empty()
- }
-
- /// Returns true if the channel is full.
- /// Note: Zero-capacity channels are always full.
- pub fn is_full(&self) -> bool {
- self.shared.is_full()
- }
-
- /// Returns the number of messages in the channel.
- pub fn len(&self) -> usize {
- self.shared.len()
- }
-
- /// If the channel is bounded, returns its capacity.
- pub fn capacity(&self) -> Option<usize> {
- self.shared.capacity()
- }
-
- /// Get the number of senders that currently exist.
- pub fn sender_count(&self) -> usize {
- self.shared.sender_count()
- }
-
- /// Get the number of receivers that currently exist, including this one.
- pub fn receiver_count(&self) -> usize {
- self.shared.receiver_count()
- }
-
- /// Returns whether the receivers are belong to the same channel.
- pub fn same_channel(&self, other: &Receiver<T>) -> bool {
- Arc::ptr_eq(&self.shared, &other.shared)
- }
-}
-
-impl<T> Clone for Receiver<T> {
- /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining
- /// channel contents will only be cleaned up when all senders and the receiver have been
- /// dropped.
- ///
- /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
- /// Each message will only be received by a single receiver. This is useful for
- /// implementing work stealing for concurrent programs.
- fn clone(&self) -> Self {
- self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
- Self { shared: self.shared.clone() }
- }
-}
-
-impl<T> fmt::Debug for Receiver<T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("Receiver").finish()
- }
-}
-
-impl<T> Drop for Receiver<T> {
- fn drop(&mut self) {
- // Notify senders that all receivers have been dropped if the number of receivers drops
- // to 0.
- if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
- self.shared.disconnect_all();
- }
- }
-}
-
-/// This exists as a shorthand for [`Receiver::iter`].
-impl<'a, T> IntoIterator for &'a Receiver<T> {
- type Item = T;
- type IntoIter = Iter<'a, T>;
-
- fn into_iter(self) -> Self::IntoIter {
- Iter { receiver: self }
- }
-}
-
-impl<T> IntoIterator for Receiver<T> {
- type Item = T;
- type IntoIter = IntoIter<T>;
-
- /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
- fn into_iter(self) -> Self::IntoIter {
- IntoIter { receiver: self }
- }
-}
-
-/// An iterator over the msgs received from a channel.
-pub struct Iter<'a, T> {
- receiver: &'a Receiver<T>,
-}
-
-impl<'a, T> Iterator for Iter<'a, T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.receiver.recv().ok()
- }
-}
-
-/// An non-blocking iterator over the msgs received from a channel.
-pub struct TryIter<'a, T> {
- receiver: &'a Receiver<T>,
-}
-
-impl<'a, T> Iterator for TryIter<'a, T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.receiver.try_recv().ok()
- }
-}
-
-/// An fixed-sized iterator over the msgs drained from a channel.
-#[derive(Debug)]
-pub struct Drain<'a, T> {
- queue: VecDeque<T>,
- /// A phantom field used to constrain the lifetime of this iterator. We do this because the
- /// implementation may change and we don't want to unintentionally constrain it. Removing this
- /// lifetime later is a possibility.
- _phantom: PhantomData<&'a ()>,
-}
-
-impl<'a, T> Iterator for Drain<'a, T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.queue.pop_front()
- }
-}
-
-impl<'a, T> ExactSizeIterator for Drain<'a, T> {
- fn len(&self) -> usize {
- self.queue.len()
- }
-}
-
-/// An owned iterator over the msgs received from a channel.
-pub struct IntoIter<T> {
- receiver: Receiver<T>,
-}
-
-impl<T> Iterator for IntoIter<T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.receiver.recv().ok()
- }
-}
-
-/// Create a channel with no maximum capacity.
-///
-/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in
-/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
-/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
-/// may be cloned.
-///
-/// # Examples
-/// ```
-/// let (tx, rx) = flume::unbounded();
-///
-/// tx.send(42).unwrap();
-/// assert_eq!(rx.recv().unwrap(), 42);
-/// ```
-pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
- let shared = Arc::new(Shared::new(None));
- (
- Sender { shared: shared.clone() },
- Receiver { shared },
- )
-}
-
-/// Create a channel with a maximum capacity.
-///
-/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one
-/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
-/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
-/// may be cloned.
-///
-/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
-/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
-/// is not desired, [`Sender::try_send`] may be used.
-///
-/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero
-/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a
-/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers
-/// perform a handshake and transfer ownership of a value.
-///
-/// # Examples
-/// ```
-/// let (tx, rx) = flume::bounded(32);
-///
-/// for i in 1..33 {
-/// tx.send(i).unwrap();
-/// }
-/// assert!(tx.try_send(33).is_err());
-///
-/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum());
-/// ```
-pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
- let shared = Arc::new(Shared::new(Some(cap)));
- (
- Sender { shared: shared.clone() },
- Receiver { shared },
- )
-}
diff --git a/vendor/flume/src/select.rs b/vendor/flume/src/select.rs
deleted file mode 100644
index cef15aa..0000000
--- a/vendor/flume/src/select.rs
+++ /dev/null
@@ -1,405 +0,0 @@
-//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface.
-
-use crate::*;
-use spin1::Mutex as Spinlock;
-use std::{any::Any, marker::PhantomData};
-
-#[cfg(feature = "eventual-fairness")]
-use nanorand::Rng;
-
-// A unique token corresponding to an event in a selector
-type Token = usize;
-
-struct SelectSignal(
- thread::Thread,
- Token,
- AtomicBool,
- Arc<Spinlock<VecDeque<Token>>>,
-);
-
-impl Signal for SelectSignal {
- fn fire(&self) -> bool {
- self.2.store(true, Ordering::SeqCst);
- self.3.lock().push_back(self.1);
- self.0.unpark();
- false
- }
-
- fn as_any(&self) -> &(dyn Any + 'static) {
- self
- }
- fn as_ptr(&self) -> *const () {
- self as *const _ as *const ()
- }
-}
-
-trait Selection<'a, T> {
- fn init(&mut self) -> Option<T>;
- fn poll(&mut self) -> Option<T>;
- fn deinit(&mut self);
-}
-
-/// An error that may be emitted when attempting to wait for a value on a receiver.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum SelectError {
- /// A timeout occurred when waiting on a `Selector`.
- Timeout,
-}
-
-impl fmt::Display for SelectError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- SelectError::Timeout => "timeout occurred".fmt(f),
- }
- }
-}
-
-impl std::error::Error for SelectError {}
-
-/// A type used to wait upon multiple blocking operations at once.
-///
-/// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour,
-/// allowing a thread to wait upon the result of more than one operation at once.
-///
-/// # Examples
-/// ```
-/// let (tx0, rx0) = flume::unbounded();
-/// let (tx1, rx1) = flume::unbounded();
-///
-/// std::thread::spawn(move || {
-/// tx0.send(true).unwrap();
-/// tx1.send(42).unwrap();
-/// });
-///
-/// flume::Selector::new()
-/// .recv(&rx0, |b| println!("Received {:?}", b))
-/// .recv(&rx1, |n| println!("Received {:?}", n))
-/// .wait();
-/// ```
-pub struct Selector<'a, T: 'a> {
- selections: Vec<Box<dyn Selection<'a, T> + 'a>>,
- next_poll: usize,
- signalled: Arc<Spinlock<VecDeque<Token>>>,
- #[cfg(feature = "eventual-fairness")]
- rng: nanorand::WyRand,
- phantom: PhantomData<*const ()>,
-}
-
-impl<'a, T: 'a> Default for Selector<'a, T> {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl<'a, T: 'a> fmt::Debug for Selector<'a, T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("Selector").finish()
- }
-}
-
-impl<'a, T> Selector<'a, T> {
- /// Create a new selector.
- pub fn new() -> Self {
- Self {
- selections: Vec::new(),
- next_poll: 0,
- signalled: Arc::default(),
- phantom: PhantomData::default(),
- #[cfg(feature = "eventual-fairness")]
- rng: nanorand::WyRand::new(),
- }
- }
-
- /// Add a send operation to the selector that sends the provided value.
- ///
- /// Once added, the selector can be used to run the provided handler function on completion of this operation.
- pub fn send<U, F: FnMut(Result<(), SendError<U>>) -> T + 'a>(
- mut self,
- sender: &'a Sender<U>,
- msg: U,
- mapper: F,
- ) -> Self {
- struct SendSelection<'a, T, F, U> {
- sender: &'a Sender<U>,
- msg: Option<U>,
- token: Token,
- signalled: Arc<Spinlock<VecDeque<Token>>>,
- hook: Option<Arc<Hook<U, SelectSignal>>>,
- mapper: F,
- phantom: PhantomData<T>,
- }
-
- impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U>
- where
- F: FnMut(Result<(), SendError<U>>) -> T,
- {
- fn init(&mut self) -> Option<T> {
- let token = self.token;
- let signalled = self.signalled.clone();
- let r = self.sender.shared.send(
- self.msg.take().unwrap(),
- true,
- |msg| {
- Hook::slot(
- Some(msg),
- SelectSignal(
- thread::current(),
- token,
- AtomicBool::new(false),
- signalled,
- ),
- )
- },
- // Always runs
- |h| {
- self.hook = Some(h);
- Ok(())
- },
- );
-
- if self.hook.is_none() {
- Some((self.mapper)(match r {
- Ok(()) => Ok(()),
- Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)),
- _ => unreachable!(),
- }))
- } else {
- None
- }
- }
-
- fn poll(&mut self) -> Option<T> {
- let res = if self.sender.shared.is_disconnected() {
- // Check the hook one last time
- if let Some(msg) = self.hook.as_ref()?.try_take() {
- Err(SendError(msg))
- } else {
- Ok(())
- }
- } else if self.hook.as_ref().unwrap().is_empty() {
- // The message was sent
- Ok(())
- } else {
- return None;
- };
-
- Some((&mut self.mapper)(res))
- }
-
- fn deinit(&mut self) {
- if let Some(hook) = self.hook.take() {
- // Remove hook
- let hook: Arc<Hook<U, dyn Signal>> = hook;
- wait_lock(&self.sender.shared.chan)
- .sending
- .as_mut()
- .unwrap()
- .1
- .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
- }
- }
- }
-
- let token = self.selections.len();
- self.selections.push(Box::new(SendSelection {
- sender,
- msg: Some(msg),
- token,
- signalled: self.signalled.clone(),
- hook: None,
- mapper,
- phantom: Default::default(),
- }));
-
- self
- }
-
- /// Add a receive operation to the selector.
- ///
- /// Once added, the selector can be used to run the provided handler function on completion of this operation.
- pub fn recv<U, F: FnMut(Result<U, RecvError>) -> T + 'a>(
- mut self,
- receiver: &'a Receiver<U>,
- mapper: F,
- ) -> Self {
- struct RecvSelection<'a, T, F, U> {
- receiver: &'a Receiver<U>,
- token: Token,
- signalled: Arc<Spinlock<VecDeque<Token>>>,
- hook: Option<Arc<Hook<U, SelectSignal>>>,
- mapper: F,
- received: bool,
- phantom: PhantomData<T>,
- }
-
- impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U>
- where
- F: FnMut(Result<U, RecvError>) -> T,
- {
- fn init(&mut self) -> Option<T> {
- let token = self.token;
- let signalled = self.signalled.clone();
- let r = self.receiver.shared.recv(
- true,
- || {
- Hook::trigger(SelectSignal(
- thread::current(),
- token,
- AtomicBool::new(false),
- signalled,
- ))
- },
- // Always runs
- |h| {
- self.hook = Some(h);
- Err(TryRecvTimeoutError::Timeout)
- },
- );
-
- if self.hook.is_none() {
- Some((self.mapper)(match r {
- Ok(msg) => Ok(msg),
- Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected),
- _ => unreachable!(),
- }))
- } else {
- None
- }
- }
-
- fn poll(&mut self) -> Option<T> {
- let res = if let Ok(msg) = self.receiver.try_recv() {
- self.received = true;
- Ok(msg)
- } else if self.receiver.shared.is_disconnected() {
- Err(RecvError::Disconnected)
- } else {
- return None;
- };
-
- Some((&mut self.mapper)(res))
- }
-
- fn deinit(&mut self) {
- if let Some(hook) = self.hook.take() {
- // Remove hook
- let hook: Arc<Hook<U, dyn Signal>> = hook;
- wait_lock(&self.receiver.shared.chan)
- .waiting
- .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
- // If we were woken, but never polled, wake up another
- if !self.received
- && hook
- .signal()
- .as_any()
- .downcast_ref::<SelectSignal>()
- .unwrap()
- .2
- .load(Ordering::SeqCst)
- {
- wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending();
- }
- }
- }
- }
-
- let token = self.selections.len();
- self.selections.push(Box::new(RecvSelection {
- receiver,
- token,
- signalled: self.signalled.clone(),
- hook: None,
- mapper,
- received: false,
- phantom: Default::default(),
- }));
-
- self
- }
-
- fn wait_inner(mut self, deadline: Option<Instant>) -> Option<T> {
- #[cfg(feature = "eventual-fairness")]
- {
- self.next_poll = self.rng.generate_range(0..self.selections.len());
- }
-
- let res = 'outer: loop {
- // Init signals
- for _ in 0..self.selections.len() {
- if let Some(val) = self.selections[self.next_poll].init() {
- break 'outer Some(val);
- }
- self.next_poll = (self.next_poll + 1) % self.selections.len();
- }
-
- // Speculatively poll
- if let Some(msg) = self.poll() {
- break 'outer Some(msg);
- }
-
- loop {
- if let Some(deadline) = deadline {
- if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
- thread::park_timeout(dur);
- }
- } else {
- thread::park();
- }
-
- if deadline.map(|d| Instant::now() >= d).unwrap_or(false) {
- break 'outer self.poll();
- }
-
- let token = if let Some(token) = self.signalled.lock().pop_front() {
- token
- } else {
- // Spurious wakeup, park again
- continue;
- };
-
- // Attempt to receive a message
- if let Some(msg) = self.selections[token].poll() {
- break 'outer Some(msg);
- }
- }
- };
-
- // Deinit signals
- for s in &mut self.selections {
- s.deinit();
- }
-
- res
- }
-
- fn poll(&mut self) -> Option<T> {
- for _ in 0..self.selections.len() {
- if let Some(val) = self.selections[self.next_poll].poll() {
- return Some(val);
- }
- self.next_poll = (self.next_poll + 1) % self.selections.len();
- }
- None
- }
-
- /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness`
- /// feature flag is enabled, this method is fair and will handle a random event of those that are ready.
- pub fn wait(self) -> T {
- self.wait_inner(None).unwrap()
- }
-
- /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the
- /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that
- /// are ready.
- pub fn wait_timeout(self, dur: Duration) -> Result<T, SelectError> {
- self.wait_inner(Some(Instant::now() + dur))
- .ok_or(SelectError::Timeout)
- }
-
- /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached.
- /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those
- /// that are ready.
- pub fn wait_deadline(self, deadline: Instant) -> Result<T, SelectError> {
- self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout)
- }
-}
diff --git a/vendor/flume/src/signal.rs b/vendor/flume/src/signal.rs
deleted file mode 100644
index 89395a3..0000000
--- a/vendor/flume/src/signal.rs
+++ /dev/null
@@ -1,33 +0,0 @@
-use std::{thread::{self, Thread}, time::Duration, any::Any};
-
-pub trait Signal: Send + Sync + 'static {
- /// Fire the signal, returning whether it is a stream signal. This is because streams do not
- /// acquire a message when woken, so signals must be fired until one that does acquire a message
- /// is fired, otherwise a wakeup could be missed, leading to a lost message until one is eagerly
- /// grabbed by a receiver.
- fn fire(&self) -> bool;
- fn as_any(&self) -> &(dyn Any + 'static);
- fn as_ptr(&self) -> *const ();
-}
-
-pub struct SyncSignal(Thread);
-
-impl Default for SyncSignal {
- fn default() -> Self {
- Self(thread::current())
- }
-}
-
-impl Signal for SyncSignal {
- fn fire(&self) -> bool {
- self.0.unpark();
- false
- }
- fn as_any(&self) -> &(dyn Any + 'static) { self }
- fn as_ptr(&self) -> *const () { self as *const _ as *const () }
-}
-
-impl SyncSignal {
- pub fn wait(&self) { thread::park(); }
- pub fn wait_timeout(&self, dur: Duration) { thread::park_timeout(dur); }
-}