aboutsummaryrefslogtreecommitdiff
path: root/vendor/rayon-core/src/latch.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
committerValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
commit1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/rayon-core/src/latch.rs
parent5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
downloadfparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz
fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/rayon-core/src/latch.rs')
-rw-r--r--vendor/rayon-core/src/latch.rs460
1 files changed, 460 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/latch.rs b/vendor/rayon-core/src/latch.rs
new file mode 100644
index 0000000..b0cbbd8
--- /dev/null
+++ b/vendor/rayon-core/src/latch.rs
@@ -0,0 +1,460 @@
+use std::marker::PhantomData;
+use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Condvar, Mutex};
+use std::usize;
+
+use crate::registry::{Registry, WorkerThread};
+
+/// We define various kinds of latches, which are all a primitive signaling
+/// mechanism. A latch starts as false. Eventually someone calls `set()` and
+/// it becomes true. You can test if it has been set by calling `probe()`.
+///
+/// Some kinds of latches, but not all, support a `wait()` operation
+/// that will wait until the latch is set, blocking efficiently. That
+/// is not part of the trait since it is not possibly to do with all
+/// latches.
+///
+/// The intention is that `set()` is called once, but `probe()` may be
+/// called any number of times. Once `probe()` returns true, the memory
+/// effects that occurred before `set()` become visible.
+///
+/// It'd probably be better to refactor the API into two paired types,
+/// but that's a bit of work, and this is not a public API.
+///
+/// ## Memory ordering
+///
+/// Latches need to guarantee two things:
+///
+/// - Once `probe()` returns true, all memory effects from the `set()`
+/// are visible (in other words, the set should synchronize-with
+/// the probe).
+/// - Once `set()` occurs, the next `probe()` *will* observe it. This
+/// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
+/// README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
+pub(super) trait Latch {
+ /// Set the latch, signalling others.
+ ///
+ /// # WARNING
+ ///
+ /// Setting a latch triggers other threads to wake up and (in some
+ /// cases) complete. This may, in turn, cause memory to be
+ /// deallocated and so forth. One must be very careful about this,
+ /// and it's typically better to read all the fields you will need
+ /// to access *before* a latch is set!
+ ///
+ /// This function operates on `*const Self` instead of `&self` to allow it
+ /// to become dangling during this call. The caller must ensure that the
+ /// pointer is valid upon entry, and not invalidated during the call by any
+ /// actions other than `set` itself.
+ unsafe fn set(this: *const Self);
+}
+
+pub(super) trait AsCoreLatch {
+ fn as_core_latch(&self) -> &CoreLatch;
+}
+
+/// Latch is not set, owning thread is awake
+const UNSET: usize = 0;
+
+/// Latch is not set, owning thread is going to sleep on this latch
+/// (but has not yet fallen asleep).
+const SLEEPY: usize = 1;
+
+/// Latch is not set, owning thread is asleep on this latch and
+/// must be awoken.
+const SLEEPING: usize = 2;
+
+/// Latch is set.
+const SET: usize = 3;
+
+/// Spin latches are the simplest, most efficient kind, but they do
+/// not support a `wait()` operation. They just have a boolean flag
+/// that becomes true when `set()` is called.
+#[derive(Debug)]
+pub(super) struct CoreLatch {
+ state: AtomicUsize,
+}
+
+impl CoreLatch {
+ #[inline]
+ fn new() -> Self {
+ Self {
+ state: AtomicUsize::new(0),
+ }
+ }
+
+ /// Invoked by owning thread as it prepares to sleep. Returns true
+ /// if the owning thread may proceed to fall asleep, false if the
+ /// latch was set in the meantime.
+ #[inline]
+ pub(super) fn get_sleepy(&self) -> bool {
+ self.state
+ .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
+ .is_ok()
+ }
+
+ /// Invoked by owning thread as it falls asleep sleep. Returns
+ /// true if the owning thread should block, or false if the latch
+ /// was set in the meantime.
+ #[inline]
+ pub(super) fn fall_asleep(&self) -> bool {
+ self.state
+ .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
+ .is_ok()
+ }
+
+ /// Invoked by owning thread as it falls asleep sleep. Returns
+ /// true if the owning thread should block, or false if the latch
+ /// was set in the meantime.
+ #[inline]
+ pub(super) fn wake_up(&self) {
+ if !self.probe() {
+ let _ =
+ self.state
+ .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
+ }
+ }
+
+ /// Set the latch. If this returns true, the owning thread was sleeping
+ /// and must be awoken.
+ ///
+ /// This is private because, typically, setting a latch involves
+ /// doing some wakeups; those are encapsulated in the surrounding
+ /// latch code.
+ #[inline]
+ unsafe fn set(this: *const Self) -> bool {
+ let old_state = (*this).state.swap(SET, Ordering::AcqRel);
+ old_state == SLEEPING
+ }
+
+ /// Test if this latch has been set.
+ #[inline]
+ pub(super) fn probe(&self) -> bool {
+ self.state.load(Ordering::Acquire) == SET
+ }
+}
+
+impl AsCoreLatch for CoreLatch {
+ #[inline]
+ fn as_core_latch(&self) -> &CoreLatch {
+ self
+ }
+}
+
+/// Spin latches are the simplest, most efficient kind, but they do
+/// not support a `wait()` operation. They just have a boolean flag
+/// that becomes true when `set()` is called.
+pub(super) struct SpinLatch<'r> {
+ core_latch: CoreLatch,
+ registry: &'r Arc<Registry>,
+ target_worker_index: usize,
+ cross: bool,
+}
+
+impl<'r> SpinLatch<'r> {
+ /// Creates a new spin latch that is owned by `thread`. This means
+ /// that `thread` is the only thread that should be blocking on
+ /// this latch -- it also means that when the latch is set, we
+ /// will wake `thread` if it is sleeping.
+ #[inline]
+ pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
+ SpinLatch {
+ core_latch: CoreLatch::new(),
+ registry: thread.registry(),
+ target_worker_index: thread.index(),
+ cross: false,
+ }
+ }
+
+ /// Creates a new spin latch for cross-threadpool blocking. Notably, we
+ /// need to make sure the registry is kept alive after setting, so we can
+ /// safely call the notification.
+ #[inline]
+ pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
+ SpinLatch {
+ cross: true,
+ ..SpinLatch::new(thread)
+ }
+ }
+
+ #[inline]
+ pub(super) fn probe(&self) -> bool {
+ self.core_latch.probe()
+ }
+}
+
+impl<'r> AsCoreLatch for SpinLatch<'r> {
+ #[inline]
+ fn as_core_latch(&self) -> &CoreLatch {
+ &self.core_latch
+ }
+}
+
+impl<'r> Latch for SpinLatch<'r> {
+ #[inline]
+ unsafe fn set(this: *const Self) {
+ let cross_registry;
+
+ let registry: &Registry = if (*this).cross {
+ // Ensure the registry stays alive while we notify it.
+ // Otherwise, it would be possible that we set the spin
+ // latch and the other thread sees it and exits, causing
+ // the registry to be deallocated, all before we get a
+ // chance to invoke `registry.notify_worker_latch_is_set`.
+ cross_registry = Arc::clone((*this).registry);
+ &cross_registry
+ } else {
+ // If this is not a "cross-registry" spin-latch, then the
+ // thread which is performing `set` is itself ensuring
+ // that the registry stays alive. However, that doesn't
+ // include this *particular* `Arc` handle if the waiting
+ // thread then exits, so we must completely dereference it.
+ (*this).registry
+ };
+ let target_worker_index = (*this).target_worker_index;
+
+ // NOTE: Once we `set`, the target may proceed and invalidate `this`!
+ if CoreLatch::set(&(*this).core_latch) {
+ // Subtle: at this point, we can no longer read from
+ // `self`, because the thread owning this spin latch may
+ // have awoken and deallocated the latch. Therefore, we
+ // only use fields whose values we already read.
+ registry.notify_worker_latch_is_set(target_worker_index);
+ }
+ }
+}
+
+/// A Latch starts as false and eventually becomes true. You can block
+/// until it becomes true.
+#[derive(Debug)]
+pub(super) struct LockLatch {
+ m: Mutex<bool>,
+ v: Condvar,
+}
+
+impl LockLatch {
+ #[inline]
+ pub(super) fn new() -> LockLatch {
+ LockLatch {
+ m: Mutex::new(false),
+ v: Condvar::new(),
+ }
+ }
+
+ /// Block until latch is set, then resets this lock latch so it can be reused again.
+ pub(super) fn wait_and_reset(&self) {
+ let mut guard = self.m.lock().unwrap();
+ while !*guard {
+ guard = self.v.wait(guard).unwrap();
+ }
+ *guard = false;
+ }
+
+ /// Block until latch is set.
+ pub(super) fn wait(&self) {
+ let mut guard = self.m.lock().unwrap();
+ while !*guard {
+ guard = self.v.wait(guard).unwrap();
+ }
+ }
+}
+
+impl Latch for LockLatch {
+ #[inline]
+ unsafe fn set(this: *const Self) {
+ let mut guard = (*this).m.lock().unwrap();
+ *guard = true;
+ (*this).v.notify_all();
+ }
+}
+
+/// Once latches are used to implement one-time blocking, primarily
+/// for the termination flag of the threads in the pool.
+///
+/// Note: like a `SpinLatch`, once-latches are always associated with
+/// some registry that is probing them, which must be tickled when
+/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
+/// reference to that registry. This is because in some cases the
+/// registry owns the once-latch, and that would create a cycle. So a
+/// `OnceLatch` must be given a reference to its owning registry when
+/// it is set. For this reason, it does not implement the `Latch`
+/// trait (but it doesn't have to, as it is not used in those generic
+/// contexts).
+#[derive(Debug)]
+pub(super) struct OnceLatch {
+ core_latch: CoreLatch,
+}
+
+impl OnceLatch {
+ #[inline]
+ pub(super) fn new() -> OnceLatch {
+ Self {
+ core_latch: CoreLatch::new(),
+ }
+ }
+
+ /// Set the latch, then tickle the specific worker thread,
+ /// which should be the one that owns this latch.
+ #[inline]
+ pub(super) unsafe fn set_and_tickle_one(
+ this: *const Self,
+ registry: &Registry,
+ target_worker_index: usize,
+ ) {
+ if CoreLatch::set(&(*this).core_latch) {
+ registry.notify_worker_latch_is_set(target_worker_index);
+ }
+ }
+}
+
+impl AsCoreLatch for OnceLatch {
+ #[inline]
+ fn as_core_latch(&self) -> &CoreLatch {
+ &self.core_latch
+ }
+}
+
+/// Counting latches are used to implement scopes. They track a
+/// counter. Unlike other latches, calling `set()` does not
+/// necessarily make the latch be considered `set()`; instead, it just
+/// decrements the counter. The latch is only "set" (in the sense that
+/// `probe()` returns true) once the counter reaches zero.
+#[derive(Debug)]
+pub(super) struct CountLatch {
+ counter: AtomicUsize,
+ kind: CountLatchKind,
+}
+
+enum CountLatchKind {
+ /// A latch for scopes created on a rayon thread which will participate in work-
+ /// stealing while it waits for completion. This thread is not necessarily part
+ /// of the same registry as the scope itself!
+ Stealing {
+ latch: CoreLatch,
+ /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
+ /// with registry B, when a job completes in a thread of registry B, we may
+ /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
+ /// That means we need a reference to registry A (since at that point we will
+ /// only have a reference to registry B), so we stash it here.
+ registry: Arc<Registry>,
+ /// The index of the worker to wake in `registry`
+ worker_index: usize,
+ },
+
+ /// A latch for scopes created on a non-rayon thread which will block to wait.
+ Blocking { latch: LockLatch },
+}
+
+impl std::fmt::Debug for CountLatchKind {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ CountLatchKind::Stealing { latch, .. } => {
+ f.debug_tuple("Stealing").field(latch).finish()
+ }
+ CountLatchKind::Blocking { latch, .. } => {
+ f.debug_tuple("Blocking").field(latch).finish()
+ }
+ }
+ }
+}
+
+impl CountLatch {
+ pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
+ Self::with_count(1, owner)
+ }
+
+ pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
+ Self {
+ counter: AtomicUsize::new(count),
+ kind: match owner {
+ Some(owner) => CountLatchKind::Stealing {
+ latch: CoreLatch::new(),
+ registry: Arc::clone(owner.registry()),
+ worker_index: owner.index(),
+ },
+ None => CountLatchKind::Blocking {
+ latch: LockLatch::new(),
+ },
+ },
+ }
+ }
+
+ #[inline]
+ pub(super) fn increment(&self) {
+ let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
+ debug_assert!(old_counter != 0);
+ }
+
+ pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
+ match &self.kind {
+ CountLatchKind::Stealing {
+ latch,
+ registry,
+ worker_index,
+ } => unsafe {
+ let owner = owner.expect("owner thread");
+ debug_assert_eq!(registry.id(), owner.registry().id());
+ debug_assert_eq!(*worker_index, owner.index());
+ owner.wait_until(latch);
+ },
+ CountLatchKind::Blocking { latch } => latch.wait(),
+ }
+ }
+}
+
+impl Latch for CountLatch {
+ #[inline]
+ unsafe fn set(this: *const Self) {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ // NOTE: Once we call `set` on the internal `latch`,
+ // the target may proceed and invalidate `this`!
+ match (*this).kind {
+ CountLatchKind::Stealing {
+ ref latch,
+ ref registry,
+ worker_index,
+ } => {
+ let registry = Arc::clone(registry);
+ if CoreLatch::set(latch) {
+ registry.notify_worker_latch_is_set(worker_index);
+ }
+ }
+ CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
+ }
+ }
+ }
+}
+
+/// `&L` without any implication of `dereferenceable` for `Latch::set`
+pub(super) struct LatchRef<'a, L> {
+ inner: *const L,
+ marker: PhantomData<&'a L>,
+}
+
+impl<L> LatchRef<'_, L> {
+ pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
+ LatchRef {
+ inner,
+ marker: PhantomData,
+ }
+ }
+}
+
+unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
+
+impl<L> Deref for LatchRef<'_, L> {
+ type Target = L;
+
+ fn deref(&self) -> &L {
+ // SAFETY: if we have &self, the inner latch is still alive
+ unsafe { &*self.inner }
+ }
+}
+
+impl<L: Latch> Latch for LatchRef<'_, L> {
+ #[inline]
+ unsafe fn set(this: *const Self) {
+ L::set((*this).inner);
+ }
+}