From 1b6a04ca5504955c571d1c97504fb45ea0befee4 Mon Sep 17 00:00:00 2001 From: Valentin Popov Date: Mon, 8 Jan 2024 01:21:28 +0400 Subject: Initial vendor packages Signed-off-by: Valentin Popov --- vendor/crossbeam-epoch/src/sync/queue.rs | 468 +++++++++++++++++++++++++++++++ 1 file changed, 468 insertions(+) create mode 100644 vendor/crossbeam-epoch/src/sync/queue.rs (limited to 'vendor/crossbeam-epoch/src/sync/queue.rs') diff --git a/vendor/crossbeam-epoch/src/sync/queue.rs b/vendor/crossbeam-epoch/src/sync/queue.rs new file mode 100644 index 0000000..76c326b --- /dev/null +++ b/vendor/crossbeam-epoch/src/sync/queue.rs @@ -0,0 +1,468 @@ +//! Michael-Scott lock-free queue. +//! +//! Usable with any number of producers and consumers. +//! +//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue +//! Algorithms. PODC 1996. +//! +//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a +//! Practical Lock-Free Queue Algorithm. + +use core::mem::MaybeUninit; +use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + +use crossbeam_utils::CachePadded; + +use crate::{unprotected, Atomic, Guard, Owned, Shared}; + +// The representation here is a singly-linked list, with a sentinel node at the front. In general +// the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or +// all `Blocked` (requests for data from blocked threads). +#[derive(Debug)] +pub(crate) struct Queue { + head: CachePadded>>, + tail: CachePadded>>, +} + +struct Node { + /// The slot in which a value of type `T` can be stored. + /// + /// The type of `data` is `MaybeUninit` because a `Node` doesn't always contain a `T`. + /// For example, the sentinel node in a queue never contains a value: its slot is always empty. + /// Other nodes start their life with a push operation and contain a value until it gets popped + /// out. After that such empty nodes get added to the collector for destruction. + data: MaybeUninit, + + next: Atomic>, +} + +// Any particular `T` should never be accessed concurrently, so no need for `Sync`. +unsafe impl Sync for Queue {} +unsafe impl Send for Queue {} + +impl Queue { + /// Create a new, empty queue. + pub(crate) fn new() -> Queue { + let q = Queue { + head: CachePadded::new(Atomic::null()), + tail: CachePadded::new(Atomic::null()), + }; + let sentinel = Owned::new(Node { + data: MaybeUninit::uninit(), + next: Atomic::null(), + }); + unsafe { + let guard = unprotected(); + let sentinel = sentinel.into_shared(guard); + q.head.store(sentinel, Relaxed); + q.tail.store(sentinel, Relaxed); + q + } + } + + /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on + /// success. The queue's `tail` pointer may be updated. + #[inline(always)] + fn push_internal( + &self, + onto: Shared<'_, Node>, + new: Shared<'_, Node>, + guard: &Guard, + ) -> bool { + // is `onto` the actual tail? + let o = unsafe { onto.deref() }; + let next = o.next.load(Acquire, guard); + if unsafe { next.as_ref().is_some() } { + // if not, try to "help" by moving the tail pointer forward + let _ = self + .tail + .compare_exchange(onto, next, Release, Relaxed, guard); + false + } else { + // looks like the actual tail; attempt to link in `n` + let result = o + .next + .compare_exchange(Shared::null(), new, Release, Relaxed, guard) + .is_ok(); + if result { + // try to move the tail pointer forward + let _ = self + .tail + .compare_exchange(onto, new, Release, Relaxed, guard); + } + result + } + } + + /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`. + pub(crate) fn push(&self, t: T, guard: &Guard) { + let new = Owned::new(Node { + data: MaybeUninit::new(t), + next: Atomic::null(), + }); + let new = Owned::into_shared(new, guard); + + loop { + // We push onto the tail, so we'll start optimistically by looking there first. + let tail = self.tail.load(Acquire, guard); + + // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. + if self.push_internal(tail, new, guard) { + break; + } + } + } + + /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop. + #[inline(always)] + fn pop_internal(&self, guard: &Guard) -> Result, ()> { + let head = self.head.load(Acquire, guard); + let h = unsafe { head.deref() }; + let next = h.next.load(Acquire, guard); + match unsafe { next.as_ref() } { + Some(n) => unsafe { + self.head + .compare_exchange(head, next, Release, Relaxed, guard) + .map(|_| { + let tail = self.tail.load(Relaxed, guard); + // Advance the tail so that we don't retire a pointer to a reachable node. + if head == tail { + let _ = self + .tail + .compare_exchange(tail, next, Release, Relaxed, guard); + } + guard.defer_destroy(head); + Some(n.data.assume_init_read()) + }) + .map_err(|_| ()) + }, + None => Ok(None), + } + } + + /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue + /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop. + #[inline(always)] + fn pop_if_internal(&self, condition: F, guard: &Guard) -> Result, ()> + where + T: Sync, + F: Fn(&T) -> bool, + { + let head = self.head.load(Acquire, guard); + let h = unsafe { head.deref() }; + let next = h.next.load(Acquire, guard); + match unsafe { next.as_ref() } { + Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { + self.head + .compare_exchange(head, next, Release, Relaxed, guard) + .map(|_| { + let tail = self.tail.load(Relaxed, guard); + // Advance the tail so that we don't retire a pointer to a reachable node. + if head == tail { + let _ = self + .tail + .compare_exchange(tail, next, Release, Relaxed, guard); + } + guard.defer_destroy(head); + Some(n.data.assume_init_read()) + }) + .map_err(|_| ()) + }, + None | Some(_) => Ok(None), + } + } + + /// Attempts to dequeue from the front. + /// + /// Returns `None` if the queue is observed to be empty. + pub(crate) fn try_pop(&self, guard: &Guard) -> Option { + loop { + if let Ok(head) = self.pop_internal(guard) { + return head; + } + } + } + + /// Attempts to dequeue from the front, if the item satisfies the given condition. + /// + /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given + /// condition. + pub(crate) fn try_pop_if(&self, condition: F, guard: &Guard) -> Option + where + T: Sync, + F: Fn(&T) -> bool, + { + loop { + if let Ok(head) = self.pop_if_internal(&condition, guard) { + return head; + } + } + } +} + +impl Drop for Queue { + fn drop(&mut self) { + unsafe { + let guard = unprotected(); + + while self.try_pop(guard).is_some() {} + + // Destroy the remaining sentinel node. + let sentinel = self.head.load(Relaxed, guard); + drop(sentinel.into_owned()); + } + } +} + +#[cfg(all(test, not(crossbeam_loom)))] +mod test { + use super::*; + use crate::pin; + use crossbeam_utils::thread; + + struct Queue { + queue: super::Queue, + } + + impl Queue { + pub(crate) fn new() -> Queue { + Queue { + queue: super::Queue::new(), + } + } + + pub(crate) fn push(&self, t: T) { + let guard = &pin(); + self.queue.push(t, guard); + } + + pub(crate) fn is_empty(&self) -> bool { + let guard = &pin(); + let head = self.queue.head.load(Acquire, guard); + let h = unsafe { head.deref() }; + h.next.load(Acquire, guard).is_null() + } + + pub(crate) fn try_pop(&self) -> Option { + let guard = &pin(); + self.queue.try_pop(guard) + } + + pub(crate) fn pop(&self) -> T { + loop { + match self.try_pop() { + None => continue, + Some(t) => return t, + } + } + } + } + + #[cfg(miri)] + const CONC_COUNT: i64 = 1000; + #[cfg(not(miri))] + const CONC_COUNT: i64 = 1000000; + + #[test] + fn push_try_pop_1() { + let q: Queue = Queue::new(); + assert!(q.is_empty()); + q.push(37); + assert!(!q.is_empty()); + assert_eq!(q.try_pop(), Some(37)); + assert!(q.is_empty()); + } + + #[test] + fn push_try_pop_2() { + let q: Queue = Queue::new(); + assert!(q.is_empty()); + q.push(37); + q.push(48); + assert_eq!(q.try_pop(), Some(37)); + assert!(!q.is_empty()); + assert_eq!(q.try_pop(), Some(48)); + assert!(q.is_empty()); + } + + #[test] + fn push_try_pop_many_seq() { + let q: Queue = Queue::new(); + assert!(q.is_empty()); + for i in 0..200 { + q.push(i) + } + assert!(!q.is_empty()); + for i in 0..200 { + assert_eq!(q.try_pop(), Some(i)); + } + assert!(q.is_empty()); + } + + #[test] + fn push_pop_1() { + let q: Queue = Queue::new(); + assert!(q.is_empty()); + q.push(37); + assert!(!q.is_empty()); + assert_eq!(q.pop(), 37); + assert!(q.is_empty()); + } + + #[test] + fn push_pop_2() { + let q: Queue = Queue::new(); + q.push(37); + q.push(48); + assert_eq!(q.pop(), 37); + assert_eq!(q.pop(), 48); + } + + #[test] + fn push_pop_many_seq() { + let q: Queue = Queue::new(); + assert!(q.is_empty()); + for i in 0..200 { + q.push(i) + } + assert!(!q.is_empty()); + for i in 0..200 { + assert_eq!(q.pop(), i); + } + assert!(q.is_empty()); + } + + #[test] + fn push_try_pop_many_spsc() { + let q: Queue = Queue::new(); + assert!(q.is_empty()); + + thread::scope(|scope| { + scope.spawn(|_| { + let mut next = 0; + + while next < CONC_COUNT { + if let Some(elem) = q.try_pop() { + assert_eq!(elem, next); + next += 1; + } + } + }); + + for i in 0..CONC_COUNT { + q.push(i) + } + }) + .unwrap(); + } + + #[test] + fn push_try_pop_many_spmc() { + fn recv(_t: i32, q: &Queue) { + let mut cur = -1; + for _i in 0..CONC_COUNT { + if let Some(elem) = q.try_pop() { + assert!(elem > cur); + cur = elem; + + if cur == CONC_COUNT - 1 { + break; + } + } + } + } + + let q: Queue = Queue::new(); + assert!(q.is_empty()); + thread::scope(|scope| { + for i in 0..3 { + let q = &q; + scope.spawn(move |_| recv(i, q)); + } + + scope.spawn(|_| { + for i in 0..CONC_COUNT { + q.push(i); + } + }); + }) + .unwrap(); + } + + #[test] + fn push_try_pop_many_mpmc() { + enum LR { + Left(i64), + Right(i64), + } + + let q: Queue = Queue::new(); + assert!(q.is_empty()); + + thread::scope(|scope| { + for _t in 0..2 { + scope.spawn(|_| { + for i in CONC_COUNT - 1..CONC_COUNT { + q.push(LR::Left(i)) + } + }); + scope.spawn(|_| { + for i in CONC_COUNT - 1..CONC_COUNT { + q.push(LR::Right(i)) + } + }); + scope.spawn(|_| { + let mut vl = vec![]; + let mut vr = vec![]; + for _i in 0..CONC_COUNT { + match q.try_pop() { + Some(LR::Left(x)) => vl.push(x), + Some(LR::Right(x)) => vr.push(x), + _ => {} + } + } + + let mut vl2 = vl.clone(); + let mut vr2 = vr.clone(); + vl2.sort_unstable(); + vr2.sort_unstable(); + + assert_eq!(vl, vl2); + assert_eq!(vr, vr2); + }); + } + }) + .unwrap(); + } + + #[test] + fn push_pop_many_spsc() { + let q: Queue = Queue::new(); + + thread::scope(|scope| { + scope.spawn(|_| { + let mut next = 0; + while next < CONC_COUNT { + assert_eq!(q.pop(), next); + next += 1; + } + }); + + for i in 0..CONC_COUNT { + q.push(i) + } + }) + .unwrap(); + assert!(q.is_empty()); + } + + #[test] + fn is_empty_dont_pop() { + let q: Queue = Queue::new(); + q.push(20); + q.push(20); + assert!(!q.is_empty()); + assert!(!q.is_empty()); + assert!(q.try_pop().is_some()); + } +} -- cgit v1.2.3