aboutsummaryrefslogtreecommitdiff
path: root/vendor/flume/src/select.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/flume/src/select.rs')
-rw-r--r--vendor/flume/src/select.rs405
1 files changed, 405 insertions, 0 deletions
diff --git a/vendor/flume/src/select.rs b/vendor/flume/src/select.rs
new file mode 100644
index 0000000..cef15aa
--- /dev/null
+++ b/vendor/flume/src/select.rs
@@ -0,0 +1,405 @@
+//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface.
+
+use crate::*;
+use spin1::Mutex as Spinlock;
+use std::{any::Any, marker::PhantomData};
+
+#[cfg(feature = "eventual-fairness")]
+use nanorand::Rng;
+
+// A unique token corresponding to an event in a selector
+type Token = usize;
+
+struct SelectSignal(
+ thread::Thread,
+ Token,
+ AtomicBool,
+ Arc<Spinlock<VecDeque<Token>>>,
+);
+
+impl Signal for SelectSignal {
+ fn fire(&self) -> bool {
+ self.2.store(true, Ordering::SeqCst);
+ self.3.lock().push_back(self.1);
+ self.0.unpark();
+ false
+ }
+
+ fn as_any(&self) -> &(dyn Any + 'static) {
+ self
+ }
+ fn as_ptr(&self) -> *const () {
+ self as *const _ as *const ()
+ }
+}
+
+trait Selection<'a, T> {
+ fn init(&mut self) -> Option<T>;
+ fn poll(&mut self) -> Option<T>;
+ fn deinit(&mut self);
+}
+
+/// An error that may be emitted when attempting to wait for a value on a receiver.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum SelectError {
+ /// A timeout occurred when waiting on a `Selector`.
+ Timeout,
+}
+
+impl fmt::Display for SelectError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ SelectError::Timeout => "timeout occurred".fmt(f),
+ }
+ }
+}
+
+impl std::error::Error for SelectError {}
+
+/// A type used to wait upon multiple blocking operations at once.
+///
+/// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour,
+/// allowing a thread to wait upon the result of more than one operation at once.
+///
+/// # Examples
+/// ```
+/// let (tx0, rx0) = flume::unbounded();
+/// let (tx1, rx1) = flume::unbounded();
+///
+/// std::thread::spawn(move || {
+/// tx0.send(true).unwrap();
+/// tx1.send(42).unwrap();
+/// });
+///
+/// flume::Selector::new()
+/// .recv(&rx0, |b| println!("Received {:?}", b))
+/// .recv(&rx1, |n| println!("Received {:?}", n))
+/// .wait();
+/// ```
+pub struct Selector<'a, T: 'a> {
+ selections: Vec<Box<dyn Selection<'a, T> + 'a>>,
+ next_poll: usize,
+ signalled: Arc<Spinlock<VecDeque<Token>>>,
+ #[cfg(feature = "eventual-fairness")]
+ rng: nanorand::WyRand,
+ phantom: PhantomData<*const ()>,
+}
+
+impl<'a, T: 'a> Default for Selector<'a, T> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl<'a, T: 'a> fmt::Debug for Selector<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Selector").finish()
+ }
+}
+
+impl<'a, T> Selector<'a, T> {
+ /// Create a new selector.
+ pub fn new() -> Self {
+ Self {
+ selections: Vec::new(),
+ next_poll: 0,
+ signalled: Arc::default(),
+ phantom: PhantomData::default(),
+ #[cfg(feature = "eventual-fairness")]
+ rng: nanorand::WyRand::new(),
+ }
+ }
+
+ /// Add a send operation to the selector that sends the provided value.
+ ///
+ /// Once added, the selector can be used to run the provided handler function on completion of this operation.
+ pub fn send<U, F: FnMut(Result<(), SendError<U>>) -> T + 'a>(
+ mut self,
+ sender: &'a Sender<U>,
+ msg: U,
+ mapper: F,
+ ) -> Self {
+ struct SendSelection<'a, T, F, U> {
+ sender: &'a Sender<U>,
+ msg: Option<U>,
+ token: Token,
+ signalled: Arc<Spinlock<VecDeque<Token>>>,
+ hook: Option<Arc<Hook<U, SelectSignal>>>,
+ mapper: F,
+ phantom: PhantomData<T>,
+ }
+
+ impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U>
+ where
+ F: FnMut(Result<(), SendError<U>>) -> T,
+ {
+ fn init(&mut self) -> Option<T> {
+ let token = self.token;
+ let signalled = self.signalled.clone();
+ let r = self.sender.shared.send(
+ self.msg.take().unwrap(),
+ true,
+ |msg| {
+ Hook::slot(
+ Some(msg),
+ SelectSignal(
+ thread::current(),
+ token,
+ AtomicBool::new(false),
+ signalled,
+ ),
+ )
+ },
+ // Always runs
+ |h| {
+ self.hook = Some(h);
+ Ok(())
+ },
+ );
+
+ if self.hook.is_none() {
+ Some((self.mapper)(match r {
+ Ok(()) => Ok(()),
+ Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)),
+ _ => unreachable!(),
+ }))
+ } else {
+ None
+ }
+ }
+
+ fn poll(&mut self) -> Option<T> {
+ let res = if self.sender.shared.is_disconnected() {
+ // Check the hook one last time
+ if let Some(msg) = self.hook.as_ref()?.try_take() {
+ Err(SendError(msg))
+ } else {
+ Ok(())
+ }
+ } else if self.hook.as_ref().unwrap().is_empty() {
+ // The message was sent
+ Ok(())
+ } else {
+ return None;
+ };
+
+ Some((&mut self.mapper)(res))
+ }
+
+ fn deinit(&mut self) {
+ if let Some(hook) = self.hook.take() {
+ // Remove hook
+ let hook: Arc<Hook<U, dyn Signal>> = hook;
+ wait_lock(&self.sender.shared.chan)
+ .sending
+ .as_mut()
+ .unwrap()
+ .1
+ .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
+ }
+ }
+ }
+
+ let token = self.selections.len();
+ self.selections.push(Box::new(SendSelection {
+ sender,
+ msg: Some(msg),
+ token,
+ signalled: self.signalled.clone(),
+ hook: None,
+ mapper,
+ phantom: Default::default(),
+ }));
+
+ self
+ }
+
+ /// Add a receive operation to the selector.
+ ///
+ /// Once added, the selector can be used to run the provided handler function on completion of this operation.
+ pub fn recv<U, F: FnMut(Result<U, RecvError>) -> T + 'a>(
+ mut self,
+ receiver: &'a Receiver<U>,
+ mapper: F,
+ ) -> Self {
+ struct RecvSelection<'a, T, F, U> {
+ receiver: &'a Receiver<U>,
+ token: Token,
+ signalled: Arc<Spinlock<VecDeque<Token>>>,
+ hook: Option<Arc<Hook<U, SelectSignal>>>,
+ mapper: F,
+ received: bool,
+ phantom: PhantomData<T>,
+ }
+
+ impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U>
+ where
+ F: FnMut(Result<U, RecvError>) -> T,
+ {
+ fn init(&mut self) -> Option<T> {
+ let token = self.token;
+ let signalled = self.signalled.clone();
+ let r = self.receiver.shared.recv(
+ true,
+ || {
+ Hook::trigger(SelectSignal(
+ thread::current(),
+ token,
+ AtomicBool::new(false),
+ signalled,
+ ))
+ },
+ // Always runs
+ |h| {
+ self.hook = Some(h);
+ Err(TryRecvTimeoutError::Timeout)
+ },
+ );
+
+ if self.hook.is_none() {
+ Some((self.mapper)(match r {
+ Ok(msg) => Ok(msg),
+ Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected),
+ _ => unreachable!(),
+ }))
+ } else {
+ None
+ }
+ }
+
+ fn poll(&mut self) -> Option<T> {
+ let res = if let Ok(msg) = self.receiver.try_recv() {
+ self.received = true;
+ Ok(msg)
+ } else if self.receiver.shared.is_disconnected() {
+ Err(RecvError::Disconnected)
+ } else {
+ return None;
+ };
+
+ Some((&mut self.mapper)(res))
+ }
+
+ fn deinit(&mut self) {
+ if let Some(hook) = self.hook.take() {
+ // Remove hook
+ let hook: Arc<Hook<U, dyn Signal>> = hook;
+ wait_lock(&self.receiver.shared.chan)
+ .waiting
+ .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
+ // If we were woken, but never polled, wake up another
+ if !self.received
+ && hook
+ .signal()
+ .as_any()
+ .downcast_ref::<SelectSignal>()
+ .unwrap()
+ .2
+ .load(Ordering::SeqCst)
+ {
+ wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending();
+ }
+ }
+ }
+ }
+
+ let token = self.selections.len();
+ self.selections.push(Box::new(RecvSelection {
+ receiver,
+ token,
+ signalled: self.signalled.clone(),
+ hook: None,
+ mapper,
+ received: false,
+ phantom: Default::default(),
+ }));
+
+ self
+ }
+
+ fn wait_inner(mut self, deadline: Option<Instant>) -> Option<T> {
+ #[cfg(feature = "eventual-fairness")]
+ {
+ self.next_poll = self.rng.generate_range(0..self.selections.len());
+ }
+
+ let res = 'outer: loop {
+ // Init signals
+ for _ in 0..self.selections.len() {
+ if let Some(val) = self.selections[self.next_poll].init() {
+ break 'outer Some(val);
+ }
+ self.next_poll = (self.next_poll + 1) % self.selections.len();
+ }
+
+ // Speculatively poll
+ if let Some(msg) = self.poll() {
+ break 'outer Some(msg);
+ }
+
+ loop {
+ if let Some(deadline) = deadline {
+ if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
+ thread::park_timeout(dur);
+ }
+ } else {
+ thread::park();
+ }
+
+ if deadline.map(|d| Instant::now() >= d).unwrap_or(false) {
+ break 'outer self.poll();
+ }
+
+ let token = if let Some(token) = self.signalled.lock().pop_front() {
+ token
+ } else {
+ // Spurious wakeup, park again
+ continue;
+ };
+
+ // Attempt to receive a message
+ if let Some(msg) = self.selections[token].poll() {
+ break 'outer Some(msg);
+ }
+ }
+ };
+
+ // Deinit signals
+ for s in &mut self.selections {
+ s.deinit();
+ }
+
+ res
+ }
+
+ fn poll(&mut self) -> Option<T> {
+ for _ in 0..self.selections.len() {
+ if let Some(val) = self.selections[self.next_poll].poll() {
+ return Some(val);
+ }
+ self.next_poll = (self.next_poll + 1) % self.selections.len();
+ }
+ None
+ }
+
+ /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness`
+ /// feature flag is enabled, this method is fair and will handle a random event of those that are ready.
+ pub fn wait(self) -> T {
+ self.wait_inner(None).unwrap()
+ }
+
+ /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the
+ /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that
+ /// are ready.
+ pub fn wait_timeout(self, dur: Duration) -> Result<T, SelectError> {
+ self.wait_inner(Some(Instant::now() + dur))
+ .ok_or(SelectError::Timeout)
+ }
+
+ /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached.
+ /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those
+ /// that are ready.
+ pub fn wait_deadline(self, deadline: Instant) -> Result<T, SelectError> {
+ self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout)
+ }
+}