diff options
Diffstat (limited to 'vendor/flume/src/select.rs')
-rw-r--r-- | vendor/flume/src/select.rs | 405 |
1 files changed, 405 insertions, 0 deletions
diff --git a/vendor/flume/src/select.rs b/vendor/flume/src/select.rs new file mode 100644 index 0000000..cef15aa --- /dev/null +++ b/vendor/flume/src/select.rs @@ -0,0 +1,405 @@ +//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface. + +use crate::*; +use spin1::Mutex as Spinlock; +use std::{any::Any, marker::PhantomData}; + +#[cfg(feature = "eventual-fairness")] +use nanorand::Rng; + +// A unique token corresponding to an event in a selector +type Token = usize; + +struct SelectSignal( + thread::Thread, + Token, + AtomicBool, + Arc<Spinlock<VecDeque<Token>>>, +); + +impl Signal for SelectSignal { + fn fire(&self) -> bool { + self.2.store(true, Ordering::SeqCst); + self.3.lock().push_back(self.1); + self.0.unpark(); + false + } + + fn as_any(&self) -> &(dyn Any + 'static) { + self + } + fn as_ptr(&self) -> *const () { + self as *const _ as *const () + } +} + +trait Selection<'a, T> { + fn init(&mut self) -> Option<T>; + fn poll(&mut self) -> Option<T>; + fn deinit(&mut self); +} + +/// An error that may be emitted when attempting to wait for a value on a receiver. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum SelectError { + /// A timeout occurred when waiting on a `Selector`. + Timeout, +} + +impl fmt::Display for SelectError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SelectError::Timeout => "timeout occurred".fmt(f), + } + } +} + +impl std::error::Error for SelectError {} + +/// A type used to wait upon multiple blocking operations at once. +/// +/// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour, +/// allowing a thread to wait upon the result of more than one operation at once. +/// +/// # Examples +/// ``` +/// let (tx0, rx0) = flume::unbounded(); +/// let (tx1, rx1) = flume::unbounded(); +/// +/// std::thread::spawn(move || { +/// tx0.send(true).unwrap(); +/// tx1.send(42).unwrap(); +/// }); +/// +/// flume::Selector::new() +/// .recv(&rx0, |b| println!("Received {:?}", b)) +/// .recv(&rx1, |n| println!("Received {:?}", n)) +/// .wait(); +/// ``` +pub struct Selector<'a, T: 'a> { + selections: Vec<Box<dyn Selection<'a, T> + 'a>>, + next_poll: usize, + signalled: Arc<Spinlock<VecDeque<Token>>>, + #[cfg(feature = "eventual-fairness")] + rng: nanorand::WyRand, + phantom: PhantomData<*const ()>, +} + +impl<'a, T: 'a> Default for Selector<'a, T> { + fn default() -> Self { + Self::new() + } +} + +impl<'a, T: 'a> fmt::Debug for Selector<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Selector").finish() + } +} + +impl<'a, T> Selector<'a, T> { + /// Create a new selector. + pub fn new() -> Self { + Self { + selections: Vec::new(), + next_poll: 0, + signalled: Arc::default(), + phantom: PhantomData::default(), + #[cfg(feature = "eventual-fairness")] + rng: nanorand::WyRand::new(), + } + } + + /// Add a send operation to the selector that sends the provided value. + /// + /// Once added, the selector can be used to run the provided handler function on completion of this operation. + pub fn send<U, F: FnMut(Result<(), SendError<U>>) -> T + 'a>( + mut self, + sender: &'a Sender<U>, + msg: U, + mapper: F, + ) -> Self { + struct SendSelection<'a, T, F, U> { + sender: &'a Sender<U>, + msg: Option<U>, + token: Token, + signalled: Arc<Spinlock<VecDeque<Token>>>, + hook: Option<Arc<Hook<U, SelectSignal>>>, + mapper: F, + phantom: PhantomData<T>, + } + + impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U> + where + F: FnMut(Result<(), SendError<U>>) -> T, + { + fn init(&mut self) -> Option<T> { + let token = self.token; + let signalled = self.signalled.clone(); + let r = self.sender.shared.send( + self.msg.take().unwrap(), + true, + |msg| { + Hook::slot( + Some(msg), + SelectSignal( + thread::current(), + token, + AtomicBool::new(false), + signalled, + ), + ) + }, + // Always runs + |h| { + self.hook = Some(h); + Ok(()) + }, + ); + + if self.hook.is_none() { + Some((self.mapper)(match r { + Ok(()) => Ok(()), + Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)), + _ => unreachable!(), + })) + } else { + None + } + } + + fn poll(&mut self) -> Option<T> { + let res = if self.sender.shared.is_disconnected() { + // Check the hook one last time + if let Some(msg) = self.hook.as_ref()?.try_take() { + Err(SendError(msg)) + } else { + Ok(()) + } + } else if self.hook.as_ref().unwrap().is_empty() { + // The message was sent + Ok(()) + } else { + return None; + }; + + Some((&mut self.mapper)(res)) + } + + fn deinit(&mut self) { + if let Some(hook) = self.hook.take() { + // Remove hook + let hook: Arc<Hook<U, dyn Signal>> = hook; + wait_lock(&self.sender.shared.chan) + .sending + .as_mut() + .unwrap() + .1 + .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); + } + } + } + + let token = self.selections.len(); + self.selections.push(Box::new(SendSelection { + sender, + msg: Some(msg), + token, + signalled: self.signalled.clone(), + hook: None, + mapper, + phantom: Default::default(), + })); + + self + } + + /// Add a receive operation to the selector. + /// + /// Once added, the selector can be used to run the provided handler function on completion of this operation. + pub fn recv<U, F: FnMut(Result<U, RecvError>) -> T + 'a>( + mut self, + receiver: &'a Receiver<U>, + mapper: F, + ) -> Self { + struct RecvSelection<'a, T, F, U> { + receiver: &'a Receiver<U>, + token: Token, + signalled: Arc<Spinlock<VecDeque<Token>>>, + hook: Option<Arc<Hook<U, SelectSignal>>>, + mapper: F, + received: bool, + phantom: PhantomData<T>, + } + + impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U> + where + F: FnMut(Result<U, RecvError>) -> T, + { + fn init(&mut self) -> Option<T> { + let token = self.token; + let signalled = self.signalled.clone(); + let r = self.receiver.shared.recv( + true, + || { + Hook::trigger(SelectSignal( + thread::current(), + token, + AtomicBool::new(false), + signalled, + )) + }, + // Always runs + |h| { + self.hook = Some(h); + Err(TryRecvTimeoutError::Timeout) + }, + ); + + if self.hook.is_none() { + Some((self.mapper)(match r { + Ok(msg) => Ok(msg), + Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected), + _ => unreachable!(), + })) + } else { + None + } + } + + fn poll(&mut self) -> Option<T> { + let res = if let Ok(msg) = self.receiver.try_recv() { + self.received = true; + Ok(msg) + } else if self.receiver.shared.is_disconnected() { + Err(RecvError::Disconnected) + } else { + return None; + }; + + Some((&mut self.mapper)(res)) + } + + fn deinit(&mut self) { + if let Some(hook) = self.hook.take() { + // Remove hook + let hook: Arc<Hook<U, dyn Signal>> = hook; + wait_lock(&self.receiver.shared.chan) + .waiting + .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); + // If we were woken, but never polled, wake up another + if !self.received + && hook + .signal() + .as_any() + .downcast_ref::<SelectSignal>() + .unwrap() + .2 + .load(Ordering::SeqCst) + { + wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending(); + } + } + } + } + + let token = self.selections.len(); + self.selections.push(Box::new(RecvSelection { + receiver, + token, + signalled: self.signalled.clone(), + hook: None, + mapper, + received: false, + phantom: Default::default(), + })); + + self + } + + fn wait_inner(mut self, deadline: Option<Instant>) -> Option<T> { + #[cfg(feature = "eventual-fairness")] + { + self.next_poll = self.rng.generate_range(0..self.selections.len()); + } + + let res = 'outer: loop { + // Init signals + for _ in 0..self.selections.len() { + if let Some(val) = self.selections[self.next_poll].init() { + break 'outer Some(val); + } + self.next_poll = (self.next_poll + 1) % self.selections.len(); + } + + // Speculatively poll + if let Some(msg) = self.poll() { + break 'outer Some(msg); + } + + loop { + if let Some(deadline) = deadline { + if let Some(dur) = deadline.checked_duration_since(Instant::now()) { + thread::park_timeout(dur); + } + } else { + thread::park(); + } + + if deadline.map(|d| Instant::now() >= d).unwrap_or(false) { + break 'outer self.poll(); + } + + let token = if let Some(token) = self.signalled.lock().pop_front() { + token + } else { + // Spurious wakeup, park again + continue; + }; + + // Attempt to receive a message + if let Some(msg) = self.selections[token].poll() { + break 'outer Some(msg); + } + } + }; + + // Deinit signals + for s in &mut self.selections { + s.deinit(); + } + + res + } + + fn poll(&mut self) -> Option<T> { + for _ in 0..self.selections.len() { + if let Some(val) = self.selections[self.next_poll].poll() { + return Some(val); + } + self.next_poll = (self.next_poll + 1) % self.selections.len(); + } + None + } + + /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness` + /// feature flag is enabled, this method is fair and will handle a random event of those that are ready. + pub fn wait(self) -> T { + self.wait_inner(None).unwrap() + } + + /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the + /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that + /// are ready. + pub fn wait_timeout(self, dur: Duration) -> Result<T, SelectError> { + self.wait_inner(Some(Instant::now() + dur)) + .ok_or(SelectError::Timeout) + } + + /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached. + /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those + /// that are ready. + pub fn wait_deadline(self, deadline: Instant) -> Result<T, SelectError> { + self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout) + } +} |