author    Valentin Popov <valentin@popov.link>  2024-01-08 00:21:28 +0300
committer Valentin Popov <valentin@popov.link>  2024-01-08 00:21:28 +0300
commit    1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree      7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/crossbeam-epoch/src/sync
parent    5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/crossbeam-epoch/src/sync')
-rw-r--r--  vendor/crossbeam-epoch/src/sync/list.rs        487
-rw-r--r--  vendor/crossbeam-epoch/src/sync/mod.rs           7
-rw-r--r--  vendor/crossbeam-epoch/src/sync/once_lock.rs    88
-rw-r--r--  vendor/crossbeam-epoch/src/sync/queue.rs       468
4 files changed, 1050 insertions, 0 deletions
diff --git a/vendor/crossbeam-epoch/src/sync/list.rs b/vendor/crossbeam-epoch/src/sync/list.rs
new file mode 100644
index 0000000..52ffd6f
--- /dev/null
+++ b/vendor/crossbeam-epoch/src/sync/list.rs
@@ -0,0 +1,487 @@
+//! Lock-free intrusive linked list.
+//!
+//! Ideas from Michael. High Performance Dynamic Lock-Free Hash Tables and List-Based Sets. SPAA
+//! 2002. <http://dl.acm.org/citation.cfm?id=564870.564881>
+
+use core::marker::PhantomData;
+use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+
+use crate::{unprotected, Atomic, Guard, Shared};
+
+/// An entry in a linked list.
+///
+/// Because an `Entry` is accessed from multiple threads, it is beneficial for performance to
+/// place it on a different cache line than thread-local data.
+#[derive(Debug)]
+pub(crate) struct Entry {
+ /// The next entry in the linked list.
+ /// If the tag is 1, this entry is marked as deleted.
+ next: Atomic<Entry>,
+}
+
+/// Implementing this trait asserts that the type `T` can be used as an element in the intrusive
+/// linked list defined in this module. `T` has to contain (or otherwise be linked to) an instance
+/// of `Entry`.
+///
+/// # Example
+///
+/// ```ignore
+/// struct A {
+/// entry: Entry,
+/// data: usize,
+/// }
+///
+/// impl IsElement<A> for A {
+/// fn entry_of(a: &A) -> &Entry {
+/// let entry_ptr = ((a as usize) + offset_of!(A, entry)) as *const Entry;
+/// unsafe { &*entry_ptr }
+/// }
+///
+/// unsafe fn element_of(entry: &Entry) -> &A {
+/// let elem_ptr = ((entry as usize) - offset_of!(A, entry)) as *const A;
+/// &*elem_ptr
+/// }
+///
+/// unsafe fn finalize(entry: &Entry, guard: &Guard) {
+/// guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
+/// }
+/// }
+/// ```
+///
+/// This trait is implemented on a type separate from `T` (although it can be just `T`), because
+/// one type might be placeable into multiple lists, in which case it would require multiple
+/// implementations of `IsElement`. In such cases, each struct implementing `IsElement<T>`
+/// represents a distinct `Entry` in `T`.
+///
+/// For example, we can insert the following struct into two lists using `entry1` for one
+/// and `entry2` for the other:
+///
+/// ```ignore
+/// struct B {
+/// entry1: Entry,
+/// entry2: Entry,
+/// data: usize,
+/// }
+/// ```
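+///
+/// A minimal sketch of that pattern, using hypothetical marker types `ByEntry1` and `ByEntry2`
+/// (the names are illustrative, not part of this module):
+///
+/// ```ignore
+/// struct ByEntry1;
+/// struct ByEntry2;
+///
+/// impl IsElement<B> for ByEntry1 {
+///     fn entry_of(b: &B) -> &Entry {
+///         &b.entry1
+///     }
+///     unsafe fn element_of(entry: &Entry) -> &B {
+///         let elem_ptr = ((entry as usize) - offset_of!(B, entry1)) as *const B;
+///         &*elem_ptr
+///     }
+///     unsafe fn finalize(entry: &Entry, guard: &Guard) {
+///         guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
+///     }
+/// }
+/// // `ByEntry2` would be identical except that it reads `entry2` and `offset_of!(B, entry2)`.
+/// // The two lists are then typed as `List<B, ByEntry1>` and `List<B, ByEntry2>`.
+/// ```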
+///
+pub(crate) trait IsElement<T> {
+ /// Returns a reference to this element's `Entry`.
+ fn entry_of(_: &T) -> &Entry;
+
+ /// Given a reference to an element's entry, returns that element.
+ ///
+ /// ```ignore
+ /// let elem = ListElement::new();
+ /// assert!(core::ptr::eq(
+ ///     &elem,
+ ///     unsafe { ListElement::element_of(ListElement::entry_of(&elem)) },
+ /// ));
+ /// ```
+ ///
+ /// # Safety
+ ///
+ /// The caller has to guarantee that the `Entry` it is called with was retrieved from an instance
+ /// of the element type (`T`).
+ unsafe fn element_of(_: &Entry) -> &T;
+
+ /// The function that is called when an entry is unlinked from the list.
+ ///
+ /// # Safety
+ ///
+ /// The caller has to guarantee that the `Entry` it is called with was retrieved from an instance
+ /// of the element type (`T`).
+ unsafe fn finalize(_: &Entry, _: &Guard);
+}
+
+/// A lock-free, intrusive linked list of type `T`.
+#[derive(Debug)]
+pub(crate) struct List<T, C: IsElement<T> = T> {
+ /// The head of the linked list.
+ head: Atomic<Entry>,
+
+ /// The phantom data for using `T` and `C`.
+ _marker: PhantomData<(T, C)>,
+}
+
+/// An iterator used for retrieving values from the list.
+pub(crate) struct Iter<'g, T, C: IsElement<T>> {
+ /// The guard that protects the iteration.
+ guard: &'g Guard,
+
+ /// Pointer from the predecessor to the current entry.
+ pred: &'g Atomic<Entry>,
+
+ /// The current entry.
+ curr: Shared<'g, Entry>,
+
+ /// The list head, needed for restarting iteration.
+ head: &'g Atomic<Entry>,
+
+ /// Logically, we store a borrow of an instance of `T` and
+ /// use the type information from `C`.
+ _marker: PhantomData<(&'g T, C)>,
+}
+
+/// An error that occurs during iteration over the list.
+#[derive(PartialEq, Debug)]
+pub(crate) enum IterError {
+ /// A concurrent thread modified the state of the list at the same place that this iterator
+ /// was inspecting. Subsequent iteration will restart from the beginning of the list.
+ Stalled,
+}
+
+impl Default for Entry {
+ /// Returns the empty entry.
+ fn default() -> Self {
+ Self {
+ next: Atomic::null(),
+ }
+ }
+}
+
+impl Entry {
+ /// Marks this entry as deleted, deferring the actual deallocation to a later iteration.
+ ///
+ /// # Safety
+ ///
+ /// The entry should be a member of a linked list, and it should not have been deleted.
+ /// It should be safe to call `C::finalize` on the entry after the `guard` is dropped, where `C`
+ /// is the associated helper for the linked list.
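+ ///
+ /// A minimal usage sketch (assuming an element `element` of some type `E` with its helper
+ /// `C: IsElement<E>`; the names are illustrative):
+ ///
+ /// ```ignore
+ /// // Logically delete the element; an iterator that later observes the mark will
+ /// // physically unlink it and call `C::finalize`.
+ /// unsafe { C::entry_of(&element).delete(&guard) };
+ /// ```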
+ pub(crate) unsafe fn delete(&self, guard: &Guard) {
+ self.next.fetch_or(1, Release, guard);
+ }
+}
+
+impl<T, C: IsElement<T>> List<T, C> {
+ /// Returns a new, empty linked list.
+ pub(crate) fn new() -> Self {
+ Self {
+ head: Atomic::null(),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Inserts `entry` into the head of the list.
+ ///
+ /// # Safety
+ ///
+ /// You should guarantee that:
+ ///
+ /// - `container` is not null
+ /// - `container` is immovable, e.g. inside an `Owned`
+ /// - the same `Entry` is not inserted more than once
+ /// - the inserted object will be removed before the list is dropped
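+ ///
+ /// A minimal sketch of a call satisfying these conditions (assuming the element type `A` from
+ /// the module-level example, with `A: IsElement<A>`; the names are illustrative):
+ ///
+ /// ```ignore
+ /// let list: List<A> = List::new();
+ /// // Heap-allocating through `Owned` keeps the entry immovable while it is linked.
+ /// let elem = Owned::new(A { entry: Entry::default(), data: 0 }).into_shared(&guard);
+ /// unsafe { list.insert(elem, &guard) };
+ /// ```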
+ pub(crate) unsafe fn insert<'g>(&'g self, container: Shared<'g, T>, guard: &'g Guard) {
+ // Insert right after head, i.e. at the beginning of the list.
+ let to = &self.head;
+ // Get the intrusively stored Entry of the new element to insert.
+ let entry: &Entry = C::entry_of(container.deref());
+ // Make a Shared ptr to that Entry.
+ let entry_ptr = Shared::from(entry as *const _);
+ // Read the current successor of where we want to insert.
+ let mut next = to.load(Relaxed, guard);
+
+ loop {
+ // Set the Entry of the to-be-inserted element to point to the previous successor of
+ // `to`.
+ entry.next.store(next, Relaxed);
+ match to.compare_exchange_weak(next, entry_ptr, Release, Relaxed, guard) {
+ Ok(_) => break,
+ // We lost the race or weak CAS failed spuriously. Update the successor and try
+ // again.
+ Err(err) => next = err.current,
+ }
+ }
+ }
+
+ /// Returns an iterator over all objects.
+ ///
+ /// # Caveat
+ ///
+ /// Every object that is inserted at the moment this function is called and persists at least
+ /// until the end of iteration will be returned. Since this iterator traverses a lock-free
+ /// linked list that may be concurrently modified, some additional caveats apply:
+ ///
+ /// 1. If a new object is inserted during iteration, it may or may not be returned.
+ /// 2. If an object is deleted during iteration, it may or may not be returned.
+ /// 3. The iteration may be aborted if it loses a race with a concurrent thread. In this case,
+ /// the winning thread will continue to iterate over the same list.
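+ ///
+ /// A minimal sketch of how a caller might drive this iterator, restarting on a stall
+ /// (`list` and the element type are assumed to exist for illustration):
+ ///
+ /// ```ignore
+ /// for item in list.iter(&guard) {
+ ///     match item {
+ ///         Ok(elem) => { /* use `elem` */ }
+ ///         // The iterator has already reset itself to the head; simply keep iterating.
+ ///         Err(IterError::Stalled) => continue,
+ ///     }
+ /// }
+ /// ```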
+ pub(crate) fn iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C> {
+ Iter {
+ guard,
+ pred: &self.head,
+ curr: self.head.load(Acquire, guard),
+ head: &self.head,
+ _marker: PhantomData,
+ }
+ }
+}
+
+impl<T, C: IsElement<T>> Drop for List<T, C> {
+ fn drop(&mut self) {
+ unsafe {
+ let guard = unprotected();
+ let mut curr = self.head.load(Relaxed, guard);
+ while let Some(c) = curr.as_ref() {
+ let succ = c.next.load(Relaxed, guard);
+ // Verify that all elements have been removed from the list.
+ assert_eq!(succ.tag(), 1);
+
+ C::finalize(curr.deref(), guard);
+ curr = succ;
+ }
+ }
+ }
+}
+
+impl<'g, T: 'g, C: IsElement<T>> Iterator for Iter<'g, T, C> {
+ type Item = Result<&'g T, IterError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ while let Some(c) = unsafe { self.curr.as_ref() } {
+ let succ = c.next.load(Acquire, self.guard);
+
+ if succ.tag() == 1 {
+ // This entry was removed. Try unlinking it from the list.
+ let succ = succ.with_tag(0);
+
+ // The tag should always be zero, because removing a node after a logically deleted
+ // node leaves the list in an invalid state.
+ debug_assert!(self.curr.tag() == 0);
+
+ // Try to unlink `curr` from the list, and get the new value of `self.pred`.
+ let succ = match self
+ .pred
+ .compare_exchange(self.curr, succ, Acquire, Acquire, self.guard)
+ {
+ Ok(_) => {
+ // We succeeded in unlinking `curr`, so we have to schedule
+ // deallocation. Deferred drop is okay, because `list.delete()` can only be
+ // called if `T: 'static`.
+ unsafe {
+ C::finalize(self.curr.deref(), self.guard);
+ }
+
+ // `succ` is the new value of `self.pred`.
+ succ
+ }
+ Err(e) => {
+ // `e.current` is the current value of `self.pred`.
+ e.current
+ }
+ };
+
+ // If the predecessor node is already marked as deleted, we need to restart from
+ // `head`.
+ if succ.tag() != 0 {
+ self.pred = self.head;
+ self.curr = self.head.load(Acquire, self.guard);
+
+ return Some(Err(IterError::Stalled));
+ }
+
+ // Move over the removed entry by only advancing `curr`, not `pred`.
+ self.curr = succ;
+ continue;
+ }
+
+ // Move one step forward.
+ self.pred = &c.next;
+ self.curr = succ;
+
+ return Some(Ok(unsafe { C::element_of(c) }));
+ }
+
+ // We reached the end of the list.
+ None
+ }
+}
+
+#[cfg(all(test, not(crossbeam_loom)))]
+mod tests {
+ use super::*;
+ use crate::{Collector, Owned};
+ use crossbeam_utils::thread;
+ use std::sync::Barrier;
+
+ impl IsElement<Entry> for Entry {
+ fn entry_of(entry: &Entry) -> &Entry {
+ entry
+ }
+
+ unsafe fn element_of(entry: &Entry) -> &Entry {
+ entry
+ }
+
+ unsafe fn finalize(entry: &Entry, guard: &Guard) {
+ guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
+ }
+ }
+
+ /// Checks whether the list retains inserted elements
+ /// and returns them in the correct order.
+ #[test]
+ fn insert() {
+ let collector = Collector::new();
+ let handle = collector.register();
+ let guard = handle.pin();
+
+ let l: List<Entry> = List::new();
+
+ let e1 = Owned::new(Entry::default()).into_shared(&guard);
+ let e2 = Owned::new(Entry::default()).into_shared(&guard);
+ let e3 = Owned::new(Entry::default()).into_shared(&guard);
+
+ unsafe {
+ l.insert(e1, &guard);
+ l.insert(e2, &guard);
+ l.insert(e3, &guard);
+ }
+
+ let mut iter = l.iter(&guard);
+ let maybe_e3 = iter.next();
+ assert!(maybe_e3.is_some());
+ assert!(maybe_e3.unwrap().unwrap() as *const Entry == e3.as_raw());
+ let maybe_e2 = iter.next();
+ assert!(maybe_e2.is_some());
+ assert!(maybe_e2.unwrap().unwrap() as *const Entry == e2.as_raw());
+ let maybe_e1 = iter.next();
+ assert!(maybe_e1.is_some());
+ assert!(maybe_e1.unwrap().unwrap() as *const Entry == e1.as_raw());
+ assert!(iter.next().is_none());
+
+ unsafe {
+ e1.as_ref().unwrap().delete(&guard);
+ e2.as_ref().unwrap().delete(&guard);
+ e3.as_ref().unwrap().delete(&guard);
+ }
+ }
+
+ /// Checks whether elements can be removed from the list and whether
+ /// the correct elements are removed.
+ #[test]
+ fn delete() {
+ let collector = Collector::new();
+ let handle = collector.register();
+ let guard = handle.pin();
+
+ let l: List<Entry> = List::new();
+
+ let e1 = Owned::new(Entry::default()).into_shared(&guard);
+ let e2 = Owned::new(Entry::default()).into_shared(&guard);
+ let e3 = Owned::new(Entry::default()).into_shared(&guard);
+ unsafe {
+ l.insert(e1, &guard);
+ l.insert(e2, &guard);
+ l.insert(e3, &guard);
+ e2.as_ref().unwrap().delete(&guard);
+ }
+
+ let mut iter = l.iter(&guard);
+ let maybe_e3 = iter.next();
+ assert!(maybe_e3.is_some());
+ assert!(maybe_e3.unwrap().unwrap() as *const Entry == e3.as_raw());
+ let maybe_e1 = iter.next();
+ assert!(maybe_e1.is_some());
+ assert!(maybe_e1.unwrap().unwrap() as *const Entry == e1.as_raw());
+ assert!(iter.next().is_none());
+
+ unsafe {
+ e1.as_ref().unwrap().delete(&guard);
+ e3.as_ref().unwrap().delete(&guard);
+ }
+
+ let mut iter = l.iter(&guard);
+ assert!(iter.next().is_none());
+ }
+
+ const THREADS: usize = 8;
+ const ITERS: usize = 512;
+
+ /// Contends the list on insert and delete operations to make sure they can run concurrently.
+ #[test]
+ fn insert_delete_multi() {
+ let collector = Collector::new();
+
+ let l: List<Entry> = List::new();
+ let b = Barrier::new(THREADS);
+
+ thread::scope(|s| {
+ for _ in 0..THREADS {
+ s.spawn(|_| {
+ b.wait();
+
+ let handle = collector.register();
+ let guard: Guard = handle.pin();
+ let mut v = Vec::with_capacity(ITERS);
+
+ for _ in 0..ITERS {
+ let e = Owned::new(Entry::default()).into_shared(&guard);
+ v.push(e);
+ unsafe {
+ l.insert(e, &guard);
+ }
+ }
+
+ for e in v {
+ unsafe {
+ e.as_ref().unwrap().delete(&guard);
+ }
+ }
+ });
+ }
+ })
+ .unwrap();
+
+ let handle = collector.register();
+ let guard = handle.pin();
+
+ let mut iter = l.iter(&guard);
+ assert!(iter.next().is_none());
+ }
+
+ /// Contends the list on iteration to make sure that it can be iterated over concurrently.
+ #[test]
+ fn iter_multi() {
+ let collector = Collector::new();
+
+ let l: List<Entry> = List::new();
+ let b = Barrier::new(THREADS);
+
+ thread::scope(|s| {
+ for _ in 0..THREADS {
+ s.spawn(|_| {
+ b.wait();
+
+ let handle = collector.register();
+ let guard: Guard = handle.pin();
+ let mut v = Vec::with_capacity(ITERS);
+
+ for _ in 0..ITERS {
+ let e = Owned::new(Entry::default()).into_shared(&guard);
+ v.push(e);
+ unsafe {
+ l.insert(e, &guard);
+ }
+ }
+
+ let mut iter = l.iter(&guard);
+ for _ in 0..ITERS {
+ assert!(iter.next().is_some());
+ }
+
+ for e in v {
+ unsafe {
+ e.as_ref().unwrap().delete(&guard);
+ }
+ }
+ });
+ }
+ })
+ .unwrap();
+
+ let handle = collector.register();
+ let guard = handle.pin();
+
+ let mut iter = l.iter(&guard);
+ assert!(iter.next().is_none());
+ }
+}
diff --git a/vendor/crossbeam-epoch/src/sync/mod.rs b/vendor/crossbeam-epoch/src/sync/mod.rs
new file mode 100644
index 0000000..08981be
--- /dev/null
+++ b/vendor/crossbeam-epoch/src/sync/mod.rs
@@ -0,0 +1,7 @@
+//! Synchronization primitives.
+
+pub(crate) mod list;
+#[cfg(feature = "std")]
+#[cfg(not(crossbeam_loom))]
+pub(crate) mod once_lock;
+pub(crate) mod queue;
diff --git a/vendor/crossbeam-epoch/src/sync/once_lock.rs b/vendor/crossbeam-epoch/src/sync/once_lock.rs
new file mode 100644
index 0000000..e057aca
--- /dev/null
+++ b/vendor/crossbeam-epoch/src/sync/once_lock.rs
@@ -0,0 +1,88 @@
+// Based on unstable std::sync::OnceLock.
+//
+// Source: https://github.com/rust-lang/rust/blob/8e9c93df464b7ada3fc7a1c8ccddd9dcb24ee0a0/library/std/src/sync/once_lock.rs
+
+use core::cell::UnsafeCell;
+use core::mem::MaybeUninit;
+use std::sync::Once;
+
+pub(crate) struct OnceLock<T> {
+ once: Once,
+ value: UnsafeCell<MaybeUninit<T>>,
+ // Unlike std::sync::OnceLock, we don't need PhantomData here because
+ // we don't use #[may_dangle].
+}
+
+unsafe impl<T: Sync + Send> Sync for OnceLock<T> {}
+unsafe impl<T: Send> Send for OnceLock<T> {}
+
+impl<T> OnceLock<T> {
+ /// Creates a new empty cell.
+ #[must_use]
+ pub(crate) const fn new() -> Self {
+ Self {
+ once: Once::new(),
+ value: UnsafeCell::new(MaybeUninit::uninit()),
+ }
+ }
+
+ /// Gets the contents of the cell, initializing it with `f` if the cell
+ /// was empty.
+ ///
+ /// Many threads may call `get_or_init` concurrently with different
+ /// initializing functions, but it is guaranteed that only one function
+ /// will be executed.
+ ///
+ /// # Panics
+ ///
+ /// If `f` panics, the panic is propagated to the caller, and the cell
+ /// remains uninitialized.
+ ///
+ /// It is an error to reentrantly initialize the cell from `f`. The
+ /// exact outcome is unspecified. The current implementation deadlocks, but
+ /// this may be changed to a panic in the future.
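+ ///
+ /// A minimal sketch of the intended use (the static's name and value are illustrative):
+ ///
+ /// ```ignore
+ /// static CONFIG: OnceLock<String> = OnceLock::new();
+ ///
+ /// // Only the first caller runs the closure; every caller gets a reference to the same value.
+ /// let value: &String = CONFIG.get_or_init(|| "default".to_string());
+ /// ```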
+ pub(crate) fn get_or_init<F>(&self, f: F) -> &T
+ where
+ F: FnOnce() -> T,
+ {
+ // Fast path check
+ if self.once.is_completed() {
+ // SAFETY: The inner value has been initialized
+ return unsafe { self.get_unchecked() };
+ }
+ self.initialize(f);
+
+ // SAFETY: The inner value has been initialized
+ unsafe { self.get_unchecked() }
+ }
+
+ #[cold]
+ fn initialize<F>(&self, f: F)
+ where
+ F: FnOnce() -> T,
+ {
+ let slot = self.value.get();
+
+ self.once.call_once(|| {
+ let value = f();
+ unsafe { slot.write(MaybeUninit::new(value)) }
+ });
+ }
+
+ /// # Safety
+ ///
+ /// The value must be initialized
+ unsafe fn get_unchecked(&self) -> &T {
+ debug_assert!(self.once.is_completed());
+ &*self.value.get().cast::<T>()
+ }
+}
+
+impl<T> Drop for OnceLock<T> {
+ fn drop(&mut self) {
+ if self.once.is_completed() {
+ // SAFETY: The inner value has been initialized
+ unsafe { (*self.value.get()).assume_init_drop() };
+ }
+ }
+}
diff --git a/vendor/crossbeam-epoch/src/sync/queue.rs b/vendor/crossbeam-epoch/src/sync/queue.rs
new file mode 100644
index 0000000..76c326b
--- /dev/null
+++ b/vendor/crossbeam-epoch/src/sync/queue.rs
@@ -0,0 +1,468 @@
+//! Michael-Scott lock-free queue.
+//!
+//! Usable with any number of producers and consumers.
+//!
+//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
+//! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106>
+//!
+//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
+//! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
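+//!
+//! A minimal usage sketch (pinning via `crate::pin()`; the pushed values are illustrative):
+//!
+//! ```ignore
+//! let queue = Queue::new();
+//! let guard = crate::pin();
+//! queue.push(1, &guard);
+//! queue.push(2, &guard);
+//! assert_eq!(queue.try_pop(&guard), Some(1)); // FIFO order
+//! assert_eq!(queue.try_pop(&guard), Some(2));
+//! assert_eq!(queue.try_pop(&guard), None);    // observed empty
+//! ```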
+
+use core::mem::MaybeUninit;
+use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+
+use crossbeam_utils::CachePadded;
+
+use crate::{unprotected, Atomic, Guard, Owned, Shared};
+
+// The representation here is a singly-linked list, with a sentinel node at the front. In general
+// the `tail` pointer may lag behind the actual tail. Every non-sentinel node was created by a
+// `push` and holds a value until that value is popped.
+#[derive(Debug)]
+pub(crate) struct Queue<T> {
+ head: CachePadded<Atomic<Node<T>>>,
+ tail: CachePadded<Atomic<Node<T>>>,
+}
+
+struct Node<T> {
+ /// The slot in which a value of type `T` can be stored.
+ ///
+ /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
+ /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
+ /// Other nodes start their life with a push operation and contain a value until it gets popped
+ /// out. After that such empty nodes get added to the collector for destruction.
+ data: MaybeUninit<T>,
+
+ next: Atomic<Node<T>>,
+}
+
+// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
+unsafe impl<T: Send> Sync for Queue<T> {}
+unsafe impl<T: Send> Send for Queue<T> {}
+
+impl<T> Queue<T> {
+ /// Create a new, empty queue.
+ pub(crate) fn new() -> Queue<T> {
+ let q = Queue {
+ head: CachePadded::new(Atomic::null()),
+ tail: CachePadded::new(Atomic::null()),
+ };
+ let sentinel = Owned::new(Node {
+ data: MaybeUninit::uninit(),
+ next: Atomic::null(),
+ });
+ unsafe {
+ let guard = unprotected();
+ let sentinel = sentinel.into_shared(guard);
+ q.head.store(sentinel, Relaxed);
+ q.tail.store(sentinel, Relaxed);
+ q
+ }
+ }
+
+ /// Attempts to atomically place `new` into the `next` pointer of `onto`, and returns `true` on
+ /// success. The queue's `tail` pointer may be updated.
+ #[inline(always)]
+ fn push_internal(
+ &self,
+ onto: Shared<'_, Node<T>>,
+ new: Shared<'_, Node<T>>,
+ guard: &Guard,
+ ) -> bool {
+ // is `onto` the actual tail?
+ let o = unsafe { onto.deref() };
+ let next = o.next.load(Acquire, guard);
+ if unsafe { next.as_ref().is_some() } {
+ // if not, try to "help" by moving the tail pointer forward
+ let _ = self
+ .tail
+ .compare_exchange(onto, next, Release, Relaxed, guard);
+ false
+ } else {
+ // looks like the actual tail; attempt to link in `new`
+ let result = o
+ .next
+ .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
+ .is_ok();
+ if result {
+ // try to move the tail pointer forward
+ let _ = self
+ .tail
+ .compare_exchange(onto, new, Release, Relaxed, guard);
+ }
+ result
+ }
+ }
+
+ /// Adds `t` to the back of the queue.
+ pub(crate) fn push(&self, t: T, guard: &Guard) {
+ let new = Owned::new(Node {
+ data: MaybeUninit::new(t),
+ next: Atomic::null(),
+ });
+ let new = Owned::into_shared(new, guard);
+
+ loop {
+ // We push onto the tail, so we'll start optimistically by looking there first.
+ let tail = self.tail.load(Acquire, guard);
+
+ // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
+ if self.push_internal(tail, new, guard) {
+ break;
+ }
+ }
+ }
+
+ /// Attempts to pop a data node. Returns `Ok(None)` if the queue is observed to be empty, and
+ /// `Err(())` if we lost a race to pop.
+ #[inline(always)]
+ fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
+ let head = self.head.load(Acquire, guard);
+ let h = unsafe { head.deref() };
+ let next = h.next.load(Acquire, guard);
+ match unsafe { next.as_ref() } {
+ Some(n) => unsafe {
+ self.head
+ .compare_exchange(head, next, Release, Relaxed, guard)
+ .map(|_| {
+ let tail = self.tail.load(Relaxed, guard);
+ // Advance the tail so that we don't retire a pointer to a reachable node.
+ if head == tail {
+ let _ = self
+ .tail
+ .compare_exchange(tail, next, Release, Relaxed, guard);
+ }
+ guard.defer_destroy(head);
+ Some(n.data.assume_init_read())
+ })
+ .map_err(|_| ())
+ },
+ None => Ok(None),
+ }
+ }
+
+ /// Attempts to pop a data node, if the data satisfies the given condition. Returns `Ok(None)` if
+ /// the queue is observed to be empty or the data does not satisfy the condition, and `Err(())`
+ /// if we lost a race to pop.
+ #[inline(always)]
+ fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
+ where
+ T: Sync,
+ F: Fn(&T) -> bool,
+ {
+ let head = self.head.load(Acquire, guard);
+ let h = unsafe { head.deref() };
+ let next = h.next.load(Acquire, guard);
+ match unsafe { next.as_ref() } {
+ Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
+ self.head
+ .compare_exchange(head, next, Release, Relaxed, guard)
+ .map(|_| {
+ let tail = self.tail.load(Relaxed, guard);
+ // Advance the tail so that we don't retire a pointer to a reachable node.
+ if head == tail {
+ let _ = self
+ .tail
+ .compare_exchange(tail, next, Release, Relaxed, guard);
+ }
+ guard.defer_destroy(head);
+ Some(n.data.assume_init_read())
+ })
+ .map_err(|_| ())
+ },
+ None | Some(_) => Ok(None),
+ }
+ }
+
+ /// Attempts to dequeue from the front.
+ ///
+ /// Returns `None` if the queue is observed to be empty.
+ pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
+ loop {
+ if let Ok(head) = self.pop_internal(guard) {
+ return head;
+ }
+ }
+ }
+
+ /// Attempts to dequeue from the front, if the item satisfies the given condition.
+ ///
+ /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
+ /// condition.
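+ ///
+ /// A minimal sketch (the even-number predicate is purely illustrative):
+ ///
+ /// ```ignore
+ /// // Pops the head only if it satisfies the predicate; otherwise leaves it in place.
+ /// let popped: Option<i64> = queue.try_pop_if(|x: &i64| x % 2 == 0, &guard);
+ /// ```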
+ pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
+ where
+ T: Sync,
+ F: Fn(&T) -> bool,
+ {
+ loop {
+ if let Ok(head) = self.pop_if_internal(&condition, guard) {
+ return head;
+ }
+ }
+ }
+}
+
+impl<T> Drop for Queue<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let guard = unprotected();
+
+ while self.try_pop(guard).is_some() {}
+
+ // Destroy the remaining sentinel node.
+ let sentinel = self.head.load(Relaxed, guard);
+ drop(sentinel.into_owned());
+ }
+ }
+}
+
+#[cfg(all(test, not(crossbeam_loom)))]
+mod test {
+ use super::*;
+ use crate::pin;
+ use crossbeam_utils::thread;
+
+ struct Queue<T> {
+ queue: super::Queue<T>,
+ }
+
+ impl<T> Queue<T> {
+ pub(crate) fn new() -> Queue<T> {
+ Queue {
+ queue: super::Queue::new(),
+ }
+ }
+
+ pub(crate) fn push(&self, t: T) {
+ let guard = &pin();
+ self.queue.push(t, guard);
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ let guard = &pin();
+ let head = self.queue.head.load(Acquire, guard);
+ let h = unsafe { head.deref() };
+ h.next.load(Acquire, guard).is_null()
+ }
+
+ pub(crate) fn try_pop(&self) -> Option<T> {
+ let guard = &pin();
+ self.queue.try_pop(guard)
+ }
+
+ pub(crate) fn pop(&self) -> T {
+ loop {
+ match self.try_pop() {
+ None => continue,
+ Some(t) => return t,
+ }
+ }
+ }
+ }
+
+ #[cfg(miri)]
+ const CONC_COUNT: i64 = 1000;
+ #[cfg(not(miri))]
+ const CONC_COUNT: i64 = 1000000;
+
+ #[test]
+ fn push_try_pop_1() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ q.push(37);
+ assert!(!q.is_empty());
+ assert_eq!(q.try_pop(), Some(37));
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_try_pop_2() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ q.push(37);
+ q.push(48);
+ assert_eq!(q.try_pop(), Some(37));
+ assert!(!q.is_empty());
+ assert_eq!(q.try_pop(), Some(48));
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_try_pop_many_seq() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ for i in 0..200 {
+ q.push(i)
+ }
+ assert!(!q.is_empty());
+ for i in 0..200 {
+ assert_eq!(q.try_pop(), Some(i));
+ }
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_pop_1() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ q.push(37);
+ assert!(!q.is_empty());
+ assert_eq!(q.pop(), 37);
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_pop_2() {
+ let q: Queue<i64> = Queue::new();
+ q.push(37);
+ q.push(48);
+ assert_eq!(q.pop(), 37);
+ assert_eq!(q.pop(), 48);
+ }
+
+ #[test]
+ fn push_pop_many_seq() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ for i in 0..200 {
+ q.push(i)
+ }
+ assert!(!q.is_empty());
+ for i in 0..200 {
+ assert_eq!(q.pop(), i);
+ }
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn push_try_pop_many_spsc() {
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+
+ thread::scope(|scope| {
+ scope.spawn(|_| {
+ let mut next = 0;
+
+ while next < CONC_COUNT {
+ if let Some(elem) = q.try_pop() {
+ assert_eq!(elem, next);
+ next += 1;
+ }
+ }
+ });
+
+ for i in 0..CONC_COUNT {
+ q.push(i)
+ }
+ })
+ .unwrap();
+ }
+
+ #[test]
+ fn push_try_pop_many_spmc() {
+ fn recv(_t: i32, q: &Queue<i64>) {
+ let mut cur = -1;
+ for _i in 0..CONC_COUNT {
+ if let Some(elem) = q.try_pop() {
+ assert!(elem > cur);
+ cur = elem;
+
+ if cur == CONC_COUNT - 1 {
+ break;
+ }
+ }
+ }
+ }
+
+ let q: Queue<i64> = Queue::new();
+ assert!(q.is_empty());
+ thread::scope(|scope| {
+ for i in 0..3 {
+ let q = &q;
+ scope.spawn(move |_| recv(i, q));
+ }
+
+ scope.spawn(|_| {
+ for i in 0..CONC_COUNT {
+ q.push(i);
+ }
+ });
+ })
+ .unwrap();
+ }
+
+ #[test]
+ fn push_try_pop_many_mpmc() {
+ enum LR {
+ Left(i64),
+ Right(i64),
+ }
+
+ let q: Queue<LR> = Queue::new();
+ assert!(q.is_empty());
+
+ thread::scope(|scope| {
+ for _t in 0..2 {
+ scope.spawn(|_| {
+ for i in CONC_COUNT - 1..CONC_COUNT {
+ q.push(LR::Left(i))
+ }
+ });
+ scope.spawn(|_| {
+ for i in CONC_COUNT - 1..CONC_COUNT {
+ q.push(LR::Right(i))
+ }
+ });
+ scope.spawn(|_| {
+ let mut vl = vec![];
+ let mut vr = vec![];
+ for _i in 0..CONC_COUNT {
+ match q.try_pop() {
+ Some(LR::Left(x)) => vl.push(x),
+ Some(LR::Right(x)) => vr.push(x),
+ _ => {}
+ }
+ }
+
+ let mut vl2 = vl.clone();
+ let mut vr2 = vr.clone();
+ vl2.sort_unstable();
+ vr2.sort_unstable();
+
+ assert_eq!(vl, vl2);
+ assert_eq!(vr, vr2);
+ });
+ }
+ })
+ .unwrap();
+ }
+
+ #[test]
+ fn push_pop_many_spsc() {
+ let q: Queue<i64> = Queue::new();
+
+ thread::scope(|scope| {
+ scope.spawn(|_| {
+ let mut next = 0;
+ while next < CONC_COUNT {
+ assert_eq!(q.pop(), next);
+ next += 1;
+ }
+ });
+
+ for i in 0..CONC_COUNT {
+ q.push(i)
+ }
+ })
+ .unwrap();
+ assert!(q.is_empty());
+ }
+
+ #[test]
+ fn is_empty_dont_pop() {
+ let q: Queue<i64> = Queue::new();
+ q.push(20);
+ q.push(20);
+ assert!(!q.is_empty());
+ assert!(!q.is_empty());
+ assert!(q.try_pop().is_some());
+ }
+}