diff options
author | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
---|---|---|
committer | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
commit | 1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch) | |
tree | 7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/rayon-core/src/latch.rs | |
parent | 5ecd8cf2cba827454317368b68571df0d13d7842 (diff) | |
download | fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip |
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/rayon-core/src/latch.rs')
-rw-r--r-- | vendor/rayon-core/src/latch.rs | 460 |
1 files changed, 460 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/latch.rs b/vendor/rayon-core/src/latch.rs new file mode 100644 index 0000000..b0cbbd8 --- /dev/null +++ b/vendor/rayon-core/src/latch.rs @@ -0,0 +1,460 @@ +use std::marker::PhantomData; +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::usize; + +use crate::registry::{Registry, WorkerThread}; + +/// We define various kinds of latches, which are all a primitive signaling +/// mechanism. A latch starts as false. Eventually someone calls `set()` and +/// it becomes true. You can test if it has been set by calling `probe()`. +/// +/// Some kinds of latches, but not all, support a `wait()` operation +/// that will wait until the latch is set, blocking efficiently. That +/// is not part of the trait since it is not possibly to do with all +/// latches. +/// +/// The intention is that `set()` is called once, but `probe()` may be +/// called any number of times. Once `probe()` returns true, the memory +/// effects that occurred before `set()` become visible. +/// +/// It'd probably be better to refactor the API into two paired types, +/// but that's a bit of work, and this is not a public API. +/// +/// ## Memory ordering +/// +/// Latches need to guarantee two things: +/// +/// - Once `probe()` returns true, all memory effects from the `set()` +/// are visible (in other words, the set should synchronize-with +/// the probe). +/// - Once `set()` occurs, the next `probe()` *will* observe it. This +/// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep +/// README](/src/sleep/README.md#tickle-then-get-sleepy) for details. +pub(super) trait Latch { + /// Set the latch, signalling others. + /// + /// # WARNING + /// + /// Setting a latch triggers other threads to wake up and (in some + /// cases) complete. This may, in turn, cause memory to be + /// deallocated and so forth. One must be very careful about this, + /// and it's typically better to read all the fields you will need + /// to access *before* a latch is set! + /// + /// This function operates on `*const Self` instead of `&self` to allow it + /// to become dangling during this call. The caller must ensure that the + /// pointer is valid upon entry, and not invalidated during the call by any + /// actions other than `set` itself. + unsafe fn set(this: *const Self); +} + +pub(super) trait AsCoreLatch { + fn as_core_latch(&self) -> &CoreLatch; +} + +/// Latch is not set, owning thread is awake +const UNSET: usize = 0; + +/// Latch is not set, owning thread is going to sleep on this latch +/// (but has not yet fallen asleep). +const SLEEPY: usize = 1; + +/// Latch is not set, owning thread is asleep on this latch and +/// must be awoken. +const SLEEPING: usize = 2; + +/// Latch is set. +const SET: usize = 3; + +/// Spin latches are the simplest, most efficient kind, but they do +/// not support a `wait()` operation. They just have a boolean flag +/// that becomes true when `set()` is called. +#[derive(Debug)] +pub(super) struct CoreLatch { + state: AtomicUsize, +} + +impl CoreLatch { + #[inline] + fn new() -> Self { + Self { + state: AtomicUsize::new(0), + } + } + + /// Invoked by owning thread as it prepares to sleep. Returns true + /// if the owning thread may proceed to fall asleep, false if the + /// latch was set in the meantime. + #[inline] + pub(super) fn get_sleepy(&self) -> bool { + self.state + .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + } + + /// Invoked by owning thread as it falls asleep sleep. Returns + /// true if the owning thread should block, or false if the latch + /// was set in the meantime. + #[inline] + pub(super) fn fall_asleep(&self) -> bool { + self.state + .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + } + + /// Invoked by owning thread as it falls asleep sleep. Returns + /// true if the owning thread should block, or false if the latch + /// was set in the meantime. + #[inline] + pub(super) fn wake_up(&self) { + if !self.probe() { + let _ = + self.state + .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); + } + } + + /// Set the latch. If this returns true, the owning thread was sleeping + /// and must be awoken. + /// + /// This is private because, typically, setting a latch involves + /// doing some wakeups; those are encapsulated in the surrounding + /// latch code. + #[inline] + unsafe fn set(this: *const Self) -> bool { + let old_state = (*this).state.swap(SET, Ordering::AcqRel); + old_state == SLEEPING + } + + /// Test if this latch has been set. + #[inline] + pub(super) fn probe(&self) -> bool { + self.state.load(Ordering::Acquire) == SET + } +} + +impl AsCoreLatch for CoreLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + self + } +} + +/// Spin latches are the simplest, most efficient kind, but they do +/// not support a `wait()` operation. They just have a boolean flag +/// that becomes true when `set()` is called. +pub(super) struct SpinLatch<'r> { + core_latch: CoreLatch, + registry: &'r Arc<Registry>, + target_worker_index: usize, + cross: bool, +} + +impl<'r> SpinLatch<'r> { + /// Creates a new spin latch that is owned by `thread`. This means + /// that `thread` is the only thread that should be blocking on + /// this latch -- it also means that when the latch is set, we + /// will wake `thread` if it is sleeping. + #[inline] + pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { + SpinLatch { + core_latch: CoreLatch::new(), + registry: thread.registry(), + target_worker_index: thread.index(), + cross: false, + } + } + + /// Creates a new spin latch for cross-threadpool blocking. Notably, we + /// need to make sure the registry is kept alive after setting, so we can + /// safely call the notification. + #[inline] + pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { + SpinLatch { + cross: true, + ..SpinLatch::new(thread) + } + } + + #[inline] + pub(super) fn probe(&self) -> bool { + self.core_latch.probe() + } +} + +impl<'r> AsCoreLatch for SpinLatch<'r> { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + &self.core_latch + } +} + +impl<'r> Latch for SpinLatch<'r> { + #[inline] + unsafe fn set(this: *const Self) { + let cross_registry; + + let registry: &Registry = if (*this).cross { + // Ensure the registry stays alive while we notify it. + // Otherwise, it would be possible that we set the spin + // latch and the other thread sees it and exits, causing + // the registry to be deallocated, all before we get a + // chance to invoke `registry.notify_worker_latch_is_set`. + cross_registry = Arc::clone((*this).registry); + &cross_registry + } else { + // If this is not a "cross-registry" spin-latch, then the + // thread which is performing `set` is itself ensuring + // that the registry stays alive. However, that doesn't + // include this *particular* `Arc` handle if the waiting + // thread then exits, so we must completely dereference it. + (*this).registry + }; + let target_worker_index = (*this).target_worker_index; + + // NOTE: Once we `set`, the target may proceed and invalidate `this`! + if CoreLatch::set(&(*this).core_latch) { + // Subtle: at this point, we can no longer read from + // `self`, because the thread owning this spin latch may + // have awoken and deallocated the latch. Therefore, we + // only use fields whose values we already read. + registry.notify_worker_latch_is_set(target_worker_index); + } + } +} + +/// A Latch starts as false and eventually becomes true. You can block +/// until it becomes true. +#[derive(Debug)] +pub(super) struct LockLatch { + m: Mutex<bool>, + v: Condvar, +} + +impl LockLatch { + #[inline] + pub(super) fn new() -> LockLatch { + LockLatch { + m: Mutex::new(false), + v: Condvar::new(), + } + } + + /// Block until latch is set, then resets this lock latch so it can be reused again. + pub(super) fn wait_and_reset(&self) { + let mut guard = self.m.lock().unwrap(); + while !*guard { + guard = self.v.wait(guard).unwrap(); + } + *guard = false; + } + + /// Block until latch is set. + pub(super) fn wait(&self) { + let mut guard = self.m.lock().unwrap(); + while !*guard { + guard = self.v.wait(guard).unwrap(); + } + } +} + +impl Latch for LockLatch { + #[inline] + unsafe fn set(this: *const Self) { + let mut guard = (*this).m.lock().unwrap(); + *guard = true; + (*this).v.notify_all(); + } +} + +/// Once latches are used to implement one-time blocking, primarily +/// for the termination flag of the threads in the pool. +/// +/// Note: like a `SpinLatch`, once-latches are always associated with +/// some registry that is probing them, which must be tickled when +/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a +/// reference to that registry. This is because in some cases the +/// registry owns the once-latch, and that would create a cycle. So a +/// `OnceLatch` must be given a reference to its owning registry when +/// it is set. For this reason, it does not implement the `Latch` +/// trait (but it doesn't have to, as it is not used in those generic +/// contexts). +#[derive(Debug)] +pub(super) struct OnceLatch { + core_latch: CoreLatch, +} + +impl OnceLatch { + #[inline] + pub(super) fn new() -> OnceLatch { + Self { + core_latch: CoreLatch::new(), + } + } + + /// Set the latch, then tickle the specific worker thread, + /// which should be the one that owns this latch. + #[inline] + pub(super) unsafe fn set_and_tickle_one( + this: *const Self, + registry: &Registry, + target_worker_index: usize, + ) { + if CoreLatch::set(&(*this).core_latch) { + registry.notify_worker_latch_is_set(target_worker_index); + } + } +} + +impl AsCoreLatch for OnceLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + &self.core_latch + } +} + +/// Counting latches are used to implement scopes. They track a +/// counter. Unlike other latches, calling `set()` does not +/// necessarily make the latch be considered `set()`; instead, it just +/// decrements the counter. The latch is only "set" (in the sense that +/// `probe()` returns true) once the counter reaches zero. +#[derive(Debug)] +pub(super) struct CountLatch { + counter: AtomicUsize, + kind: CountLatchKind, +} + +enum CountLatchKind { + /// A latch for scopes created on a rayon thread which will participate in work- + /// stealing while it waits for completion. This thread is not necessarily part + /// of the same registry as the scope itself! + Stealing { + latch: CoreLatch, + /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool + /// with registry B, when a job completes in a thread of registry B, we may + /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A. + /// That means we need a reference to registry A (since at that point we will + /// only have a reference to registry B), so we stash it here. + registry: Arc<Registry>, + /// The index of the worker to wake in `registry` + worker_index: usize, + }, + + /// A latch for scopes created on a non-rayon thread which will block to wait. + Blocking { latch: LockLatch }, +} + +impl std::fmt::Debug for CountLatchKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CountLatchKind::Stealing { latch, .. } => { + f.debug_tuple("Stealing").field(latch).finish() + } + CountLatchKind::Blocking { latch, .. } => { + f.debug_tuple("Blocking").field(latch).finish() + } + } + } +} + +impl CountLatch { + pub(super) fn new(owner: Option<&WorkerThread>) -> Self { + Self::with_count(1, owner) + } + + pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { + Self { + counter: AtomicUsize::new(count), + kind: match owner { + Some(owner) => CountLatchKind::Stealing { + latch: CoreLatch::new(), + registry: Arc::clone(owner.registry()), + worker_index: owner.index(), + }, + None => CountLatchKind::Blocking { + latch: LockLatch::new(), + }, + }, + } + } + + #[inline] + pub(super) fn increment(&self) { + let old_counter = self.counter.fetch_add(1, Ordering::Relaxed); + debug_assert!(old_counter != 0); + } + + pub(super) fn wait(&self, owner: Option<&WorkerThread>) { + match &self.kind { + CountLatchKind::Stealing { + latch, + registry, + worker_index, + } => unsafe { + let owner = owner.expect("owner thread"); + debug_assert_eq!(registry.id(), owner.registry().id()); + debug_assert_eq!(*worker_index, owner.index()); + owner.wait_until(latch); + }, + CountLatchKind::Blocking { latch } => latch.wait(), + } + } +} + +impl Latch for CountLatch { + #[inline] + unsafe fn set(this: *const Self) { + if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { + // NOTE: Once we call `set` on the internal `latch`, + // the target may proceed and invalidate `this`! + match (*this).kind { + CountLatchKind::Stealing { + ref latch, + ref registry, + worker_index, + } => { + let registry = Arc::clone(registry); + if CoreLatch::set(latch) { + registry.notify_worker_latch_is_set(worker_index); + } + } + CountLatchKind::Blocking { ref latch } => LockLatch::set(latch), + } + } + } +} + +/// `&L` without any implication of `dereferenceable` for `Latch::set` +pub(super) struct LatchRef<'a, L> { + inner: *const L, + marker: PhantomData<&'a L>, +} + +impl<L> LatchRef<'_, L> { + pub(super) fn new(inner: &L) -> LatchRef<'_, L> { + LatchRef { + inner, + marker: PhantomData, + } + } +} + +unsafe impl<L: Sync> Sync for LatchRef<'_, L> {} + +impl<L> Deref for LatchRef<'_, L> { + type Target = L; + + fn deref(&self) -> &L { + // SAFETY: if we have &self, the inner latch is still alive + unsafe { &*self.inner } + } +} + +impl<L: Latch> Latch for LatchRef<'_, L> { + #[inline] + unsafe fn set(this: *const Self) { + L::set((*this).inner); + } +} |