//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface. use crate::*; use spin1::Mutex as Spinlock; use std::{any::Any, marker::PhantomData}; #[cfg(feature = "eventual-fairness")] use nanorand::Rng; // A unique token corresponding to an event in a selector type Token = usize; struct SelectSignal( thread::Thread, Token, AtomicBool, Arc>>, ); impl Signal for SelectSignal { fn fire(&self) -> bool { self.2.store(true, Ordering::SeqCst); self.3.lock().push_back(self.1); self.0.unpark(); false } fn as_any(&self) -> &(dyn Any + 'static) { self } fn as_ptr(&self) -> *const () { self as *const _ as *const () } } trait Selection<'a, T> { fn init(&mut self) -> Option; fn poll(&mut self) -> Option; fn deinit(&mut self); } /// An error that may be emitted when attempting to wait for a value on a receiver. #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum SelectError { /// A timeout occurred when waiting on a `Selector`. Timeout, } impl fmt::Display for SelectError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { SelectError::Timeout => "timeout occurred".fmt(f), } } } impl std::error::Error for SelectError {} /// A type used to wait upon multiple blocking operations at once. /// /// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour, /// allowing a thread to wait upon the result of more than one operation at once. /// /// # Examples /// ``` /// let (tx0, rx0) = flume::unbounded(); /// let (tx1, rx1) = flume::unbounded(); /// /// std::thread::spawn(move || { /// tx0.send(true).unwrap(); /// tx1.send(42).unwrap(); /// }); /// /// flume::Selector::new() /// .recv(&rx0, |b| println!("Received {:?}", b)) /// .recv(&rx1, |n| println!("Received {:?}", n)) /// .wait(); /// ``` pub struct Selector<'a, T: 'a> { selections: Vec + 'a>>, next_poll: usize, signalled: Arc>>, #[cfg(feature = "eventual-fairness")] rng: nanorand::WyRand, phantom: PhantomData<*const ()>, } impl<'a, T: 'a> Default for Selector<'a, T> { fn default() -> Self { Self::new() } } impl<'a, T: 'a> fmt::Debug for Selector<'a, T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Selector").finish() } } impl<'a, T> Selector<'a, T> { /// Create a new selector. pub fn new() -> Self { Self { selections: Vec::new(), next_poll: 0, signalled: Arc::default(), phantom: PhantomData::default(), #[cfg(feature = "eventual-fairness")] rng: nanorand::WyRand::new(), } } /// Add a send operation to the selector that sends the provided value. /// /// Once added, the selector can be used to run the provided handler function on completion of this operation. pub fn send>) -> T + 'a>( mut self, sender: &'a Sender, msg: U, mapper: F, ) -> Self { struct SendSelection<'a, T, F, U> { sender: &'a Sender, msg: Option, token: Token, signalled: Arc>>, hook: Option>>, mapper: F, phantom: PhantomData, } impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U> where F: FnMut(Result<(), SendError>) -> T, { fn init(&mut self) -> Option { let token = self.token; let signalled = self.signalled.clone(); let r = self.sender.shared.send( self.msg.take().unwrap(), true, |msg| { Hook::slot( Some(msg), SelectSignal( thread::current(), token, AtomicBool::new(false), signalled, ), ) }, // Always runs |h| { self.hook = Some(h); Ok(()) }, ); if self.hook.is_none() { Some((self.mapper)(match r { Ok(()) => Ok(()), Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)), _ => unreachable!(), })) } else { None } } fn poll(&mut self) -> Option { let res = if self.sender.shared.is_disconnected() { // Check the hook one last time if let Some(msg) = self.hook.as_ref()?.try_take() { Err(SendError(msg)) } else { Ok(()) } } else if self.hook.as_ref().unwrap().is_empty() { // The message was sent Ok(()) } else { return None; }; Some((&mut self.mapper)(res)) } fn deinit(&mut self) { if let Some(hook) = self.hook.take() { // Remove hook let hook: Arc> = hook; wait_lock(&self.sender.shared.chan) .sending .as_mut() .unwrap() .1 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); } } } let token = self.selections.len(); self.selections.push(Box::new(SendSelection { sender, msg: Some(msg), token, signalled: self.signalled.clone(), hook: None, mapper, phantom: Default::default(), })); self } /// Add a receive operation to the selector. /// /// Once added, the selector can be used to run the provided handler function on completion of this operation. pub fn recv) -> T + 'a>( mut self, receiver: &'a Receiver, mapper: F, ) -> Self { struct RecvSelection<'a, T, F, U> { receiver: &'a Receiver, token: Token, signalled: Arc>>, hook: Option>>, mapper: F, received: bool, phantom: PhantomData, } impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U> where F: FnMut(Result) -> T, { fn init(&mut self) -> Option { let token = self.token; let signalled = self.signalled.clone(); let r = self.receiver.shared.recv( true, || { Hook::trigger(SelectSignal( thread::current(), token, AtomicBool::new(false), signalled, )) }, // Always runs |h| { self.hook = Some(h); Err(TryRecvTimeoutError::Timeout) }, ); if self.hook.is_none() { Some((self.mapper)(match r { Ok(msg) => Ok(msg), Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected), _ => unreachable!(), })) } else { None } } fn poll(&mut self) -> Option { let res = if let Ok(msg) = self.receiver.try_recv() { self.received = true; Ok(msg) } else if self.receiver.shared.is_disconnected() { Err(RecvError::Disconnected) } else { return None; }; Some((&mut self.mapper)(res)) } fn deinit(&mut self) { if let Some(hook) = self.hook.take() { // Remove hook let hook: Arc> = hook; wait_lock(&self.receiver.shared.chan) .waiting .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr()); // If we were woken, but never polled, wake up another if !self.received && hook .signal() .as_any() .downcast_ref::() .unwrap() .2 .load(Ordering::SeqCst) { wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending(); } } } } let token = self.selections.len(); self.selections.push(Box::new(RecvSelection { receiver, token, signalled: self.signalled.clone(), hook: None, mapper, received: false, phantom: Default::default(), })); self } fn wait_inner(mut self, deadline: Option) -> Option { #[cfg(feature = "eventual-fairness")] { self.next_poll = self.rng.generate_range(0..self.selections.len()); } let res = 'outer: loop { // Init signals for _ in 0..self.selections.len() { if let Some(val) = self.selections[self.next_poll].init() { break 'outer Some(val); } self.next_poll = (self.next_poll + 1) % self.selections.len(); } // Speculatively poll if let Some(msg) = self.poll() { break 'outer Some(msg); } loop { if let Some(deadline) = deadline { if let Some(dur) = deadline.checked_duration_since(Instant::now()) { thread::park_timeout(dur); } } else { thread::park(); } if deadline.map(|d| Instant::now() >= d).unwrap_or(false) { break 'outer self.poll(); } let token = if let Some(token) = self.signalled.lock().pop_front() { token } else { // Spurious wakeup, park again continue; }; // Attempt to receive a message if let Some(msg) = self.selections[token].poll() { break 'outer Some(msg); } } }; // Deinit signals for s in &mut self.selections { s.deinit(); } res } fn poll(&mut self) -> Option { for _ in 0..self.selections.len() { if let Some(val) = self.selections[self.next_poll].poll() { return Some(val); } self.next_poll = (self.next_poll + 1) % self.selections.len(); } None } /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness` /// feature flag is enabled, this method is fair and will handle a random event of those that are ready. pub fn wait(self) -> T { self.wait_inner(None).unwrap() } /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that /// are ready. pub fn wait_timeout(self, dur: Duration) -> Result { self.wait_inner(Some(Instant::now() + dur)) .ok_or(SelectError::Timeout) } /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached. /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those /// that are ready. pub fn wait_deadline(self, deadline: Instant) -> Result { self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout) } }