diff options
Diffstat (limited to 'vendor/flume/src/select.rs')
-rw-r--r-- | vendor/flume/src/select.rs | 405 |
1 files changed, 0 insertions, 405 deletions
diff --git a/vendor/flume/src/select.rs b/vendor/flume/src/select.rs deleted file mode 100644 index cef15aa..0000000 --- a/vendor/flume/src/select.rs +++ /dev/null @@ -1,405 +0,0 @@ -//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface. - -use crate::*; -use spin1::Mutex as Spinlock; -use std::{any::Any, marker::PhantomData}; - -#[cfg(feature = "eventual-fairness")] -use nanorand::Rng; - -// A unique token corresponding to an event in a selector -type Token = usize; - -struct SelectSignal( - thread::Thread, - Token, - AtomicBool, - Arc<Spinlock<VecDeque<Token>>>, -); - -impl Signal for SelectSignal { - fn fire(&self) -> bool { - self.2.store(true, Ordering::SeqCst); - self.3.lock().push_back(self.1); - self.0.unpark(); - false - } - - fn as_any(&self) -> &(dyn Any + 'static) { - self - } - fn as_ptr(&self) -> *const () { - self as *const _ as *const () - } -} - -trait Selection<'a, T> { - fn init(&mut self) -> Option<T>; - fn poll(&mut self) -> Option<T>; - fn deinit(&mut self); -} - -/// An error that may be emitted when attempting to wait for a value on a receiver. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum SelectError { - /// A timeout occurred when waiting on a `Selector`. - Timeout, -} - -impl fmt::Display for SelectError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SelectError::Timeout => "timeout occurred".fmt(f), - } - } -} - -impl std::error::Error for SelectError {} - -/// A type used to wait upon multiple blocking operations at once. -/// -/// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour, -/// allowing a thread to wait upon the result of more than one operation at once. -/// -/// # Examples -/// ``` -/// let (tx0, rx0) = flume::unbounded(); -/// let (tx1, rx1) = flume::unbounded(); -/// -/// std::thread::spawn(move || { -/// tx0.send(true).unwrap(); -/// tx1.send(42).unwrap(); -/// }); -/// -/// flume::Selector::new() -/// .recv(&rx0, |b| println!("Received {:?}", b)) -/// .recv(&rx1, |n| println!("Received {:?}", n)) -/// .wait(); -/// ``` -pub struct Selector<'a, T: 'a> { - selections: Vec<Box<dyn Selection<'a, T> + 'a>>, - next_poll: usize, - signalled: Arc<Spinlock<VecDeque<Token>>>, - #[cfg(feature = "eventual-fairness")] - rng: nanorand::WyRand, - phantom: PhantomData<*const ()>, -} - -impl<'a, T: 'a> Default for Selector<'a, T> { - fn default() -> Self { - Self::new() - } -} - -impl<'a, T: 'a> fmt::Debug for Selector<'a, T> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Selector").finish() - } -} - -impl<'a, T> Selector<'a, T> { - /// Create a new selector. - pub fn new() -> Self { - Self { - selections: Vec::new(), - next_poll: 0, - signalled: Arc::default(), - phantom: PhantomData::default(), - #[cfg(feature = "eventual-fairness")] - rng: nanorand::WyRand::new(), - } - } - - /// Add a send operation to the selector that sends the provided value. - /// - /// Once added, the selector can be used to run the provided handler function on completion of this operation. - pub fn send<U, F: FnMut(Result<(), SendError<U>>) -> T + 'a>( - mut self, - sender: &'a Sender<U>, - msg: U, - mapper: F, - ) -> Self { - struct SendSelection<'a, T, F, U> { - sender: &'a Sender<U>, - msg: Option<U>, - token: Token, - signalled: Arc<Spinlock<VecDeque<Token>>>, - hook: Option<Arc<Hook<U, SelectSignal>>>, - mapper: F, - phantom: PhantomData<T>, - } - - impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U> - where - F: FnMut(Result<(), SendError<U>>) -> T, - { - fn init(&mut self) -> Option<T> { - let token = self.token; - let signalled = self.signalled.clone(); - let r = self.sender.shared.send( - self.msg.take().unwrap(), - true, - |msg| { - Hook::slot( - Some(msg), - SelectSignal( - thread::current(), - token, - AtomicBool::new(false), - signalled, - ), - ) - }, - // Always runs - |h| { - self.hook = Some(h); - Ok(()) - }, - ); - - if self.hook.is_none() { - Some((self.mapper)(match r { - Ok(()) => Ok(()), - Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)), - _ => unreachable!(), - })) - } else { - None - } - } - - fn poll(&mut self) -> Option<T> { - let res = if self.sender.shared.is_disconnected() { - // Check the hook one last time - if let Some(msg) = self.hook.as_ref()?.try_take() { - Err(SendError(msg)) - } else { - Ok(()) - } - } else if self.hook.as_ref().unwrap().is_empty() { - // The message was sent - Ok(()) - } else { - return None; - }; - - Some((&mut self.mapper)(res)) - } - - fn deinit(&mut self) { - if let Some(hook) = self.hook.take() { - // Remove hook - let hook: Arc<Hook<U, dyn Signal>> = hook; - wait_lock(&self.sender.shared.chan) - .sending - .as_mut() - .unwrap() - .1 - .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); - } - } - } - - let token = self.selections.len(); - self.selections.push(Box::new(SendSelection { - sender, - msg: Some(msg), - token, - signalled: self.signalled.clone(), - hook: None, - mapper, - phantom: Default::default(), - })); - - self - } - - /// Add a receive operation to the selector. - /// - /// Once added, the selector can be used to run the provided handler function on completion of this operation. - pub fn recv<U, F: FnMut(Result<U, RecvError>) -> T + 'a>( - mut self, - receiver: &'a Receiver<U>, - mapper: F, - ) -> Self { - struct RecvSelection<'a, T, F, U> { - receiver: &'a Receiver<U>, - token: Token, - signalled: Arc<Spinlock<VecDeque<Token>>>, - hook: Option<Arc<Hook<U, SelectSignal>>>, - mapper: F, - received: bool, - phantom: PhantomData<T>, - } - - impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U> - where - F: FnMut(Result<U, RecvError>) -> T, - { - fn init(&mut self) -> Option<T> { - let token = self.token; - let signalled = self.signalled.clone(); - let r = self.receiver.shared.recv( - true, - || { - Hook::trigger(SelectSignal( - thread::current(), - token, - AtomicBool::new(false), - signalled, - )) - }, - // Always runs - |h| { - self.hook = Some(h); - Err(TryRecvTimeoutError::Timeout) - }, - ); - - if self.hook.is_none() { - Some((self.mapper)(match r { - Ok(msg) => Ok(msg), - Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected), - _ => unreachable!(), - })) - } else { - None - } - } - - fn poll(&mut self) -> Option<T> { - let res = if let Ok(msg) = self.receiver.try_recv() { - self.received = true; - Ok(msg) - } else if self.receiver.shared.is_disconnected() { - Err(RecvError::Disconnected) - } else { - return None; - }; - - Some((&mut self.mapper)(res)) - } - - fn deinit(&mut self) { - if let Some(hook) = self.hook.take() { - // Remove hook - let hook: Arc<Hook<U, dyn Signal>> = hook; - wait_lock(&self.receiver.shared.chan) - .waiting - .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); - // If we were woken, but never polled, wake up another - if !self.received - && hook - .signal() - .as_any() - .downcast_ref::<SelectSignal>() - .unwrap() - .2 - .load(Ordering::SeqCst) - { - wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending(); - } - } - } - } - - let token = self.selections.len(); - self.selections.push(Box::new(RecvSelection { - receiver, - token, - signalled: self.signalled.clone(), - hook: None, - mapper, - received: false, - phantom: Default::default(), - })); - - self - } - - fn wait_inner(mut self, deadline: Option<Instant>) -> Option<T> { - #[cfg(feature = "eventual-fairness")] - { - self.next_poll = self.rng.generate_range(0..self.selections.len()); - } - - let res = 'outer: loop { - // Init signals - for _ in 0..self.selections.len() { - if let Some(val) = self.selections[self.next_poll].init() { - break 'outer Some(val); - } - self.next_poll = (self.next_poll + 1) % self.selections.len(); - } - - // Speculatively poll - if let Some(msg) = self.poll() { - break 'outer Some(msg); - } - - loop { - if let Some(deadline) = deadline { - if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - thread::park_timeout(dur); - } - } else { - thread::park(); - } - - if deadline.map(|d| Instant::now() >= d).unwrap_or(false) { - break 'outer self.poll(); - } - - let token = if let Some(token) = self.signalled.lock().pop_front() { - token - } else { - // Spurious wakeup, park again - continue; - }; - - // Attempt to receive a message - if let Some(msg) = self.selections[token].poll() { - break 'outer Some(msg); - } - } - }; - - // Deinit signals - for s in &mut self.selections { - s.deinit(); - } - - res - } - - fn poll(&mut self) -> Option<T> { - for _ in 0..self.selections.len() { - if let Some(val) = self.selections[self.next_poll].poll() { - return Some(val); - } - self.next_poll = (self.next_poll + 1) % self.selections.len(); - } - None - } - - /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness` - /// feature flag is enabled, this method is fair and will handle a random event of those that are ready. - pub fn wait(self) -> T { - self.wait_inner(None).unwrap() - } - - /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the - /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that - /// are ready. - pub fn wait_timeout(self, dur: Duration) -> Result<T, SelectError> { - self.wait_inner(Some(Instant::now() + dur)) - .ok_or(SelectError::Timeout) - } - - /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached. - /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those - /// that are ready. - pub fn wait_deadline(self, deadline: Instant) -> Result<T, SelectError> { - self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout) - } -} |