aboutsummaryrefslogtreecommitdiff
path: root/vendor/rayon-core/src/sleep/mod.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
committerValentin Popov <valentin@popov.link>2024-01-08 00:21:28 +0300
commit1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch)
tree7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/rayon-core/src/sleep/mod.rs
parent5ecd8cf2cba827454317368b68571df0d13d7842 (diff)
downloadfparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz
fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/rayon-core/src/sleep/mod.rs')
-rw-r--r--vendor/rayon-core/src/sleep/mod.rs325
1 files changed, 325 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/sleep/mod.rs b/vendor/rayon-core/src/sleep/mod.rs
new file mode 100644
index 0000000..03d1077
--- /dev/null
+++ b/vendor/rayon-core/src/sleep/mod.rs
@@ -0,0 +1,325 @@
+//! Code that decides when workers should go to sleep. See README.md
+//! for an overview.
+
+use crate::latch::CoreLatch;
+use crossbeam_utils::CachePadded;
+use std::sync::atomic::Ordering;
+use std::sync::{Condvar, Mutex};
+use std::thread;
+use std::usize;
+
+mod counters;
+pub(crate) use self::counters::THREADS_MAX;
+use self::counters::{AtomicCounters, JobsEventCounter};
+
+/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
+/// of workers. It has callbacks that are invoked periodically at significant events,
+/// such as when workers are looping and looking for work, when latches are set, or when
+/// jobs are published, and it either blocks threads or wakes them in response to these
+/// events. See the [`README.md`] in this module for more details.
+///
+/// [`README.md`] README.md
+pub(super) struct Sleep {
+ /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
+ /// them block.
+ worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
+
+ counters: AtomicCounters,
+}
+
+/// An instance of this struct is created when a thread becomes idle.
+/// It is consumed when the thread finds work, and passed by `&mut`
+/// reference for operations that preserve the idle state. (In other
+/// words, producing one of these structs is evidence the thread is
+/// idle.) It tracks state such as how long the thread has been idle.
+pub(super) struct IdleState {
+ /// What is worker index of the idle thread?
+ worker_index: usize,
+
+ /// How many rounds have we been circling without sleeping?
+ rounds: u32,
+
+ /// Once we become sleepy, what was the sleepy counter value?
+ /// Set to `INVALID_SLEEPY_COUNTER` otherwise.
+ jobs_counter: JobsEventCounter,
+}
+
+/// The "sleep state" for an individual worker.
+#[derive(Default)]
+struct WorkerSleepState {
+ /// Set to true when the worker goes to sleep; set to false when
+ /// the worker is notified or when it wakes.
+ is_blocked: Mutex<bool>,
+
+ condvar: Condvar,
+}
+
+const ROUNDS_UNTIL_SLEEPY: u32 = 32;
+const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
+
+impl Sleep {
+ pub(super) fn new(n_threads: usize) -> Sleep {
+ assert!(n_threads <= THREADS_MAX);
+ Sleep {
+ worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
+ counters: AtomicCounters::new(),
+ }
+ }
+
+ #[inline]
+ pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
+ self.counters.add_inactive_thread();
+
+ IdleState {
+ worker_index,
+ rounds: 0,
+ jobs_counter: JobsEventCounter::DUMMY,
+ }
+ }
+
+ #[inline]
+ pub(super) fn work_found(&self) {
+ // If we were the last idle thread and other threads are still sleeping,
+ // then we should wake up another thread.
+ let threads_to_wake = self.counters.sub_inactive_thread();
+ self.wake_any_threads(threads_to_wake as u32);
+ }
+
+ #[inline]
+ pub(super) fn no_work_found(
+ &self,
+ idle_state: &mut IdleState,
+ latch: &CoreLatch,
+ has_injected_jobs: impl FnOnce() -> bool,
+ ) {
+ if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
+ thread::yield_now();
+ idle_state.rounds += 1;
+ } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
+ idle_state.jobs_counter = self.announce_sleepy();
+ idle_state.rounds += 1;
+ thread::yield_now();
+ } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
+ idle_state.rounds += 1;
+ thread::yield_now();
+ } else {
+ debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
+ self.sleep(idle_state, latch, has_injected_jobs);
+ }
+ }
+
+ #[cold]
+ fn announce_sleepy(&self) -> JobsEventCounter {
+ self.counters
+ .increment_jobs_event_counter_if(JobsEventCounter::is_active)
+ .jobs_counter()
+ }
+
+ #[cold]
+ fn sleep(
+ &self,
+ idle_state: &mut IdleState,
+ latch: &CoreLatch,
+ has_injected_jobs: impl FnOnce() -> bool,
+ ) {
+ let worker_index = idle_state.worker_index;
+
+ if !latch.get_sleepy() {
+ return;
+ }
+
+ let sleep_state = &self.worker_sleep_states[worker_index];
+ let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
+ debug_assert!(!*is_blocked);
+
+ // Our latch was signalled. We should wake back up fully as we
+ // will have some stuff to do.
+ if !latch.fall_asleep() {
+ idle_state.wake_fully();
+ return;
+ }
+
+ loop {
+ let counters = self.counters.load(Ordering::SeqCst);
+
+ // Check if the JEC has changed since we got sleepy.
+ debug_assert!(idle_state.jobs_counter.is_sleepy());
+ if counters.jobs_counter() != idle_state.jobs_counter {
+ // JEC has changed, so a new job was posted, but for some reason
+ // we didn't see it. We should return to just before the SLEEPY
+ // state so we can do another search and (if we fail to find
+ // work) go back to sleep.
+ idle_state.wake_partly();
+ latch.wake_up();
+ return;
+ }
+
+ // Otherwise, let's move from IDLE to SLEEPING.
+ if self.counters.try_add_sleeping_thread(counters) {
+ break;
+ }
+ }
+
+ // Successfully registered as asleep.
+
+ // We have one last check for injected jobs to do. This protects against
+ // deadlock in the very unlikely event that
+ //
+ // - an external job is being injected while we are sleepy
+ // - that job triggers the rollover over the JEC such that we don't see it
+ // - we are the last active worker thread
+ std::sync::atomic::fence(Ordering::SeqCst);
+ if has_injected_jobs() {
+ // If we see an externally injected job, then we have to 'wake
+ // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
+ // the one that wakes us.)
+ self.counters.sub_sleeping_thread();
+ } else {
+ // If we don't see an injected job (the normal case), then flag
+ // ourselves as asleep and wait till we are notified.
+ //
+ // (Note that `is_blocked` is held under a mutex and the mutex was
+ // acquired *before* we incremented the "sleepy counter". This means
+ // that whomever is coming to wake us will have to wait until we
+ // release the mutex in the call to `wait`, so they will see this
+ // boolean as true.)
+ *is_blocked = true;
+ while *is_blocked {
+ is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
+ }
+ }
+
+ // Update other state:
+ idle_state.wake_fully();
+ latch.wake_up();
+ }
+
+ /// Notify the given thread that it should wake up (if it is
+ /// sleeping). When this method is invoked, we typically know the
+ /// thread is asleep, though in rare cases it could have been
+ /// awoken by (e.g.) new work having been posted.
+ pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
+ self.wake_specific_thread(target_worker_index);
+ }
+
+ /// Signals that `num_jobs` new jobs were injected into the thread
+ /// pool from outside. This function will ensure that there are
+ /// threads available to process them, waking threads from sleep
+ /// if necessary.
+ ///
+ /// # Parameters
+ ///
+ /// - `num_jobs` -- lower bound on number of jobs available for stealing.
+ /// We'll try to get at least one thread per job.
+ #[inline]
+ pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
+ // This fence is needed to guarantee that threads
+ // as they are about to fall asleep, observe any
+ // new jobs that may have been injected.
+ std::sync::atomic::fence(Ordering::SeqCst);
+
+ self.new_jobs(num_jobs, queue_was_empty)
+ }
+
+ /// Signals that `num_jobs` new jobs were pushed onto a thread's
+ /// local deque. This function will try to ensure that there are
+ /// threads available to process them, waking threads from sleep
+ /// if necessary. However, this is not guaranteed: under certain
+ /// race conditions, the function may fail to wake any new
+ /// threads; in that case the existing thread should eventually
+ /// pop the job.
+ ///
+ /// # Parameters
+ ///
+ /// - `num_jobs` -- lower bound on number of jobs available for stealing.
+ /// We'll try to get at least one thread per job.
+ #[inline]
+ pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
+ self.new_jobs(num_jobs, queue_was_empty)
+ }
+
+ /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
+ #[inline]
+ fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
+ // Read the counters and -- if sleepy workers have announced themselves
+ // -- announce that there is now work available. The final value of `counters`
+ // with which we exit the loop thus corresponds to a state when
+ let counters = self
+ .counters
+ .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
+ let num_awake_but_idle = counters.awake_but_idle_threads();
+ let num_sleepers = counters.sleeping_threads();
+
+ if num_sleepers == 0 {
+ // nobody to wake
+ return;
+ }
+
+ // Promote from u16 to u32 so we can interoperate with
+ // num_jobs more easily.
+ let num_awake_but_idle = num_awake_but_idle as u32;
+ let num_sleepers = num_sleepers as u32;
+
+ // If the queue is non-empty, then we always wake up a worker
+ // -- clearly the existing idle jobs aren't enough. Otherwise,
+ // check to see if we have enough idle workers.
+ if !queue_was_empty {
+ let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
+ self.wake_any_threads(num_to_wake);
+ } else if num_awake_but_idle < num_jobs {
+ let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
+ self.wake_any_threads(num_to_wake);
+ }
+ }
+
+ #[cold]
+ fn wake_any_threads(&self, mut num_to_wake: u32) {
+ if num_to_wake > 0 {
+ for i in 0..self.worker_sleep_states.len() {
+ if self.wake_specific_thread(i) {
+ num_to_wake -= 1;
+ if num_to_wake == 0 {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ fn wake_specific_thread(&self, index: usize) -> bool {
+ let sleep_state = &self.worker_sleep_states[index];
+
+ let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
+ if *is_blocked {
+ *is_blocked = false;
+ sleep_state.condvar.notify_one();
+
+ // When the thread went to sleep, it will have incremented
+ // this value. When we wake it, its our job to decrement
+ // it. We could have the thread do it, but that would
+ // introduce a delay between when the thread was
+ // *notified* and when this counter was decremented. That
+ // might mislead people with new work into thinking that
+ // there are sleeping threads that they should try to
+ // wake, when in fact there is nothing left for them to
+ // do.
+ self.counters.sub_sleeping_thread();
+
+ true
+ } else {
+ false
+ }
+ }
+}
+
+impl IdleState {
+ fn wake_fully(&mut self) {
+ self.rounds = 0;
+ self.jobs_counter = JobsEventCounter::DUMMY;
+ }
+
+ fn wake_partly(&mut self) {
+ self.rounds = ROUNDS_UNTIL_SLEEPY;
+ self.jobs_counter = JobsEventCounter::DUMMY;
+ }
+}