use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};

use crate::registry::{Registry, WorkerThread};

/// We define various kinds of latches, which are all variations on a
/// primitive signaling mechanism. A latch starts as false. Eventually
/// someone calls `set()` and it becomes true. You can test if it has
/// been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See
///   [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;

/// The core of a latch: an atomic state machine that tracks both
/// whether the latch has been set and whether its owning thread is
/// (falling) asleep on it and must be awoken. The other latch kinds
/// in this module are built on top of it; it does not itself
/// implement `Latch`, since setting it requires a follow-up wakeup.
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(UNSET),
        }
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
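    ///
    /// A sketch of the owner-side protocol these state transitions are
    /// meant to support; the real driver lives in the sleep module, and
    /// `block_current_thread()` is a hypothetical stand-in for the
    /// actual condvar-based blocking:
    ///
    /// ```ignore
    /// if latch.get_sleepy() {
    ///     if latch.fall_asleep() {
    ///         block_current_thread(); // awoken by whoever calls `set()`
    ///     }
    ///     // Clear SLEEPY/SLEEPING before continuing (no-op if SET).
    ///     latch.wake_up();
    /// }
    /// ```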
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it wakes up. If the latch was not
    /// set in the meantime, this resets the state from `SLEEPING`
    /// back to `UNSET`.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ = self
                .state
                .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}

impl AsCoreLatch for CoreLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
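///
/// A sketch of typical usage from a worker thread; `worker` is assumed
/// to be the current `WorkerThread`, and the job handed off is assumed
/// to call `Latch::set` on the latch when it completes:
///
/// ```ignore
/// let latch = SpinLatch::new(worker);
/// // ... hand a pointer to `latch` to a job on another thread ...
/// worker.wait_until(&latch); // work-steals until the latch is set
/// ```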
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until the latch is set, then reset it so it can be reused.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until the latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}

/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    core_latch: CoreLatch,
}

impl OnceLatch {
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self {
            core_latch: CoreLatch::new(),
        }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if CoreLatch::set(&(*this).core_latch) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for OnceLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
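///
/// A sketch of how a scope might drive this latch; `worker` and the
/// job handoff are assumed, and each spawned job is assumed to call
/// `Latch::set` exactly once when it finishes:
///
/// ```ignore
/// let latch = CountLatch::new(Some(worker)); // counter starts at 1
/// latch.increment();                         // +1 for a spawned job
/// // ... the job finishes: unsafe { Latch::set(&latch) } ...
/// unsafe { Latch::set(&latch) };             // drop the scope's own count
/// latch.wait(Some(worker));                  // returns once the counter is 0
/// ```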
#[derive(Debug)]
pub(super) struct CountLatch {
    counter: AtomicUsize,
    kind: CountLatchKind,
}

enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in
    /// work-stealing while it waits for completion. This thread is not
    /// necessarily part of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}

impl std::fmt::Debug for CountLatchKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CountLatchKind::Stealing { latch, .. } => {
                f.debug_tuple("Stealing").field(latch).finish()
            }
            CountLatchKind::Blocking { latch, .. } => {
                f.debug_tuple("Blocking").field(latch).finish()
            }
        }
    }
}

impl CountLatch {
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking {
                    latch: LockLatch::new(),
                },
            },
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match &self.kind {
            CountLatchKind::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}

impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match (*this).kind {
                CountLatchKind::Stealing {
                    ref latch,
                    ref registry,
                    worker_index,
                } => {
                    let registry = Arc::clone(registry);
                    if CoreLatch::set(latch) {
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
            }
        }
    }
}

/// `&L` without any implication of `dereferenceable` for `Latch::set`
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}
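
// Minimal sanity checks, sketching the single-threaded semantics described
// above; the cross-thread wakeup paths are exercised by the rest of the crate.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn lock_latch_set_then_wait() {
        let latch = LockLatch::new();
        // SAFETY: `latch` is alive for the duration of the call.
        unsafe { Latch::set(&latch) };
        // The latch is already set, so this returns immediately.
        latch.wait();
    }

    #[test]
    fn count_latch_counts_down_to_zero() {
        let latch = CountLatch::with_count(2, None);
        // SAFETY: `latch` is alive for the duration of both calls.
        unsafe { Latch::set(&latch) }; // 2 -> 1: latch is not yet set
        unsafe { Latch::set(&latch) }; // 1 -> 0: latch becomes set
        // A `None` owner selects the blocking (LockLatch) flavor, so this
        // returns immediately now that the counter has reached zero.
        latch.wait(None);
    }
}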