aboutsummaryrefslogtreecommitdiff
path: root/vendor/crossbeam-epoch/src/internal.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-07-19 15:37:58 +0300
committerValentin Popov <valentin@popov.link>2024-07-19 15:37:58 +0300
commita990de90fe41456a23e58bd087d2f107d321f3a1 (patch)
tree15afc392522a9e85dc3332235e311b7d39352ea9 /vendor/crossbeam-epoch/src/internal.rs
parent3d48cd3f81164bbfc1a755dc1d4a9a02f98c8ddd (diff)
downloadfparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.tar.xz
fparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.zip
Deleted vendor folder
Diffstat (limited to 'vendor/crossbeam-epoch/src/internal.rs')
-rw-r--r--vendor/crossbeam-epoch/src/internal.rs600
1 files changed, 0 insertions, 600 deletions
diff --git a/vendor/crossbeam-epoch/src/internal.rs b/vendor/crossbeam-epoch/src/internal.rs
deleted file mode 100644
index b2e9e71..0000000
--- a/vendor/crossbeam-epoch/src/internal.rs
+++ /dev/null
@@ -1,600 +0,0 @@
-//! The global data and participant for garbage collection.
-//!
-//! # Registration
-//!
-//! In order to track all participants in one place, we need some form of participant
-//! registration. When a participant is created, it is registered to a global lock-free
-//! singly-linked list of registries; and when a participant is leaving, it is unregistered from the
-//! list.
-//!
-//! # Pinning
-//!
-//! Every participant contains an integer that tells whether the participant is pinned and if so,
-//! what was the global epoch at the time it was pinned. Participants also hold a pin counter that
-//! aids in periodic global epoch advancement.
-//!
-//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
-//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
-//!
-//! # Thread-local bag
-//!
-//! Objects that get unlinked from concurrent data structures must be stashed away until the global
-//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
-//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
-//! global epoch and pushed into the global queue of bags. We store objects in thread-local storages
-//! for amortizing the synchronization cost of pushing the garbages to a global queue.
-//!
-//! # Global queue
-//!
-//! Whenever a bag is pushed into a queue, the objects in some bags in the queue are collected and
-//! destroyed along the way. This design reduces contention on data structures. The global queue
-//! cannot be explicitly accessed: the only way to interact with it is by calling functions
-//! `defer()` that adds an object to the thread-local bag, or `collect()` that manually triggers
-//! garbage collection.
-//!
-//! Ideally each instance of concurrent data structure may have its own queue that gets fully
-//! destroyed as soon as the data structure gets dropped.
-
-use crate::primitive::cell::UnsafeCell;
-use crate::primitive::sync::atomic::{self, Ordering};
-use core::cell::Cell;
-use core::mem::{self, ManuallyDrop};
-use core::num::Wrapping;
-use core::{fmt, ptr};
-
-use crossbeam_utils::CachePadded;
-
-use crate::atomic::{Owned, Shared};
-use crate::collector::{Collector, LocalHandle};
-use crate::deferred::Deferred;
-use crate::epoch::{AtomicEpoch, Epoch};
-use crate::guard::{unprotected, Guard};
-use crate::sync::list::{Entry, IsElement, IterError, List};
-use crate::sync::queue::Queue;
-
-/// Maximum number of objects a bag can contain.
-#[cfg(not(any(crossbeam_sanitize, miri)))]
-const MAX_OBJECTS: usize = 64;
-// Makes it more likely to trigger any potential data races.
-#[cfg(any(crossbeam_sanitize, miri))]
-const MAX_OBJECTS: usize = 4;
-
-/// A bag of deferred functions.
-pub(crate) struct Bag {
- /// Stashed objects.
- deferreds: [Deferred; MAX_OBJECTS],
- len: usize,
-}
-
-/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
-unsafe impl Send for Bag {}
-
-impl Bag {
- /// Returns a new, empty bag.
- pub(crate) fn new() -> Self {
- Self::default()
- }
-
- /// Returns `true` if the bag is empty.
- pub(crate) fn is_empty(&self) -> bool {
- self.len == 0
- }
-
- /// Attempts to insert a deferred function into the bag.
- ///
- /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
- /// full.
- ///
- /// # Safety
- ///
- /// It should be safe for another thread to execute the given function.
- pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
- if self.len < MAX_OBJECTS {
- self.deferreds[self.len] = deferred;
- self.len += 1;
- Ok(())
- } else {
- Err(deferred)
- }
- }
-
- /// Seals the bag with the given epoch.
- fn seal(self, epoch: Epoch) -> SealedBag {
- SealedBag { epoch, _bag: self }
- }
-}
-
-impl Default for Bag {
- fn default() -> Self {
- Bag {
- len: 0,
- deferreds: [Deferred::NO_OP; MAX_OBJECTS],
- }
- }
-}
-
-impl Drop for Bag {
- fn drop(&mut self) {
- // Call all deferred functions.
- for deferred in &mut self.deferreds[..self.len] {
- let no_op = Deferred::NO_OP;
- let owned_deferred = mem::replace(deferred, no_op);
- owned_deferred.call();
- }
- }
-}
-
-// can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
-impl fmt::Debug for Bag {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Bag")
- .field("deferreds", &&self.deferreds[..self.len])
- .finish()
- }
-}
-
-/// A pair of an epoch and a bag.
-#[derive(Default, Debug)]
-struct SealedBag {
- epoch: Epoch,
- _bag: Bag,
-}
-
-/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
-unsafe impl Sync for SealedBag {}
-
-impl SealedBag {
- /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
- fn is_expired(&self, global_epoch: Epoch) -> bool {
- // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
- // is within one epoch of the current one cannot be destroyed yet.
- global_epoch.wrapping_sub(self.epoch) >= 2
- }
-}
-
-/// The global data for a garbage collector.
-pub(crate) struct Global {
- /// The intrusive linked list of `Local`s.
- locals: List<Local>,
-
- /// The global queue of bags of deferred functions.
- queue: Queue<SealedBag>,
-
- /// The global epoch.
- pub(crate) epoch: CachePadded<AtomicEpoch>,
-}
-
-impl Global {
- /// Number of bags to destroy.
- const COLLECT_STEPS: usize = 8;
-
- /// Creates a new global data for garbage collection.
- #[inline]
- pub(crate) fn new() -> Self {
- Self {
- locals: List::new(),
- queue: Queue::new(),
- epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
- }
- }
-
- /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
- pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
- let bag = mem::replace(bag, Bag::new());
-
- atomic::fence(Ordering::SeqCst);
-
- let epoch = self.epoch.load(Ordering::Relaxed);
- self.queue.push(bag.seal(epoch), guard);
- }
-
- /// Collects several bags from the global queue and executes deferred functions in them.
- ///
- /// Note: This may itself produce garbage and in turn allocate new bags.
- ///
- /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
- /// path. In other words, we want the compiler to optimize branching for the case when
- /// `collect()` is not called.
- #[cold]
- pub(crate) fn collect(&self, guard: &Guard) {
- let global_epoch = self.try_advance(guard);
-
- let steps = if cfg!(crossbeam_sanitize) {
- usize::max_value()
- } else {
- Self::COLLECT_STEPS
- };
-
- for _ in 0..steps {
- match self.queue.try_pop_if(
- &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
- guard,
- ) {
- None => break,
- Some(sealed_bag) => drop(sealed_bag),
- }
- }
- }
-
- /// Attempts to advance the global epoch.
- ///
- /// The global epoch can advance only if all currently pinned participants have been pinned in
- /// the current epoch.
- ///
- /// Returns the current global epoch.
- ///
- /// `try_advance()` is annotated `#[cold]` because it is rarely called.
- #[cold]
- pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
- let global_epoch = self.epoch.load(Ordering::Relaxed);
- atomic::fence(Ordering::SeqCst);
-
- // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
- // easy to implement in a lock-free manner. However, traversal can be slow due to cache
- // misses and data dependencies. We should experiment with other data structures as well.
- for local in self.locals.iter(guard) {
- match local {
- Err(IterError::Stalled) => {
- // A concurrent thread stalled this iteration. That thread might also try to
- // advance the epoch, in which case we leave the job to it. Otherwise, the
- // epoch will not be advanced.
- return global_epoch;
- }
- Ok(local) => {
- let local_epoch = local.epoch.load(Ordering::Relaxed);
-
- // If the participant was pinned in a different epoch, we cannot advance the
- // global epoch just yet.
- if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
- return global_epoch;
- }
- }
- }
- }
- atomic::fence(Ordering::Acquire);
-
- // All pinned participants were pinned in the current global epoch.
- // Now let's advance the global epoch...
- //
- // Note that if another thread already advanced it before us, this store will simply
- // overwrite the global epoch with the same value. This is true because `try_advance` was
- // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
- // advanced two steps ahead of it.
- let new_epoch = global_epoch.successor();
- self.epoch.store(new_epoch, Ordering::Release);
- new_epoch
- }
-}
-
-/// Participant for garbage collection.
-#[repr(C)] // Note: `entry` must be the first field
-pub(crate) struct Local {
- /// A node in the intrusive linked list of `Local`s.
- entry: Entry,
-
- /// A reference to the global data.
- ///
- /// When all guards and handles get dropped, this reference is destroyed.
- collector: UnsafeCell<ManuallyDrop<Collector>>,
-
- /// The local bag of deferred functions.
- pub(crate) bag: UnsafeCell<Bag>,
-
- /// The number of guards keeping this participant pinned.
- guard_count: Cell<usize>,
-
- /// The number of active handles.
- handle_count: Cell<usize>,
-
- /// Total number of pinnings performed.
- ///
- /// This is just an auxiliary counter that sometimes kicks off collection.
- pin_count: Cell<Wrapping<usize>>,
-
- /// The local epoch.
- epoch: CachePadded<AtomicEpoch>,
-}
-
-// Make sure `Local` is less than or equal to 2048 bytes.
-// https://github.com/crossbeam-rs/crossbeam/issues/551
-#[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local`
-#[test]
-fn local_size() {
- // TODO: https://github.com/crossbeam-rs/crossbeam/issues/869
- // assert!(
- // core::mem::size_of::<Local>() <= 2048,
- // "An allocation of `Local` should be <= 2048 bytes."
- // );
-}
-
-impl Local {
- /// Number of pinnings after which a participant will execute some deferred functions from the
- /// global queue.
- const PINNINGS_BETWEEN_COLLECT: usize = 128;
-
- /// Registers a new `Local` in the provided `Global`.
- pub(crate) fn register(collector: &Collector) -> LocalHandle {
- unsafe {
- // Since we dereference no pointers in this block, it is safe to use `unprotected`.
-
- let local = Owned::new(Local {
- entry: Entry::default(),
- collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
- bag: UnsafeCell::new(Bag::new()),
- guard_count: Cell::new(0),
- handle_count: Cell::new(1),
- pin_count: Cell::new(Wrapping(0)),
- epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
- })
- .into_shared(unprotected());
- collector.global.locals.insert(local, unprotected());
- LocalHandle {
- local: local.as_raw(),
- }
- }
- }
-
- /// Returns a reference to the `Global` in which this `Local` resides.
- #[inline]
- pub(crate) fn global(&self) -> &Global {
- &self.collector().global
- }
-
- /// Returns a reference to the `Collector` in which this `Local` resides.
- #[inline]
- pub(crate) fn collector(&self) -> &Collector {
- self.collector.with(|c| unsafe { &**c })
- }
-
- /// Returns `true` if the current participant is pinned.
- #[inline]
- pub(crate) fn is_pinned(&self) -> bool {
- self.guard_count.get() > 0
- }
-
- /// Adds `deferred` to the thread-local bag.
- ///
- /// # Safety
- ///
- /// It should be safe for another thread to execute the given function.
- pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
- let bag = self.bag.with_mut(|b| &mut *b);
-
- while let Err(d) = bag.try_push(deferred) {
- self.global().push_bag(bag, guard);
- deferred = d;
- }
- }
-
- pub(crate) fn flush(&self, guard: &Guard) {
- let bag = self.bag.with_mut(|b| unsafe { &mut *b });
-
- if !bag.is_empty() {
- self.global().push_bag(bag, guard);
- }
-
- self.global().collect(guard);
- }
-
- /// Pins the `Local`.
- #[inline]
- pub(crate) fn pin(&self) -> Guard {
- let guard = Guard { local: self };
-
- let guard_count = self.guard_count.get();
- self.guard_count.set(guard_count.checked_add(1).unwrap());
-
- if guard_count == 0 {
- let global_epoch = self.global().epoch.load(Ordering::Relaxed);
- let new_epoch = global_epoch.pinned();
-
- // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
- // The fence makes sure that any future loads from `Atomic`s will not happen before
- // this store.
- if cfg!(all(
- any(target_arch = "x86", target_arch = "x86_64"),
- not(miri)
- )) {
- // HACK(stjepang): On x86 architectures there are two different ways of executing
- // a `SeqCst` fence.
- //
- // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
- // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
- // instruction.
- //
- // Both instructions have the effect of a full barrier, but benchmarks have shown
- // that the second one makes pinning faster in this particular case. It is not
- // clear that this is permitted by the C++ memory model (SC fences work very
- // differently from SC accesses), but experimental evidence suggests that this
- // works fine. Using inline assembly would be a viable (and correct) alternative,
- // but alas, that is not possible on stable Rust.
- let current = Epoch::starting();
- let res = self.epoch.compare_exchange(
- current,
- new_epoch,
- Ordering::SeqCst,
- Ordering::SeqCst,
- );
- debug_assert!(res.is_ok(), "participant was expected to be unpinned");
- // We add a compiler fence to make it less likely for LLVM to do something wrong
- // here. Formally, this is not enough to get rid of data races; practically,
- // it should go a long way.
- atomic::compiler_fence(Ordering::SeqCst);
- } else {
- self.epoch.store(new_epoch, Ordering::Relaxed);
- atomic::fence(Ordering::SeqCst);
- }
-
- // Increment the pin counter.
- let count = self.pin_count.get();
- self.pin_count.set(count + Wrapping(1));
-
- // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting
- // some garbage.
- if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
- self.global().collect(&guard);
- }
- }
-
- guard
- }
-
- /// Unpins the `Local`.
- #[inline]
- pub(crate) fn unpin(&self) {
- let guard_count = self.guard_count.get();
- self.guard_count.set(guard_count - 1);
-
- if guard_count == 1 {
- self.epoch.store(Epoch::starting(), Ordering::Release);
-
- if self.handle_count.get() == 0 {
- self.finalize();
- }
- }
- }
-
- /// Unpins and then pins the `Local`.
- #[inline]
- pub(crate) fn repin(&self) {
- let guard_count = self.guard_count.get();
-
- // Update the local epoch only if there's only one guard.
- if guard_count == 1 {
- let epoch = self.epoch.load(Ordering::Relaxed);
- let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();
-
- // Update the local epoch only if the global epoch is greater than the local epoch.
- if epoch != global_epoch {
- // We store the new epoch with `Release` because we need to ensure any memory
- // accesses from the previous epoch do not leak into the new one.
- self.epoch.store(global_epoch, Ordering::Release);
-
- // However, we don't need a following `SeqCst` fence, because it is safe for memory
- // accesses from the new epoch to be executed before updating the local epoch. At
- // worse, other threads will see the new epoch late and delay GC slightly.
- }
- }
- }
-
- /// Increments the handle count.
- #[inline]
- pub(crate) fn acquire_handle(&self) {
- let handle_count = self.handle_count.get();
- debug_assert!(handle_count >= 1);
- self.handle_count.set(handle_count + 1);
- }
-
- /// Decrements the handle count.
- #[inline]
- pub(crate) fn release_handle(&self) {
- let guard_count = self.guard_count.get();
- let handle_count = self.handle_count.get();
- debug_assert!(handle_count >= 1);
- self.handle_count.set(handle_count - 1);
-
- if guard_count == 0 && handle_count == 1 {
- self.finalize();
- }
- }
-
- /// Removes the `Local` from the global linked list.
- #[cold]
- fn finalize(&self) {
- debug_assert_eq!(self.guard_count.get(), 0);
- debug_assert_eq!(self.handle_count.get(), 0);
-
- // Temporarily increment handle count. This is required so that the following call to `pin`
- // doesn't call `finalize` again.
- self.handle_count.set(1);
- unsafe {
- // Pin and move the local bag into the global queue. It's important that `push_bag`
- // doesn't defer destruction on any new garbage.
- let guard = &self.pin();
- self.global()
- .push_bag(self.bag.with_mut(|b| &mut *b), guard);
- }
- // Revert the handle count back to zero.
- self.handle_count.set(0);
-
- unsafe {
- // Take the reference to the `Global` out of this `Local`. Since we're not protected
- // by a guard at this time, it's crucial that the reference is read before marking the
- // `Local` as deleted.
- let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
-
- // Mark this node in the linked list as deleted.
- self.entry.delete(unprotected());
-
- // Finally, drop the reference to the global. Note that this might be the last reference
- // to the `Global`. If so, the global data will be destroyed and all deferred functions
- // in its queue will be executed.
- drop(collector);
- }
- }
-}
-
-impl IsElement<Self> for Local {
- fn entry_of(local: &Self) -> &Entry {
- // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it.
- unsafe {
- let entry_ptr = (local as *const Self).cast::<Entry>();
- &*entry_ptr
- }
- }
-
- unsafe fn element_of(entry: &Entry) -> &Self {
- // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it.
- let local_ptr = (entry as *const Entry).cast::<Self>();
- &*local_ptr
- }
-
- unsafe fn finalize(entry: &Entry, guard: &Guard) {
- guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
- }
-}
-
-#[cfg(all(test, not(crossbeam_loom)))]
-mod tests {
- use std::sync::atomic::{AtomicUsize, Ordering};
-
- use super::*;
-
- #[test]
- fn check_defer() {
- static FLAG: AtomicUsize = AtomicUsize::new(0);
- fn set() {
- FLAG.store(42, Ordering::Relaxed);
- }
-
- let d = Deferred::new(set);
- assert_eq!(FLAG.load(Ordering::Relaxed), 0);
- d.call();
- assert_eq!(FLAG.load(Ordering::Relaxed), 42);
- }
-
- #[test]
- fn check_bag() {
- static FLAG: AtomicUsize = AtomicUsize::new(0);
- fn incr() {
- FLAG.fetch_add(1, Ordering::Relaxed);
- }
-
- let mut bag = Bag::new();
- assert!(bag.is_empty());
-
- for _ in 0..MAX_OBJECTS {
- assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
- assert!(!bag.is_empty());
- assert_eq!(FLAG.load(Ordering::Relaxed), 0);
- }
-
- let result = unsafe { bag.try_push(Deferred::new(incr)) };
- assert!(result.is_err());
- assert!(!bag.is_empty());
- assert_eq!(FLAG.load(Ordering::Relaxed), 0);
-
- drop(bag);
- assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
- }
-}