aboutsummaryrefslogtreecommitdiff
path: root/vendor/rayon-core/src/registry.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/rayon-core/src/registry.rs')
-rw-r--r--vendor/rayon-core/src/registry.rs995
1 files changed, 0 insertions, 995 deletions
diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs
deleted file mode 100644
index e4f2ac7..0000000
--- a/vendor/rayon-core/src/registry.rs
+++ /dev/null
@@ -1,995 +0,0 @@
-use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
-use crate::sleep::Sleep;
-use crate::unwind;
-use crate::{
- ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
- Yield,
-};
-use crossbeam_deque::{Injector, Steal, Stealer, Worker};
-use std::cell::Cell;
-use std::collections::hash_map::DefaultHasher;
-use std::fmt;
-use std::hash::Hasher;
-use std::io;
-use std::mem;
-use std::ptr;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, Once};
-use std::thread;
-use std::usize;
-
-/// Thread builder used for customization via
-/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
-pub struct ThreadBuilder {
- name: Option<String>,
- stack_size: Option<usize>,
- worker: Worker<JobRef>,
- stealer: Stealer<JobRef>,
- registry: Arc<Registry>,
- index: usize,
-}
-
-impl ThreadBuilder {
- /// Gets the index of this thread in the pool, within `0..num_threads`.
- pub fn index(&self) -> usize {
- self.index
- }
-
- /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
- pub fn name(&self) -> Option<&str> {
- self.name.as_deref()
- }
-
- /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
- pub fn stack_size(&self) -> Option<usize> {
- self.stack_size
- }
-
- /// Executes the main loop for this thread. This will not return until the
- /// thread pool is dropped.
- pub fn run(self) {
- unsafe { main_loop(self) }
- }
-}
-
-impl fmt::Debug for ThreadBuilder {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("ThreadBuilder")
- .field("pool", &self.registry.id())
- .field("index", &self.index)
- .field("name", &self.name)
- .field("stack_size", &self.stack_size)
- .finish()
- }
-}
-
-/// Generalized trait for spawning a thread in the `Registry`.
-///
-/// This trait is pub-in-private -- E0445 forces us to make it public,
-/// but we don't actually want to expose these details in the API.
-pub trait ThreadSpawn {
- private_decl! {}
-
- /// Spawn a thread with the `ThreadBuilder` parameters, and then
- /// call `ThreadBuilder::run()`.
- fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
-}
-
-/// Spawns a thread in the "normal" way with `std::thread::Builder`.
-///
-/// This type is pub-in-private -- E0445 forces us to make it public,
-/// but we don't actually want to expose these details in the API.
-#[derive(Debug, Default)]
-pub struct DefaultSpawn;
-
-impl ThreadSpawn for DefaultSpawn {
- private_impl! {}
-
- fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
- let mut b = thread::Builder::new();
- if let Some(name) = thread.name() {
- b = b.name(name.to_owned());
- }
- if let Some(stack_size) = thread.stack_size() {
- b = b.stack_size(stack_size);
- }
- b.spawn(|| thread.run())?;
- Ok(())
- }
-}
-
-/// Spawns a thread with a user's custom callback.
-///
-/// This type is pub-in-private -- E0445 forces us to make it public,
-/// but we don't actually want to expose these details in the API.
-#[derive(Debug)]
-pub struct CustomSpawn<F>(F);
-
-impl<F> CustomSpawn<F>
-where
- F: FnMut(ThreadBuilder) -> io::Result<()>,
-{
- pub(super) fn new(spawn: F) -> Self {
- CustomSpawn(spawn)
- }
-}
-
-impl<F> ThreadSpawn for CustomSpawn<F>
-where
- F: FnMut(ThreadBuilder) -> io::Result<()>,
-{
- private_impl! {}
-
- #[inline]
- fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
- (self.0)(thread)
- }
-}
-
-pub(super) struct Registry {
- thread_infos: Vec<ThreadInfo>,
- sleep: Sleep,
- injected_jobs: Injector<JobRef>,
- broadcasts: Mutex<Vec<Worker<JobRef>>>,
- panic_handler: Option<Box<PanicHandler>>,
- start_handler: Option<Box<StartHandler>>,
- exit_handler: Option<Box<ExitHandler>>,
-
- // When this latch reaches 0, it means that all work on this
- // registry must be complete. This is ensured in the following ways:
- //
- // - if this is the global registry, there is a ref-count that never
- // gets released.
- // - if this is a user-created thread-pool, then so long as the thread-pool
- // exists, it holds a reference.
- // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
- // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
- // return until the blocking job is complete, that ref will continue to be held.
- // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
- // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
- // and that job will keep the pool alive.
- terminate_count: AtomicUsize,
-}
-
-/// ////////////////////////////////////////////////////////////////////////
-/// Initialization
-
-static mut THE_REGISTRY: Option<Arc<Registry>> = None;
-static THE_REGISTRY_SET: Once = Once::new();
-
-/// Starts the worker threads (if that has not already happened). If
-/// initialization has not already occurred, use the default
-/// configuration.
-pub(super) fn global_registry() -> &'static Arc<Registry> {
- set_global_registry(default_global_registry)
- .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
- .expect("The global thread pool has not been initialized.")
-}
-
-/// Starts the worker threads (if that has not already happened) with
-/// the given builder.
-pub(super) fn init_global_registry<S>(
- builder: ThreadPoolBuilder<S>,
-) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
-where
- S: ThreadSpawn,
-{
- set_global_registry(|| Registry::new(builder))
-}
-
-/// Starts the worker threads (if that has not already happened)
-/// by creating a registry with the given callback.
-fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
-where
- F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
-{
- let mut result = Err(ThreadPoolBuildError::new(
- ErrorKind::GlobalPoolAlreadyInitialized,
- ));
-
- THE_REGISTRY_SET.call_once(|| {
- result = registry()
- .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
- });
-
- result
-}
-
-fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
- let result = Registry::new(ThreadPoolBuilder::new());
-
- // If we're running in an environment that doesn't support threads at all, we can fall back to
- // using the current thread alone. This is crude, and probably won't work for non-blocking
- // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
- //
- // Notably, this allows current WebAssembly targets to work even though their threading support
- // is stubbed out, and we won't have to change anything if they do add real threading.
- let unsupported = matches!(&result, Err(e) if e.is_unsupported());
- if unsupported && WorkerThread::current().is_null() {
- let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread();
- let fallback_result = Registry::new(builder);
- if fallback_result.is_ok() {
- return fallback_result;
- }
- }
-
- result
-}
-
-struct Terminator<'a>(&'a Arc<Registry>);
-
-impl<'a> Drop for Terminator<'a> {
- fn drop(&mut self) {
- self.0.terminate()
- }
-}
-
-impl Registry {
- pub(super) fn new<S>(
- mut builder: ThreadPoolBuilder<S>,
- ) -> Result<Arc<Self>, ThreadPoolBuildError>
- where
- S: ThreadSpawn,
- {
- // Soft-limit the number of threads that we can actually support.
- let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads());
-
- let breadth_first = builder.get_breadth_first();
-
- let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
- .map(|_| {
- let worker = if breadth_first {
- Worker::new_fifo()
- } else {
- Worker::new_lifo()
- };
-
- let stealer = worker.stealer();
- (worker, stealer)
- })
- .unzip();
-
- let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads)
- .map(|_| {
- let worker = Worker::new_fifo();
- let stealer = worker.stealer();
- (worker, stealer)
- })
- .unzip();
-
- let registry = Arc::new(Registry {
- thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
- sleep: Sleep::new(n_threads),
- injected_jobs: Injector::new(),
- broadcasts: Mutex::new(broadcasts),
- terminate_count: AtomicUsize::new(1),
- panic_handler: builder.take_panic_handler(),
- start_handler: builder.take_start_handler(),
- exit_handler: builder.take_exit_handler(),
- });
-
- // If we return early or panic, make sure to terminate existing threads.
- let t1000 = Terminator(&registry);
-
- for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() {
- let thread = ThreadBuilder {
- name: builder.get_thread_name(index),
- stack_size: builder.get_stack_size(),
- registry: Arc::clone(&registry),
- worker,
- stealer,
- index,
- };
-
- if index == 0 && builder.use_current_thread {
- if !WorkerThread::current().is_null() {
- return Err(ThreadPoolBuildError::new(
- ErrorKind::CurrentThreadAlreadyInPool,
- ));
- }
- // Rather than starting a new thread, we're just taking over the current thread
- // *without* running the main loop, so we can still return from here.
- // The WorkerThread is leaked, but we never shutdown the global pool anyway.
- let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread)));
-
- unsafe {
- WorkerThread::set_current(worker_thread);
- Latch::set(&registry.thread_infos[index].primed);
- }
- continue;
- }
-
- if let Err(e) = builder.get_spawn_handler().spawn(thread) {
- return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
- }
- }
-
- // Returning normally now, without termination.
- mem::forget(t1000);
-
- Ok(registry)
- }
-
- pub(super) fn current() -> Arc<Registry> {
- unsafe {
- let worker_thread = WorkerThread::current();
- let registry = if worker_thread.is_null() {
- global_registry()
- } else {
- &(*worker_thread).registry
- };
- Arc::clone(registry)
- }
- }
-
- /// Returns the number of threads in the current registry. This
- /// is better than `Registry::current().num_threads()` because it
- /// avoids incrementing the `Arc`.
- pub(super) fn current_num_threads() -> usize {
- unsafe {
- let worker_thread = WorkerThread::current();
- if worker_thread.is_null() {
- global_registry().num_threads()
- } else {
- (*worker_thread).registry.num_threads()
- }
- }
- }
-
- /// Returns the current `WorkerThread` if it's part of this `Registry`.
- pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
- unsafe {
- let worker = WorkerThread::current().as_ref()?;
- if worker.registry().id() == self.id() {
- Some(worker)
- } else {
- None
- }
- }
- }
-
- /// Returns an opaque identifier for this registry.
- pub(super) fn id(&self) -> RegistryId {
- // We can rely on `self` not to change since we only ever create
- // registries that are boxed up in an `Arc` (see `new()` above).
- RegistryId {
- addr: self as *const Self as usize,
- }
- }
-
- pub(super) fn num_threads(&self) -> usize {
- self.thread_infos.len()
- }
-
- pub(super) fn catch_unwind(&self, f: impl FnOnce()) {
- if let Err(err) = unwind::halt_unwinding(f) {
- // If there is no handler, or if that handler itself panics, then we abort.
- let abort_guard = unwind::AbortIfPanic;
- if let Some(ref handler) = self.panic_handler {
- handler(err);
- mem::forget(abort_guard);
- }
- }
- }
-
- /// Waits for the worker threads to get up and running. This is
- /// meant to be used for benchmarking purposes, primarily, so that
- /// you can get more consistent numbers by having everything
- /// "ready to go".
- pub(super) fn wait_until_primed(&self) {
- for info in &self.thread_infos {
- info.primed.wait();
- }
- }
-
- /// Waits for the worker threads to stop. This is used for testing
- /// -- so we can check that termination actually works.
- #[cfg(test)]
- pub(super) fn wait_until_stopped(&self) {
- for info in &self.thread_infos {
- info.stopped.wait();
- }
- }
-
- /// ////////////////////////////////////////////////////////////////////////
- /// MAIN LOOP
- ///
- /// So long as all of the worker threads are hanging out in their
- /// top-level loop, there is no work to be done.
-
- /// Push a job into the given `registry`. If we are running on a
- /// worker thread for the registry, this will push onto the
- /// deque. Else, it will inject from the outside (which is slower).
- pub(super) fn inject_or_push(&self, job_ref: JobRef) {
- let worker_thread = WorkerThread::current();
- unsafe {
- if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
- (*worker_thread).push(job_ref);
- } else {
- self.inject(job_ref);
- }
- }
- }
-
- /// Push a job into the "external jobs" queue; it will be taken by
- /// whatever worker has nothing to do. Use this if you know that
- /// you are not on a worker of this registry.
- pub(super) fn inject(&self, injected_job: JobRef) {
- // It should not be possible for `state.terminate` to be true
- // here. It is only set to true when the user creates (and
- // drops) a `ThreadPool`; and, in that case, they cannot be
- // calling `inject()` later, since they dropped their
- // `ThreadPool`.
- debug_assert_ne!(
- self.terminate_count.load(Ordering::Acquire),
- 0,
- "inject() sees state.terminate as true"
- );
-
- let queue_was_empty = self.injected_jobs.is_empty();
-
- self.injected_jobs.push(injected_job);
- self.sleep.new_injected_jobs(1, queue_was_empty);
- }
-
- fn has_injected_job(&self) -> bool {
- !self.injected_jobs.is_empty()
- }
-
- fn pop_injected_job(&self) -> Option<JobRef> {
- loop {
- match self.injected_jobs.steal() {
- Steal::Success(job) => return Some(job),
- Steal::Empty => return None,
- Steal::Retry => {}
- }
- }
- }
-
- /// Push a job into each thread's own "external jobs" queue; it will be
- /// executed only on that thread, when it has nothing else to do locally,
- /// before it tries to steal other work.
- ///
- /// **Panics** if not given exactly as many jobs as there are threads.
- pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
- assert_eq!(self.num_threads(), injected_jobs.len());
- {
- let broadcasts = self.broadcasts.lock().unwrap();
-
- // It should not be possible for `state.terminate` to be true
- // here. It is only set to true when the user creates (and
- // drops) a `ThreadPool`; and, in that case, they cannot be
- // calling `inject_broadcast()` later, since they dropped their
- // `ThreadPool`.
- debug_assert_ne!(
- self.terminate_count.load(Ordering::Acquire),
- 0,
- "inject_broadcast() sees state.terminate as true"
- );
-
- assert_eq!(broadcasts.len(), injected_jobs.len());
- for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) {
- worker.push(job_ref);
- }
- }
- for i in 0..self.num_threads() {
- self.sleep.notify_worker_latch_is_set(i);
- }
- }
-
- /// If already in a worker-thread of this registry, just execute `op`.
- /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
- /// completes and return its return value. If `op` panics, that panic will
- /// be propagated as well. The second argument indicates `true` if injection
- /// was performed, `false` if executed directly.
- pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce(&WorkerThread, bool) -> R + Send,
- R: Send,
- {
- unsafe {
- let worker_thread = WorkerThread::current();
- if worker_thread.is_null() {
- self.in_worker_cold(op)
- } else if (*worker_thread).registry().id() != self.id() {
- self.in_worker_cross(&*worker_thread, op)
- } else {
- // Perfectly valid to give them a `&T`: this is the
- // current thread, so we know the data structure won't be
- // invalidated until we return.
- op(&*worker_thread, false)
- }
- }
- }
-
- #[cold]
- unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce(&WorkerThread, bool) -> R + Send,
- R: Send,
- {
- thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());
-
- LOCK_LATCH.with(|l| {
- // This thread isn't a member of *any* thread pool, so just block.
- debug_assert!(WorkerThread::current().is_null());
- let job = StackJob::new(
- |injected| {
- let worker_thread = WorkerThread::current();
- assert!(injected && !worker_thread.is_null());
- op(&*worker_thread, true)
- },
- LatchRef::new(l),
- );
- self.inject(job.as_job_ref());
- job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
-
- job.into_result()
- })
- }
-
- #[cold]
- unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
- where
- OP: FnOnce(&WorkerThread, bool) -> R + Send,
- R: Send,
- {
- // This thread is a member of a different pool, so let it process
- // other work while waiting for this `op` to complete.
- debug_assert!(current_thread.registry().id() != self.id());
- let latch = SpinLatch::cross(current_thread);
- let job = StackJob::new(
- |injected| {
- let worker_thread = WorkerThread::current();
- assert!(injected && !worker_thread.is_null());
- op(&*worker_thread, true)
- },
- latch,
- );
- self.inject(job.as_job_ref());
- current_thread.wait_until(&job.latch);
- job.into_result()
- }
-
- /// Increments the terminate counter. This increment should be
- /// balanced by a call to `terminate`, which will decrement. This
- /// is used when spawning asynchronous work, which needs to
- /// prevent the registry from terminating so long as it is active.
- ///
- /// Note that blocking functions such as `join` and `scope` do not
- /// need to concern themselves with this fn; their context is
- /// responsible for ensuring the current thread-pool will not
- /// terminate until they return.
- ///
- /// The global thread-pool always has an outstanding reference
- /// (the initial one). Custom thread-pools have one outstanding
- /// reference that is dropped when the `ThreadPool` is dropped:
- /// since installing the thread-pool blocks until any joins/scopes
- /// complete, this ensures that joins/scopes are covered.
- ///
- /// The exception is `::spawn()`, which can create a job outside
- /// of any blocking scope. In that case, the job itself holds a
- /// terminate count and is responsible for invoking `terminate()`
- /// when finished.
- pub(super) fn increment_terminate_count(&self) {
- let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
- debug_assert!(previous != 0, "registry ref count incremented from zero");
- assert!(
- previous != std::usize::MAX,
- "overflow in registry ref count"
- );
- }
-
- /// Signals that the thread-pool which owns this registry has been
- /// dropped. The worker threads will gradually terminate, once any
- /// extant work is completed.
- pub(super) fn terminate(&self) {
- if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
- for (i, thread_info) in self.thread_infos.iter().enumerate() {
- unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
- }
- }
- }
-
- /// Notify the worker that the latch they are sleeping on has been "set".
- pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
- self.sleep.notify_worker_latch_is_set(target_worker_index);
- }
-}
-
-#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
-pub(super) struct RegistryId {
- addr: usize,
-}
-
-struct ThreadInfo {
- /// Latch set once thread has started and we are entering into the
- /// main loop. Used to wait for worker threads to become primed,
- /// primarily of interest for benchmarking.
- primed: LockLatch,
-
- /// Latch is set once worker thread has completed. Used to wait
- /// until workers have stopped; only used for tests.
- stopped: LockLatch,
-
- /// The latch used to signal that terminated has been requested.
- /// This latch is *set* by the `terminate` method on the
- /// `Registry`, once the registry's main "terminate" counter
- /// reaches zero.
- terminate: OnceLatch,
-
- /// the "stealer" half of the worker's deque
- stealer: Stealer<JobRef>,
-}
-
-impl ThreadInfo {
- fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
- ThreadInfo {
- primed: LockLatch::new(),
- stopped: LockLatch::new(),
- terminate: OnceLatch::new(),
- stealer,
- }
- }
-}
-
-/// ////////////////////////////////////////////////////////////////////////
-/// WorkerThread identifiers
-
-pub(super) struct WorkerThread {
- /// the "worker" half of our local deque
- worker: Worker<JobRef>,
-
- /// the "stealer" half of the worker's broadcast deque
- stealer: Stealer<JobRef>,
-
- /// local queue used for `spawn_fifo` indirection
- fifo: JobFifo,
-
- index: usize,
-
- /// A weak random number generator.
- rng: XorShift64Star,
-
- registry: Arc<Registry>,
-}
-
-// This is a bit sketchy, but basically: the WorkerThread is
-// allocated on the stack of the worker on entry and stored into this
-// thread local variable. So it will remain valid at least until the
-// worker is fully unwound. Using an unsafe pointer avoids the need
-// for a RefCell<T> etc.
-thread_local! {
- static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) };
-}
-
-impl From<ThreadBuilder> for WorkerThread {
- fn from(thread: ThreadBuilder) -> Self {
- Self {
- worker: thread.worker,
- stealer: thread.stealer,
- fifo: JobFifo::new(),
- index: thread.index,
- rng: XorShift64Star::new(),
- registry: thread.registry,
- }
- }
-}
-
-impl Drop for WorkerThread {
- fn drop(&mut self) {
- // Undo `set_current`
- WORKER_THREAD_STATE.with(|t| {
- assert!(t.get().eq(&(self as *const _)));
- t.set(ptr::null());
- });
- }
-}
-
-impl WorkerThread {
- /// Gets the `WorkerThread` index for the current thread; returns
- /// NULL if this is not a worker thread. This pointer is valid
- /// anywhere on the current thread.
- #[inline]
- pub(super) fn current() -> *const WorkerThread {
- WORKER_THREAD_STATE.with(Cell::get)
- }
-
- /// Sets `self` as the worker thread index for the current thread.
- /// This is done during worker thread startup.
- unsafe fn set_current(thread: *const WorkerThread) {
- WORKER_THREAD_STATE.with(|t| {
- assert!(t.get().is_null());
- t.set(thread);
- });
- }
-
- /// Returns the registry that owns this worker thread.
- #[inline]
- pub(super) fn registry(&self) -> &Arc<Registry> {
- &self.registry
- }
-
- /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
- #[inline]
- pub(super) fn index(&self) -> usize {
- self.index
- }
-
- #[inline]
- pub(super) unsafe fn push(&self, job: JobRef) {
- let queue_was_empty = self.worker.is_empty();
- self.worker.push(job);
- self.registry.sleep.new_internal_jobs(1, queue_was_empty);
- }
-
- #[inline]
- pub(super) unsafe fn push_fifo(&self, job: JobRef) {
- self.push(self.fifo.push(job));
- }
-
- #[inline]
- pub(super) fn local_deque_is_empty(&self) -> bool {
- self.worker.is_empty()
- }
-
- /// Attempts to obtain a "local" job -- typically this means
- /// popping from the top of the stack, though if we are configured
- /// for breadth-first execution, it would mean dequeuing from the
- /// bottom.
- #[inline]
- pub(super) fn take_local_job(&self) -> Option<JobRef> {
- let popped_job = self.worker.pop();
-
- if popped_job.is_some() {
- return popped_job;
- }
-
- loop {
- match self.stealer.steal() {
- Steal::Success(job) => return Some(job),
- Steal::Empty => return None,
- Steal::Retry => {}
- }
- }
- }
-
- fn has_injected_job(&self) -> bool {
- !self.stealer.is_empty() || self.registry.has_injected_job()
- }
-
- /// Wait until the latch is set. Try to keep busy by popping and
- /// stealing tasks as necessary.
- #[inline]
- pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
- let latch = latch.as_core_latch();
- if !latch.probe() {
- self.wait_until_cold(latch);
- }
- }
-
- #[cold]
- unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
- // the code below should swallow all panics and hence never
- // unwind; but if something does wrong, we want to abort,
- // because otherwise other code in rayon may assume that the
- // latch has been signaled, and that can lead to random memory
- // accesses, which would be *very bad*
- let abort_guard = unwind::AbortIfPanic;
-
- 'outer: while !latch.probe() {
- // Check for local work *before* we start marking ourself idle,
- // especially to avoid modifying shared sleep state.
- if let Some(job) = self.take_local_job() {
- self.execute(job);
- continue;
- }
-
- let mut idle_state = self.registry.sleep.start_looking(self.index);
- while !latch.probe() {
- if let Some(job) = self.find_work() {
- self.registry.sleep.work_found();
- self.execute(job);
- // The job might have injected local work, so go back to the outer loop.
- continue 'outer;
- } else {
- self.registry
- .sleep
- .no_work_found(&mut idle_state, latch, || self.has_injected_job())
- }
- }
-
- // If we were sleepy, we are not anymore. We "found work" --
- // whatever the surrounding thread was doing before it had to wait.
- self.registry.sleep.work_found();
- break;
- }
-
- mem::forget(abort_guard); // successful execution, do not abort
- }
-
- unsafe fn wait_until_out_of_work(&self) {
- debug_assert_eq!(self as *const _, WorkerThread::current());
- let registry = &*self.registry;
- let index = self.index;
-
- self.wait_until(&registry.thread_infos[index].terminate);
-
- // Should not be any work left in our queue.
- debug_assert!(self.take_local_job().is_none());
-
- // Let registry know we are done
- Latch::set(&registry.thread_infos[index].stopped);
- }
-
- fn find_work(&self) -> Option<JobRef> {
- // Try to find some work to do. We give preference first
- // to things in our local deque, then in other workers
- // deques, and finally to injected jobs from the
- // outside. The idea is to finish what we started before
- // we take on something new.
- self.take_local_job()
- .or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job())
- }
-
- pub(super) fn yield_now(&self) -> Yield {
- match self.find_work() {
- Some(job) => unsafe {
- self.execute(job);
- Yield::Executed
- },
- None => Yield::Idle,
- }
- }
-
- pub(super) fn yield_local(&self) -> Yield {
- match self.take_local_job() {
- Some(job) => unsafe {
- self.execute(job);
- Yield::Executed
- },
- None => Yield::Idle,
- }
- }
-
- #[inline]
- pub(super) unsafe fn execute(&self, job: JobRef) {
- job.execute();
- }
-
- /// Try to steal a single job and return it.
- ///
- /// This should only be done as a last resort, when there is no
- /// local work to do.
- fn steal(&self) -> Option<JobRef> {
- // we only steal when we don't have any work to do locally
- debug_assert!(self.local_deque_is_empty());
-
- // otherwise, try to steal
- let thread_infos = &self.registry.thread_infos.as_slice();
- let num_threads = thread_infos.len();
- if num_threads <= 1 {
- return None;
- }
-
- loop {
- let mut retry = false;
- let start = self.rng.next_usize(num_threads);
- let job = (start..num_threads)
- .chain(0..start)
- .filter(move |&i| i != self.index)
- .find_map(|victim_index| {
- let victim = &thread_infos[victim_index];
- match victim.stealer.steal() {
- Steal::Success(job) => Some(job),
- Steal::Empty => None,
- Steal::Retry => {
- retry = true;
- None
- }
- }
- });
- if job.is_some() || !retry {
- return job;
- }
- }
- }
-}
-
-/// ////////////////////////////////////////////////////////////////////////
-
-unsafe fn main_loop(thread: ThreadBuilder) {
- let worker_thread = &WorkerThread::from(thread);
- WorkerThread::set_current(worker_thread);
- let registry = &*worker_thread.registry;
- let index = worker_thread.index;
-
- // let registry know we are ready to do work
- Latch::set(&registry.thread_infos[index].primed);
-
- // Worker threads should not panic. If they do, just abort, as the
- // internal state of the threadpool is corrupted. Note that if
- // **user code** panics, we should catch that and redirect.
- let abort_guard = unwind::AbortIfPanic;
-
- // Inform a user callback that we started a thread.
- if let Some(ref handler) = registry.start_handler {
- registry.catch_unwind(|| handler(index));
- }
-
- worker_thread.wait_until_out_of_work();
-
- // Normal termination, do not abort.
- mem::forget(abort_guard);
-
- // Inform a user callback that we exited a thread.
- if let Some(ref handler) = registry.exit_handler {
- registry.catch_unwind(|| handler(index));
- // We're already exiting the thread, there's nothing else to do.
- }
-}
-
-/// If already in a worker-thread, just execute `op`. Otherwise,
-/// execute `op` in the default thread-pool. Either way, block until
-/// `op` completes and return its return value. If `op` panics, that
-/// panic will be propagated as well. The second argument indicates
-/// `true` if injection was performed, `false` if executed directly.
-pub(super) fn in_worker<OP, R>(op: OP) -> R
-where
- OP: FnOnce(&WorkerThread, bool) -> R + Send,
- R: Send,
-{
- unsafe {
- let owner_thread = WorkerThread::current();
- if !owner_thread.is_null() {
- // Perfectly valid to give them a `&T`: this is the
- // current thread, so we know the data structure won't be
- // invalidated until we return.
- op(&*owner_thread, false)
- } else {
- global_registry().in_worker(op)
- }
- }
-}
-
-/// [xorshift*] is a fast pseudorandom number generator which will
-/// even tolerate weak seeding, as long as it's not zero.
-///
-/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
-struct XorShift64Star {
- state: Cell<u64>,
-}
-
-impl XorShift64Star {
- fn new() -> Self {
- // Any non-zero seed will do -- this uses the hash of a global counter.
- let mut seed = 0;
- while seed == 0 {
- let mut hasher = DefaultHasher::new();
- static COUNTER: AtomicUsize = AtomicUsize::new(0);
- hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
- seed = hasher.finish();
- }
-
- XorShift64Star {
- state: Cell::new(seed),
- }
- }
-
- fn next(&self) -> u64 {
- let mut x = self.state.get();
- debug_assert_ne!(x, 0);
- x ^= x >> 12;
- x ^= x << 25;
- x ^= x >> 27;
- self.state.set(x);
- x.wrapping_mul(0x2545_f491_4f6c_dd1d)
- }
-
- /// Return a value from `0..n`.
- fn next_usize(&self, n: usize) -> usize {
- (self.next() % n as u64) as usize
- }
-}