aboutsummaryrefslogtreecommitdiff
path: root/vendor/flume/src/lib.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-07-19 15:37:58 +0300
committerValentin Popov <valentin@popov.link>2024-07-19 15:37:58 +0300
commita990de90fe41456a23e58bd087d2f107d321f3a1 (patch)
tree15afc392522a9e85dc3332235e311b7d39352ea9 /vendor/flume/src/lib.rs
parent3d48cd3f81164bbfc1a755dc1d4a9a02f98c8ddd (diff)
downloadfparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.tar.xz
fparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.zip
Deleted vendor folder
Diffstat (limited to 'vendor/flume/src/lib.rs')
-rw-r--r--vendor/flume/src/lib.rs1142
1 files changed, 0 insertions, 1142 deletions
diff --git a/vendor/flume/src/lib.rs b/vendor/flume/src/lib.rs
deleted file mode 100644
index c9bb3ee..0000000
--- a/vendor/flume/src/lib.rs
+++ /dev/null
@@ -1,1142 +0,0 @@
-//! # Flume
-//!
-//! A blazingly fast multi-producer, multi-consumer channel.
-//!
-//! *"Do not communicate by sharing memory; instead, share memory by communicating."*
-//!
-//! ## Why Flume?
-//!
-//! - **Featureful**: Unbounded, bounded and rendezvous queues
-//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel`
-//! - **Safe**: No `unsafe` code anywhere in the codebase!
-//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone`
-//! - **Familiar**: Drop-in replacement for `std::sync::mpsc`
-//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines
-//! - **Simple**: Few dependencies, minimal codebase, fast to compile
-//! - **Asynchronous**: `async` support, including mix 'n match with sync code
-//! - **Ergonomic**: Powerful `select`-like interface
-//!
-//! ## Example
-//!
-//! ```
-//! let (tx, rx) = flume::unbounded();
-//!
-//! tx.send(42).unwrap();
-//! assert_eq!(rx.recv().unwrap(), 42);
-//! ```
-
-#![deny(missing_docs)]
-
-#[cfg(feature = "select")]
-pub mod select;
-#[cfg(feature = "async")]
-pub mod r#async;
-
-mod signal;
-
-// Reexports
-#[cfg(feature = "select")]
-pub use select::Selector;
-
-use std::{
- collections::VecDeque,
- sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak},
- time::{Duration, Instant},
- marker::PhantomData,
- thread,
- fmt,
-};
-
-#[cfg(feature = "spin")]
-use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
-use crate::signal::{Signal, SyncSignal};
-
-/// An error that may be emitted when attempting to send a value into a channel on a sender when
-/// all receivers are dropped.
-#[derive(Copy, Clone, PartialEq, Eq)]
-pub struct SendError<T>(pub T);
-
-impl<T> SendError<T> {
- /// Consume the error, yielding the message that failed to send.
- pub fn into_inner(self) -> T { self.0 }
-}
-
-impl<T> fmt::Debug for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "SendError(..)".fmt(f)
- }
-}
-
-impl<T> fmt::Display for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "sending on a closed channel".fmt(f)
- }
-}
-
-impl<T> std::error::Error for SendError<T> {}
-
-/// An error that may be emitted when attempting to send a value into a channel on a sender when
-/// the channel is full or all receivers are dropped.
-#[derive(Copy, Clone, PartialEq, Eq)]
-pub enum TrySendError<T> {
- /// The channel the message is sent on has a finite capacity and was full when the send was attempted.
- Full(T),
- /// All channel receivers were dropped and so the message has nobody to receive it.
- Disconnected(T),
-}
-
-impl<T> TrySendError<T> {
- /// Consume the error, yielding the message that failed to send.
- pub fn into_inner(self) -> T {
- match self {
- Self::Full(msg) | Self::Disconnected(msg) => msg,
- }
- }
-}
-
-impl<T> fmt::Debug for TrySendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match *self {
- TrySendError::Full(..) => "Full(..)".fmt(f),
- TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
- }
- }
-}
-
-impl<T> fmt::Display for TrySendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- TrySendError::Full(..) => "sending on a full channel".fmt(f),
- TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
- }
- }
-}
-
-impl<T> std::error::Error for TrySendError<T> {}
-
-impl<T> From<SendError<T>> for TrySendError<T> {
- fn from(err: SendError<T>) -> Self {
- match err {
- SendError(item) => Self::Disconnected(item),
- }
- }
-}
-
-/// An error that may be emitted when sending a value into a channel on a sender with a timeout when
-/// the send operation times out or all receivers are dropped.
-#[derive(Copy, Clone, PartialEq, Eq)]
-pub enum SendTimeoutError<T> {
- /// A timeout occurred when attempting to send the message.
- Timeout(T),
- /// All channel receivers were dropped and so the message has nobody to receive it.
- Disconnected(T),
-}
-
-impl<T> SendTimeoutError<T> {
- /// Consume the error, yielding the message that failed to send.
- pub fn into_inner(self) -> T {
- match self {
- Self::Timeout(msg) | Self::Disconnected(msg) => msg,
- }
- }
-}
-
-impl<T> fmt::Debug for SendTimeoutError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "SendTimeoutError(..)".fmt(f)
- }
-}
-
-impl<T> fmt::Display for SendTimeoutError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f),
- SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f),
- }
- }
-}
-
-impl<T> std::error::Error for SendTimeoutError<T> {}
-
-impl<T> From<SendError<T>> for SendTimeoutError<T> {
- fn from(err: SendError<T>) -> Self {
- match err {
- SendError(item) => Self::Disconnected(item),
- }
- }
-}
-
-enum TrySendTimeoutError<T> {
- Full(T),
- Disconnected(T),
- Timeout(T),
-}
-
-/// An error that may be emitted when attempting to wait for a value on a receiver when all senders
-/// are dropped and there are no more messages in the channel.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum RecvError {
- /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
- Disconnected,
-}
-
-impl fmt::Display for RecvError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- RecvError::Disconnected => "receiving on a closed channel".fmt(f),
- }
- }
-}
-
-impl std::error::Error for RecvError {}
-
-/// An error that may be emitted when attempting to fetch a value on a receiver when there are no
-/// messages in the channel. If there are no messages in the channel and all senders are dropped,
-/// then `TryRecvError::Disconnected` will be returned.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum TryRecvError {
- /// The channel was empty when the receive was attempted.
- Empty,
- /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
- Disconnected,
-}
-
-impl fmt::Display for TryRecvError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- TryRecvError::Empty => "receiving on an empty channel".fmt(f),
- TryRecvError::Disconnected => "channel is empty and closed".fmt(f),
- }
- }
-}
-
-impl std::error::Error for TryRecvError {}
-
-impl From<RecvError> for TryRecvError {
- fn from(err: RecvError) -> Self {
- match err {
- RecvError::Disconnected => Self::Disconnected,
- }
- }
-}
-
-/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout
-/// when the receive operation times out or all senders are dropped and there are no values left
-/// in the channel.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum RecvTimeoutError {
- /// A timeout occurred when attempting to receive a message.
- Timeout,
- /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
- Disconnected,
-}
-
-impl fmt::Display for RecvTimeoutError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f),
- RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f),
- }
- }
-}
-
-impl std::error::Error for RecvTimeoutError {}
-
-impl From<RecvError> for RecvTimeoutError {
- fn from(err: RecvError) -> Self {
- match err {
- RecvError::Disconnected => Self::Disconnected,
- }
- }
-}
-
-enum TryRecvTimeoutError {
- Empty,
- Timeout,
- Disconnected,
-}
-
-// TODO: Investigate some sort of invalidation flag for timeouts
-#[cfg(feature = "spin")]
-struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
-
-#[cfg(not(feature = "spin"))]
-struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
-
-#[cfg(feature = "spin")]
-impl<T, S: ?Sized + Signal> Hook<T, S> {
- pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
- where
- S: Sized,
- {
- Arc::new(Self(Some(Spinlock::new(msg)), signal))
- }
-
- fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
- self.0.as_ref().map(|s| s.lock())
- }
-}
-
-#[cfg(not(feature = "spin"))]
-impl<T, S: ?Sized + Signal> Hook<T, S> {
- pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
- where
- S: Sized,
- {
- Arc::new(Self(Some(Mutex::new(msg)), signal))
- }
-
- fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
- self.0.as_ref().map(|s| s.lock().unwrap())
- }
-}
-
-impl<T, S: ?Sized + Signal> Hook<T, S> {
- pub fn fire_recv(&self) -> (T, &S) {
- let msg = self.lock().unwrap().take().unwrap();
- (msg, self.signal())
- }
-
- pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
- let ret = match self.lock() {
- Some(mut lock) => {
- *lock = Some(msg);
- None
- }
- None => Some(msg),
- };
- (ret, self.signal())
- }
-
- pub fn is_empty(&self) -> bool {
- self.lock().map(|s| s.is_none()).unwrap_or(true)
- }
-
- pub fn try_take(&self) -> Option<T> {
- self.lock().unwrap().take()
- }
-
- pub fn trigger(signal: S) -> Arc<Self>
- where
- S: Sized,
- {
- Arc::new(Self(None, signal))
- }
-
- pub fn signal(&self) -> &S {
- &self.1
- }
-
- pub fn fire_nothing(&self) -> bool {
- self.signal().fire()
- }
-}
-
-impl<T> Hook<T, SyncSignal> {
- pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
- loop {
- let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
- let msg = self.lock().unwrap().take();
- if let Some(msg) = msg {
- break Some(msg);
- } else if disconnected {
- break None;
- } else {
- self.signal().wait()
- }
- }
- }
-
- // Err(true) if timeout
- pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
- loop {
- let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
- let msg = self.lock().unwrap().take();
- if let Some(msg) = msg {
- break Ok(msg);
- } else if disconnected {
- break Err(false);
- } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
- self.signal().wait_timeout(dur);
- } else {
- break Err(true);
- }
- }
- }
-
- pub fn wait_send(&self, abort: &AtomicBool) {
- loop {
- let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
- if disconnected || self.lock().unwrap().is_none() {
- break;
- }
-
- self.signal().wait();
- }
- }
-
- // Err(true) if timeout
- pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
- loop {
- let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
- if self.lock().unwrap().is_none() {
- break Ok(());
- } else if disconnected {
- break Err(false);
- } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
- self.signal().wait_timeout(dur);
- } else {
- break Err(true);
- }
- }
- }
-}
-
-#[cfg(feature = "spin")]
-#[inline]
-fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
- let mut i = 4;
- loop {
- for _ in 0..10 {
- if let Some(guard) = lock.try_lock() {
- return guard;
- }
- thread::yield_now();
- }
- // Sleep for at most ~1 ms
- thread::sleep(Duration::from_nanos(1 << i.min(20)));
- i += 1;
- }
-}
-
-#[cfg(not(feature = "spin"))]
-#[inline]
-fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
- lock.lock().unwrap()
-}
-
-#[cfg(not(feature = "spin"))]
-use std::sync::{Mutex, MutexGuard};
-
-#[cfg(feature = "spin")]
-type ChanLock<T> = Spinlock<T>;
-#[cfg(not(feature = "spin"))]
-type ChanLock<T> = Mutex<T>;
-
-
-type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
-struct Chan<T> {
- sending: Option<(usize, SignalVec<T>)>,
- queue: VecDeque<T>,
- waiting: SignalVec<T>,
-}
-
-impl<T> Chan<T> {
- fn pull_pending(&mut self, pull_extra: bool) {
- if let Some((cap, sending)) = &mut self.sending {
- let effective_cap = *cap + pull_extra as usize;
-
- while self.queue.len() < effective_cap {
- if let Some(s) = sending.pop_front() {
- let (msg, signal) = s.fire_recv();
- signal.fire();
- self.queue.push_back(msg);
- } else {
- break;
- }
- }
- }
- }
-
- fn try_wake_receiver_if_pending(&mut self) {
- if !self.queue.is_empty() {
- while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
- }
- }
-}
-
-struct Shared<T> {
- chan: ChanLock<Chan<T>>,
- disconnected: AtomicBool,
- sender_count: AtomicUsize,
- receiver_count: AtomicUsize,
-}
-
-impl<T> Shared<T> {
- fn new(cap: Option<usize>) -> Self {
- Self {
- chan: ChanLock::new(Chan {
- sending: cap.map(|cap| (cap, VecDeque::new())),
- queue: VecDeque::new(),
- waiting: VecDeque::new(),
- }),
- disconnected: AtomicBool::new(false),
- sender_count: AtomicUsize::new(1),
- receiver_count: AtomicUsize::new(1),
- }
- }
-
- fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
- &self,
- msg: T,
- should_block: bool,
- make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
- do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
- ) -> R {
- let mut chan = wait_lock(&self.chan);
-
- if self.is_disconnected() {
- Err(TrySendTimeoutError::Disconnected(msg)).into()
- } else if !chan.waiting.is_empty() {
- let mut msg = Some(msg);
-
- loop {
- let slot = chan.waiting.pop_front();
- match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
- // No more waiting receivers and msg in queue, so break out of the loop
- None if msg.is_none() => break,
- // No more waiting receivers, so add msg to queue and break out of the loop
- None => {
- chan.queue.push_back(msg.unwrap());
- break;
- }
- Some((Some(m), signal)) => {
- if signal.fire() {
- // Was async and a stream, so didn't acquire the message. Wake another
- // receiver, and do not yet push the message.
- msg.replace(m);
- continue;
- } else {
- // Was async and not a stream, so it did acquire the message. Push the
- // message to the queue for it to be received.
- chan.queue.push_back(m);
- drop(chan);
- break;
- }
- },
- Some((None, signal)) => {
- drop(chan);
- signal.fire();
- break; // Was sync, so it has acquired the message
- },
- }
- }
-
- Ok(()).into()
- } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
- chan.queue.push_back(msg);
- Ok(()).into()
- } else if should_block { // Only bounded from here on
- let hook = make_signal(msg);
- chan.sending.as_mut().unwrap().1.push_back(hook.clone());
- drop(chan);
-
- do_block(hook)
- } else {
- Err(TrySendTimeoutError::Full(msg)).into()
- }
- }
-
- fn send_sync(
- &self,
- msg: T,
- block: Option<Option<Instant>>,
- ) -> Result<(), TrySendTimeoutError<T>> {
- self.send(
- // msg
- msg,
- // should_block
- block.is_some(),
- // make_signal
- |msg| Hook::slot(Some(msg), SyncSignal::default()),
- // do_block
- |hook| if let Some(deadline) = block.unwrap() {
- hook.wait_deadline_send(&self.disconnected, deadline)
- .or_else(|timed_out| {
- if timed_out { // Remove our signal
- let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
- wait_lock(&self.chan).sending
- .as_mut()
- .unwrap().1
- .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
- }
- hook.try_take().map(|msg| if self.is_disconnected() {
- Err(TrySendTimeoutError::Disconnected(msg))
- } else {
- Err(TrySendTimeoutError::Timeout(msg))
- })
- .unwrap_or(Ok(()))
- })
- } else {
- hook.wait_send(&self.disconnected);
-
- match hook.try_take() {
- Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
- None => Ok(()),
- }
- },
- )
- }
-
- fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
- &self,
- should_block: bool,
- make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
- do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
- ) -> R {
- let mut chan = wait_lock(&self.chan);
- chan.pull_pending(true);
-
- if let Some(msg) = chan.queue.pop_front() {
- drop(chan);
- Ok(msg).into()
- } else if self.is_disconnected() {
- drop(chan);
- Err(TryRecvTimeoutError::Disconnected).into()
- } else if should_block {
- let hook = make_signal();
- chan.waiting.push_back(hook.clone());
- drop(chan);
-
- do_block(hook)
- } else {
- drop(chan);
- Err(TryRecvTimeoutError::Empty).into()
- }
- }
-
- fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
- self.recv(
- // should_block
- block.is_some(),
- // make_signal
- || Hook::slot(None, SyncSignal::default()),
- // do_block
- |hook| if let Some(deadline) = block.unwrap() {
- hook.wait_deadline_recv(&self.disconnected, deadline)
- .or_else(|timed_out| {
- if timed_out { // Remove our signal
- let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
- wait_lock(&self.chan).waiting
- .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
- }
- match hook.try_take() {
- Some(msg) => Ok(msg),
- None => {
- let disconnected = self.is_disconnected(); // Check disconnect *before* msg
- if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
- Ok(msg)
- } else if disconnected {
- Err(TryRecvTimeoutError::Disconnected)
- } else {
- Err(TryRecvTimeoutError::Timeout)
- }
- },
- }
- })
- } else {
- hook.wait_recv(&self.disconnected)
- .or_else(|| wait_lock(&self.chan).queue.pop_front())
- .ok_or(TryRecvTimeoutError::Disconnected)
- },
- )
- }
-
- /// Disconnect anything listening on this channel (this will not prevent receivers receiving
- /// msgs that have already been sent)
- fn disconnect_all(&self) {
- self.disconnected.store(true, Ordering::Relaxed);
-
- let mut chan = wait_lock(&self.chan);
- chan.pull_pending(false);
- if let Some((_, sending)) = chan.sending.as_ref() {
- sending.iter().for_each(|hook| {
- hook.signal().fire();
- })
- }
- chan.waiting.iter().for_each(|hook| {
- hook.signal().fire();
- });
- }
-
- fn is_disconnected(&self) -> bool {
- self.disconnected.load(Ordering::SeqCst)
- }
-
- fn is_empty(&self) -> bool {
- self.len() == 0
- }
-
- fn is_full(&self) -> bool {
- self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
- }
-
- fn len(&self) -> usize {
- let mut chan = wait_lock(&self.chan);
- chan.pull_pending(false);
- chan.queue.len()
- }
-
- fn capacity(&self) -> Option<usize> {
- wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
- }
-
- fn sender_count(&self) -> usize {
- self.sender_count.load(Ordering::Relaxed)
- }
-
- fn receiver_count(&self) -> usize {
- self.receiver_count.load(Ordering::Relaxed)
- }
-}
-
-/// A transmitting end of a channel.
-pub struct Sender<T> {
- shared: Arc<Shared<T>>,
-}
-
-impl<T> Sender<T> {
- /// Attempt to send a value into the channel. If the channel is bounded and full, or all
- /// receivers have been dropped, an error is returned. If the channel associated with this
- /// sender is unbounded, this method has the same behaviour as [`Sender::send`].
- pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
- self.shared.send_sync(msg, None).map_err(|err| match err {
- TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
- TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
- _ => unreachable!(),
- })
- }
-
- /// Send a value into the channel, returning an error if all receivers have been dropped.
- /// If the channel is bounded and is full, this method will block until space is available
- /// or all receivers have been dropped. If the channel is unbounded, this method will not
- /// block.
- pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
- self.shared.send_sync(msg, Some(None)).map_err(|err| match err {
- TrySendTimeoutError::Disconnected(msg) => SendError(msg),
- _ => unreachable!(),
- })
- }
-
- /// Send a value into the channel, returning an error if all receivers have been dropped
- /// or the deadline has passed. If the channel is bounded and is full, this method will
- /// block until space is available, the deadline is reached, or all receivers have been
- /// dropped.
- pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
- self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err {
- TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
- TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
- _ => unreachable!(),
- })
- }
-
- /// Send a value into the channel, returning an error if all receivers have been dropped
- /// or the timeout has expired. If the channel is bounded and is full, this method will
- /// block until space is available, the timeout has expired, or all receivers have been
- /// dropped.
- pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
- self.send_deadline(msg, Instant::now().checked_add(dur).unwrap())
- }
-
- /// Returns true if all receivers for this channel have been dropped.
- pub fn is_disconnected(&self) -> bool {
- self.shared.is_disconnected()
- }
-
- /// Returns true if the channel is empty.
- /// Note: Zero-capacity channels are always empty.
- pub fn is_empty(&self) -> bool {
- self.shared.is_empty()
- }
-
- /// Returns true if the channel is full.
- /// Note: Zero-capacity channels are always full.
- pub fn is_full(&self) -> bool {
- self.shared.is_full()
- }
-
- /// Returns the number of messages in the channel
- pub fn len(&self) -> usize {
- self.shared.len()
- }
-
- /// If the channel is bounded, returns its capacity.
- pub fn capacity(&self) -> Option<usize> {
- self.shared.capacity()
- }
-
- /// Get the number of senders that currently exist, including this one.
- pub fn sender_count(&self) -> usize {
- self.shared.sender_count()
- }
-
- /// Get the number of receivers that currently exist.
- ///
- /// Note that this method makes no guarantees that a subsequent send will succeed; it's
- /// possible that between `receiver_count()` being called and a `send()`, all open receivers
- /// could drop.
- pub fn receiver_count(&self) -> usize {
- self.shared.receiver_count()
- }
-
- /// Creates a [`WeakSender`] that does not keep the channel open.
- ///
- /// The channel is closed once all `Sender`s are dropped, even if there
- /// are still active `WeakSender`s.
- pub fn downgrade(&self) -> WeakSender<T> {
- WeakSender {
- shared: Arc::downgrade(&self.shared),
- }
- }
-
- /// Returns whether the senders are belong to the same channel.
- pub fn same_channel(&self, other: &Sender<T>) -> bool {
- Arc::ptr_eq(&self.shared, &other.shared)
- }
-}
-
-impl<T> Clone for Sender<T> {
- /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel
- /// contents will only be cleaned up when all senders and the receiver have been dropped.
- fn clone(&self) -> Self {
- self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
- Self { shared: self.shared.clone() }
- }
-}
-
-impl<T> fmt::Debug for Sender<T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("Sender").finish()
- }
-}
-
-impl<T> Drop for Sender<T> {
- fn drop(&mut self) {
- // Notify receivers that all senders have been dropped if the number of senders drops to 0.
- if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
- self.shared.disconnect_all();
- }
- }
-}
-
-/// A sender that does not prevent the channel from being closed.
-///
-/// Weak senders do not count towards the number of active senders on the channel. As soon as
-/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
-/// `WeakSender`.
-///
-/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`]
-/// method.
-pub struct WeakSender<T> {
- shared: Weak<Shared<T>>,
-}
-
-impl<T> WeakSender<T> {
- /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages.
- ///
- /// Returns `None` if the channel was closed already. Note that a `Some` return value
- /// does not guarantee that the channel is still open.
- pub fn upgrade(&self) -> Option<Sender<T>> {
- self.shared
- .upgrade()
- // check that there are still live senders
- .filter(|shared| {
- shared
- .sender_count
- .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
- if count == 0 {
- // all senders are closed already -> don't increase the sender count
- None
- } else {
- // there is still at least one active sender
- Some(count + 1)
- }
- })
- .is_ok()
- })
- .map(|shared| Sender { shared })
- }
-}
-
-/// The receiving end of a channel.
-///
-/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
-/// Each message will only be received by a single receiver. This is useful for
-/// implementing work stealing for concurrent programs.
-pub struct Receiver<T> {
- shared: Arc<Shared<T>>,
-}
-
-impl<T> Receiver<T> {
- /// Attempt to fetch an incoming value from the channel associated with this receiver,
- /// returning an error if the channel is empty or if all senders have been dropped.
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
- self.shared.recv_sync(None).map_err(|err| match err {
- TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
- TryRecvTimeoutError::Empty => TryRecvError::Empty,
- _ => unreachable!(),
- })
- }
-
- /// Wait for an incoming value from the channel associated with this receiver, returning an
- /// error if all senders have been dropped.
- pub fn recv(&self) -> Result<T, RecvError> {
- self.shared.recv_sync(Some(None)).map_err(|err| match err {
- TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
- _ => unreachable!(),
- })
- }
-
- /// Wait for an incoming value from the channel associated with this receiver, returning an
- /// error if all senders have been dropped or the deadline has passed.
- pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
- self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err {
- TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
- TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
- _ => unreachable!(),
- })
- }
-
- /// Wait for an incoming value from the channel associated with this receiver, returning an
- /// error if all senders have been dropped or the timeout has expired.
- pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
- self.recv_deadline(Instant::now().checked_add(dur).unwrap())
- }
-
- /// Create a blocking iterator over the values received on the channel that finishes iteration
- /// when all senders have been dropped.
- ///
- /// You can also create a self-owned iterator with [`Receiver::into_iter`].
- pub fn iter(&self) -> Iter<T> {
- Iter { receiver: &self }
- }
-
- /// A non-blocking iterator over the values received on the channel that finishes iteration
- /// when all senders have been dropped or the channel is empty.
- pub fn try_iter(&self) -> TryIter<T> {
- TryIter { receiver: &self }
- }
-
- /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
- /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
- /// the function has been called.
- pub fn drain(&self) -> Drain<T> {
- let mut chan = wait_lock(&self.shared.chan);
- chan.pull_pending(false);
- let queue = std::mem::take(&mut chan.queue);
-
- Drain { queue, _phantom: PhantomData }
- }
-
- /// Returns true if all senders for this channel have been dropped.
- pub fn is_disconnected(&self) -> bool {
- self.shared.is_disconnected()
- }
-
- /// Returns true if the channel is empty.
- /// Note: Zero-capacity channels are always empty.
- pub fn is_empty(&self) -> bool {
- self.shared.is_empty()
- }
-
- /// Returns true if the channel is full.
- /// Note: Zero-capacity channels are always full.
- pub fn is_full(&self) -> bool {
- self.shared.is_full()
- }
-
- /// Returns the number of messages in the channel.
- pub fn len(&self) -> usize {
- self.shared.len()
- }
-
- /// If the channel is bounded, returns its capacity.
- pub fn capacity(&self) -> Option<usize> {
- self.shared.capacity()
- }
-
- /// Get the number of senders that currently exist.
- pub fn sender_count(&self) -> usize {
- self.shared.sender_count()
- }
-
- /// Get the number of receivers that currently exist, including this one.
- pub fn receiver_count(&self) -> usize {
- self.shared.receiver_count()
- }
-
- /// Returns whether the receivers are belong to the same channel.
- pub fn same_channel(&self, other: &Receiver<T>) -> bool {
- Arc::ptr_eq(&self.shared, &other.shared)
- }
-}
-
-impl<T> Clone for Receiver<T> {
- /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining
- /// channel contents will only be cleaned up when all senders and the receiver have been
- /// dropped.
- ///
- /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
- /// Each message will only be received by a single receiver. This is useful for
- /// implementing work stealing for concurrent programs.
- fn clone(&self) -> Self {
- self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
- Self { shared: self.shared.clone() }
- }
-}
-
-impl<T> fmt::Debug for Receiver<T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("Receiver").finish()
- }
-}
-
-impl<T> Drop for Receiver<T> {
- fn drop(&mut self) {
- // Notify senders that all receivers have been dropped if the number of receivers drops
- // to 0.
- if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
- self.shared.disconnect_all();
- }
- }
-}
-
-/// This exists as a shorthand for [`Receiver::iter`].
-impl<'a, T> IntoIterator for &'a Receiver<T> {
- type Item = T;
- type IntoIter = Iter<'a, T>;
-
- fn into_iter(self) -> Self::IntoIter {
- Iter { receiver: self }
- }
-}
-
-impl<T> IntoIterator for Receiver<T> {
- type Item = T;
- type IntoIter = IntoIter<T>;
-
- /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
- fn into_iter(self) -> Self::IntoIter {
- IntoIter { receiver: self }
- }
-}
-
-/// An iterator over the msgs received from a channel.
-pub struct Iter<'a, T> {
- receiver: &'a Receiver<T>,
-}
-
-impl<'a, T> Iterator for Iter<'a, T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.receiver.recv().ok()
- }
-}
-
-/// An non-blocking iterator over the msgs received from a channel.
-pub struct TryIter<'a, T> {
- receiver: &'a Receiver<T>,
-}
-
-impl<'a, T> Iterator for TryIter<'a, T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.receiver.try_recv().ok()
- }
-}
-
-/// An fixed-sized iterator over the msgs drained from a channel.
-#[derive(Debug)]
-pub struct Drain<'a, T> {
- queue: VecDeque<T>,
- /// A phantom field used to constrain the lifetime of this iterator. We do this because the
- /// implementation may change and we don't want to unintentionally constrain it. Removing this
- /// lifetime later is a possibility.
- _phantom: PhantomData<&'a ()>,
-}
-
-impl<'a, T> Iterator for Drain<'a, T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.queue.pop_front()
- }
-}
-
-impl<'a, T> ExactSizeIterator for Drain<'a, T> {
- fn len(&self) -> usize {
- self.queue.len()
- }
-}
-
-/// An owned iterator over the msgs received from a channel.
-pub struct IntoIter<T> {
- receiver: Receiver<T>,
-}
-
-impl<T> Iterator for IntoIter<T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.receiver.recv().ok()
- }
-}
-
-/// Create a channel with no maximum capacity.
-///
-/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in
-/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
-/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
-/// may be cloned.
-///
-/// # Examples
-/// ```
-/// let (tx, rx) = flume::unbounded();
-///
-/// tx.send(42).unwrap();
-/// assert_eq!(rx.recv().unwrap(), 42);
-/// ```
-pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
- let shared = Arc::new(Shared::new(None));
- (
- Sender { shared: shared.clone() },
- Receiver { shared },
- )
-}
-
-/// Create a channel with a maximum capacity.
-///
-/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one
-/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
-/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
-/// may be cloned.
-///
-/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
-/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
-/// is not desired, [`Sender::try_send`] may be used.
-///
-/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero
-/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a
-/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers
-/// perform a handshake and transfer ownership of a value.
-///
-/// # Examples
-/// ```
-/// let (tx, rx) = flume::bounded(32);
-///
-/// for i in 1..33 {
-/// tx.send(i).unwrap();
-/// }
-/// assert!(tx.try_send(33).is_err());
-///
-/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum());
-/// ```
-pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
- let shared = Arc::new(Shared::new(Some(cap)));
- (
- Sender { shared: shared.clone() },
- Receiver { shared },
- )
-}