diff options
author | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
---|---|---|
committer | Valentin Popov <valentin@popov.link> | 2024-01-08 00:21:28 +0300 |
commit | 1b6a04ca5504955c571d1c97504fb45ea0befee4 (patch) | |
tree | 7579f518b23313e8a9748a88ab6173d5e030b227 /vendor/rayon-core/src/registry.rs | |
parent | 5ecd8cf2cba827454317368b68571df0d13d7842 (diff) | |
download | fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.tar.xz fparkan-1b6a04ca5504955c571d1c97504fb45ea0befee4.zip |
Initial vendor packages
Signed-off-by: Valentin Popov <valentin@popov.link>
Diffstat (limited to 'vendor/rayon-core/src/registry.rs')
-rw-r--r-- | vendor/rayon-core/src/registry.rs | 995 |
1 files changed, 995 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs new file mode 100644 index 0000000..e4f2ac7 --- /dev/null +++ b/vendor/rayon-core/src/registry.rs @@ -0,0 +1,995 @@ +use crate::job::{JobFifo, JobRef, StackJob}; +use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; +use crate::sleep::Sleep; +use crate::unwind; +use crate::{ + ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, + Yield, +}; +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +use std::cell::Cell; +use std::collections::hash_map::DefaultHasher; +use std::fmt; +use std::hash::Hasher; +use std::io; +use std::mem; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, Once}; +use std::thread; +use std::usize; + +/// Thread builder used for customization via +/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). +pub struct ThreadBuilder { + name: Option<String>, + stack_size: Option<usize>, + worker: Worker<JobRef>, + stealer: Stealer<JobRef>, + registry: Arc<Registry>, + index: usize, +} + +impl ThreadBuilder { + /// Gets the index of this thread in the pool, within `0..num_threads`. + pub fn index(&self) -> usize { + self.index + } + + /// Gets the string that was specified by `ThreadPoolBuilder::name()`. + pub fn name(&self) -> Option<&str> { + self.name.as_deref() + } + + /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`. + pub fn stack_size(&self) -> Option<usize> { + self.stack_size + } + + /// Executes the main loop for this thread. This will not return until the + /// thread pool is dropped. + pub fn run(self) { + unsafe { main_loop(self) } + } +} + +impl fmt::Debug for ThreadBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadBuilder") + .field("pool", &self.registry.id()) + .field("index", &self.index) + .field("name", &self.name) + .field("stack_size", &self.stack_size) + .finish() + } +} + +/// Generalized trait for spawning a thread in the `Registry`. +/// +/// This trait is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +pub trait ThreadSpawn { + private_decl! {} + + /// Spawn a thread with the `ThreadBuilder` parameters, and then + /// call `ThreadBuilder::run()`. + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>; +} + +/// Spawns a thread in the "normal" way with `std::thread::Builder`. +/// +/// This type is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +#[derive(Debug, Default)] +pub struct DefaultSpawn; + +impl ThreadSpawn for DefaultSpawn { + private_impl! {} + + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { + let mut b = thread::Builder::new(); + if let Some(name) = thread.name() { + b = b.name(name.to_owned()); + } + if let Some(stack_size) = thread.stack_size() { + b = b.stack_size(stack_size); + } + b.spawn(|| thread.run())?; + Ok(()) + } +} + +/// Spawns a thread with a user's custom callback. +/// +/// This type is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +#[derive(Debug)] +pub struct CustomSpawn<F>(F); + +impl<F> CustomSpawn<F> +where + F: FnMut(ThreadBuilder) -> io::Result<()>, +{ + pub(super) fn new(spawn: F) -> Self { + CustomSpawn(spawn) + } +} + +impl<F> ThreadSpawn for CustomSpawn<F> +where + F: FnMut(ThreadBuilder) -> io::Result<()>, +{ + private_impl! {} + + #[inline] + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { + (self.0)(thread) + } +} + +pub(super) struct Registry { + thread_infos: Vec<ThreadInfo>, + sleep: Sleep, + injected_jobs: Injector<JobRef>, + broadcasts: Mutex<Vec<Worker<JobRef>>>, + panic_handler: Option<Box<PanicHandler>>, + start_handler: Option<Box<StartHandler>>, + exit_handler: Option<Box<ExitHandler>>, + + // When this latch reaches 0, it means that all work on this + // registry must be complete. This is ensured in the following ways: + // + // - if this is the global registry, there is a ref-count that never + // gets released. + // - if this is a user-created thread-pool, then so long as the thread-pool + // exists, it holds a reference. + // - when we inject a "blocking job" into the registry with `ThreadPool::install()`, + // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't + // return until the blocking job is complete, that ref will continue to be held. + // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed. + // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`) + // and that job will keep the pool alive. + terminate_count: AtomicUsize, +} + +/// //////////////////////////////////////////////////////////////////////// +/// Initialization + +static mut THE_REGISTRY: Option<Arc<Registry>> = None; +static THE_REGISTRY_SET: Once = Once::new(); + +/// Starts the worker threads (if that has not already happened). If +/// initialization has not already occurred, use the default +/// configuration. +pub(super) fn global_registry() -> &'static Arc<Registry> { + set_global_registry(default_global_registry) + .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) + .expect("The global thread pool has not been initialized.") +} + +/// Starts the worker threads (if that has not already happened) with +/// the given builder. +pub(super) fn init_global_registry<S>( + builder: ThreadPoolBuilder<S>, +) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> +where + S: ThreadSpawn, +{ + set_global_registry(|| Registry::new(builder)) +} + +/// Starts the worker threads (if that has not already happened) +/// by creating a registry with the given callback. +fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> +where + F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>, +{ + let mut result = Err(ThreadPoolBuildError::new( + ErrorKind::GlobalPoolAlreadyInitialized, + )); + + THE_REGISTRY_SET.call_once(|| { + result = registry() + .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) }) + }); + + result +} + +fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> { + let result = Registry::new(ThreadPoolBuilder::new()); + + // If we're running in an environment that doesn't support threads at all, we can fall back to + // using the current thread alone. This is crude, and probably won't work for non-blocking + // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine. + // + // Notably, this allows current WebAssembly targets to work even though their threading support + // is stubbed out, and we won't have to change anything if they do add real threading. + let unsupported = matches!(&result, Err(e) if e.is_unsupported()); + if unsupported && WorkerThread::current().is_null() { + let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread(); + let fallback_result = Registry::new(builder); + if fallback_result.is_ok() { + return fallback_result; + } + } + + result +} + +struct Terminator<'a>(&'a Arc<Registry>); + +impl<'a> Drop for Terminator<'a> { + fn drop(&mut self) { + self.0.terminate() + } +} + +impl Registry { + pub(super) fn new<S>( + mut builder: ThreadPoolBuilder<S>, + ) -> Result<Arc<Self>, ThreadPoolBuildError> + where + S: ThreadSpawn, + { + // Soft-limit the number of threads that we can actually support. + let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads()); + + let breadth_first = builder.get_breadth_first(); + + let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) + .map(|_| { + let worker = if breadth_first { + Worker::new_fifo() + } else { + Worker::new_lifo() + }; + + let stealer = worker.stealer(); + (worker, stealer) + }) + .unzip(); + + let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads) + .map(|_| { + let worker = Worker::new_fifo(); + let stealer = worker.stealer(); + (worker, stealer) + }) + .unzip(); + + let registry = Arc::new(Registry { + thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), + sleep: Sleep::new(n_threads), + injected_jobs: Injector::new(), + broadcasts: Mutex::new(broadcasts), + terminate_count: AtomicUsize::new(1), + panic_handler: builder.take_panic_handler(), + start_handler: builder.take_start_handler(), + exit_handler: builder.take_exit_handler(), + }); + + // If we return early or panic, make sure to terminate existing threads. + let t1000 = Terminator(®istry); + + for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() { + let thread = ThreadBuilder { + name: builder.get_thread_name(index), + stack_size: builder.get_stack_size(), + registry: Arc::clone(®istry), + worker, + stealer, + index, + }; + + if index == 0 && builder.use_current_thread { + if !WorkerThread::current().is_null() { + return Err(ThreadPoolBuildError::new( + ErrorKind::CurrentThreadAlreadyInPool, + )); + } + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread))); + + unsafe { + WorkerThread::set_current(worker_thread); + Latch::set(®istry.thread_infos[index].primed); + } + continue; + } + + if let Err(e) = builder.get_spawn_handler().spawn(thread) { + return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); + } + } + + // Returning normally now, without termination. + mem::forget(t1000); + + Ok(registry) + } + + pub(super) fn current() -> Arc<Registry> { + unsafe { + let worker_thread = WorkerThread::current(); + let registry = if worker_thread.is_null() { + global_registry() + } else { + &(*worker_thread).registry + }; + Arc::clone(registry) + } + } + + /// Returns the number of threads in the current registry. This + /// is better than `Registry::current().num_threads()` because it + /// avoids incrementing the `Arc`. + pub(super) fn current_num_threads() -> usize { + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() { + global_registry().num_threads() + } else { + (*worker_thread).registry.num_threads() + } + } + } + + /// Returns the current `WorkerThread` if it's part of this `Registry`. + pub(super) fn current_thread(&self) -> Option<&WorkerThread> { + unsafe { + let worker = WorkerThread::current().as_ref()?; + if worker.registry().id() == self.id() { + Some(worker) + } else { + None + } + } + } + + /// Returns an opaque identifier for this registry. + pub(super) fn id(&self) -> RegistryId { + // We can rely on `self` not to change since we only ever create + // registries that are boxed up in an `Arc` (see `new()` above). + RegistryId { + addr: self as *const Self as usize, + } + } + + pub(super) fn num_threads(&self) -> usize { + self.thread_infos.len() + } + + pub(super) fn catch_unwind(&self, f: impl FnOnce()) { + if let Err(err) = unwind::halt_unwinding(f) { + // If there is no handler, or if that handler itself panics, then we abort. + let abort_guard = unwind::AbortIfPanic; + if let Some(ref handler) = self.panic_handler { + handler(err); + mem::forget(abort_guard); + } + } + } + + /// Waits for the worker threads to get up and running. This is + /// meant to be used for benchmarking purposes, primarily, so that + /// you can get more consistent numbers by having everything + /// "ready to go". + pub(super) fn wait_until_primed(&self) { + for info in &self.thread_infos { + info.primed.wait(); + } + } + + /// Waits for the worker threads to stop. This is used for testing + /// -- so we can check that termination actually works. + #[cfg(test)] + pub(super) fn wait_until_stopped(&self) { + for info in &self.thread_infos { + info.stopped.wait(); + } + } + + /// //////////////////////////////////////////////////////////////////////// + /// MAIN LOOP + /// + /// So long as all of the worker threads are hanging out in their + /// top-level loop, there is no work to be done. + + /// Push a job into the given `registry`. If we are running on a + /// worker thread for the registry, this will push onto the + /// deque. Else, it will inject from the outside (which is slower). + pub(super) fn inject_or_push(&self, job_ref: JobRef) { + let worker_thread = WorkerThread::current(); + unsafe { + if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { + (*worker_thread).push(job_ref); + } else { + self.inject(job_ref); + } + } + } + + /// Push a job into the "external jobs" queue; it will be taken by + /// whatever worker has nothing to do. Use this if you know that + /// you are not on a worker of this registry. + pub(super) fn inject(&self, injected_job: JobRef) { + // It should not be possible for `state.terminate` to be true + // here. It is only set to true when the user creates (and + // drops) a `ThreadPool`; and, in that case, they cannot be + // calling `inject()` later, since they dropped their + // `ThreadPool`. + debug_assert_ne!( + self.terminate_count.load(Ordering::Acquire), + 0, + "inject() sees state.terminate as true" + ); + + let queue_was_empty = self.injected_jobs.is_empty(); + + self.injected_jobs.push(injected_job); + self.sleep.new_injected_jobs(1, queue_was_empty); + } + + fn has_injected_job(&self) -> bool { + !self.injected_jobs.is_empty() + } + + fn pop_injected_job(&self) -> Option<JobRef> { + loop { + match self.injected_jobs.steal() { + Steal::Success(job) => return Some(job), + Steal::Empty => return None, + Steal::Retry => {} + } + } + } + + /// Push a job into each thread's own "external jobs" queue; it will be + /// executed only on that thread, when it has nothing else to do locally, + /// before it tries to steal other work. + /// + /// **Panics** if not given exactly as many jobs as there are threads. + pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) { + assert_eq!(self.num_threads(), injected_jobs.len()); + { + let broadcasts = self.broadcasts.lock().unwrap(); + + // It should not be possible for `state.terminate` to be true + // here. It is only set to true when the user creates (and + // drops) a `ThreadPool`; and, in that case, they cannot be + // calling `inject_broadcast()` later, since they dropped their + // `ThreadPool`. + debug_assert_ne!( + self.terminate_count.load(Ordering::Acquire), + 0, + "inject_broadcast() sees state.terminate as true" + ); + + assert_eq!(broadcasts.len(), injected_jobs.len()); + for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) { + worker.push(job_ref); + } + } + for i in 0..self.num_threads() { + self.sleep.notify_worker_latch_is_set(i); + } + } + + /// If already in a worker-thread of this registry, just execute `op`. + /// Otherwise, inject `op` in this thread-pool. Either way, block until `op` + /// completes and return its return value. If `op` panics, that panic will + /// be propagated as well. The second argument indicates `true` if injection + /// was performed, `false` if executed directly. + pub(super) fn in_worker<OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() { + self.in_worker_cold(op) + } else if (*worker_thread).registry().id() != self.id() { + self.in_worker_cross(&*worker_thread, op) + } else { + // Perfectly valid to give them a `&T`: this is the + // current thread, so we know the data structure won't be + // invalidated until we return. + op(&*worker_thread, false) + } + } + } + + #[cold] + unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new()); + + LOCK_LATCH.with(|l| { + // This thread isn't a member of *any* thread pool, so just block. + debug_assert!(WorkerThread::current().is_null()); + let job = StackJob::new( + |injected| { + let worker_thread = WorkerThread::current(); + assert!(injected && !worker_thread.is_null()); + op(&*worker_thread, true) + }, + LatchRef::new(l), + ); + self.inject(job.as_job_ref()); + job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. + + job.into_result() + }) + } + + #[cold] + unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + // This thread is a member of a different pool, so let it process + // other work while waiting for this `op` to complete. + debug_assert!(current_thread.registry().id() != self.id()); + let latch = SpinLatch::cross(current_thread); + let job = StackJob::new( + |injected| { + let worker_thread = WorkerThread::current(); + assert!(injected && !worker_thread.is_null()); + op(&*worker_thread, true) + }, + latch, + ); + self.inject(job.as_job_ref()); + current_thread.wait_until(&job.latch); + job.into_result() + } + + /// Increments the terminate counter. This increment should be + /// balanced by a call to `terminate`, which will decrement. This + /// is used when spawning asynchronous work, which needs to + /// prevent the registry from terminating so long as it is active. + /// + /// Note that blocking functions such as `join` and `scope` do not + /// need to concern themselves with this fn; their context is + /// responsible for ensuring the current thread-pool will not + /// terminate until they return. + /// + /// The global thread-pool always has an outstanding reference + /// (the initial one). Custom thread-pools have one outstanding + /// reference that is dropped when the `ThreadPool` is dropped: + /// since installing the thread-pool blocks until any joins/scopes + /// complete, this ensures that joins/scopes are covered. + /// + /// The exception is `::spawn()`, which can create a job outside + /// of any blocking scope. In that case, the job itself holds a + /// terminate count and is responsible for invoking `terminate()` + /// when finished. + pub(super) fn increment_terminate_count(&self) { + let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel); + debug_assert!(previous != 0, "registry ref count incremented from zero"); + assert!( + previous != std::usize::MAX, + "overflow in registry ref count" + ); + } + + /// Signals that the thread-pool which owns this registry has been + /// dropped. The worker threads will gradually terminate, once any + /// extant work is completed. + pub(super) fn terminate(&self) { + if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { + for (i, thread_info) in self.thread_infos.iter().enumerate() { + unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; + } + } + } + + /// Notify the worker that the latch they are sleeping on has been "set". + pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { + self.sleep.notify_worker_latch_is_set(target_worker_index); + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(super) struct RegistryId { + addr: usize, +} + +struct ThreadInfo { + /// Latch set once thread has started and we are entering into the + /// main loop. Used to wait for worker threads to become primed, + /// primarily of interest for benchmarking. + primed: LockLatch, + + /// Latch is set once worker thread has completed. Used to wait + /// until workers have stopped; only used for tests. + stopped: LockLatch, + + /// The latch used to signal that terminated has been requested. + /// This latch is *set* by the `terminate` method on the + /// `Registry`, once the registry's main "terminate" counter + /// reaches zero. + terminate: OnceLatch, + + /// the "stealer" half of the worker's deque + stealer: Stealer<JobRef>, +} + +impl ThreadInfo { + fn new(stealer: Stealer<JobRef>) -> ThreadInfo { + ThreadInfo { + primed: LockLatch::new(), + stopped: LockLatch::new(), + terminate: OnceLatch::new(), + stealer, + } + } +} + +/// //////////////////////////////////////////////////////////////////////// +/// WorkerThread identifiers + +pub(super) struct WorkerThread { + /// the "worker" half of our local deque + worker: Worker<JobRef>, + + /// the "stealer" half of the worker's broadcast deque + stealer: Stealer<JobRef>, + + /// local queue used for `spawn_fifo` indirection + fifo: JobFifo, + + index: usize, + + /// A weak random number generator. + rng: XorShift64Star, + + registry: Arc<Registry>, +} + +// This is a bit sketchy, but basically: the WorkerThread is +// allocated on the stack of the worker on entry and stored into this +// thread local variable. So it will remain valid at least until the +// worker is fully unwound. Using an unsafe pointer avoids the need +// for a RefCell<T> etc. +thread_local! { + static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) }; +} + +impl From<ThreadBuilder> for WorkerThread { + fn from(thread: ThreadBuilder) -> Self { + Self { + worker: thread.worker, + stealer: thread.stealer, + fifo: JobFifo::new(), + index: thread.index, + rng: XorShift64Star::new(), + registry: thread.registry, + } + } +} + +impl Drop for WorkerThread { + fn drop(&mut self) { + // Undo `set_current` + WORKER_THREAD_STATE.with(|t| { + assert!(t.get().eq(&(self as *const _))); + t.set(ptr::null()); + }); + } +} + +impl WorkerThread { + /// Gets the `WorkerThread` index for the current thread; returns + /// NULL if this is not a worker thread. This pointer is valid + /// anywhere on the current thread. + #[inline] + pub(super) fn current() -> *const WorkerThread { + WORKER_THREAD_STATE.with(Cell::get) + } + + /// Sets `self` as the worker thread index for the current thread. + /// This is done during worker thread startup. + unsafe fn set_current(thread: *const WorkerThread) { + WORKER_THREAD_STATE.with(|t| { + assert!(t.get().is_null()); + t.set(thread); + }); + } + + /// Returns the registry that owns this worker thread. + #[inline] + pub(super) fn registry(&self) -> &Arc<Registry> { + &self.registry + } + + /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). + #[inline] + pub(super) fn index(&self) -> usize { + self.index + } + + #[inline] + pub(super) unsafe fn push(&self, job: JobRef) { + let queue_was_empty = self.worker.is_empty(); + self.worker.push(job); + self.registry.sleep.new_internal_jobs(1, queue_was_empty); + } + + #[inline] + pub(super) unsafe fn push_fifo(&self, job: JobRef) { + self.push(self.fifo.push(job)); + } + + #[inline] + pub(super) fn local_deque_is_empty(&self) -> bool { + self.worker.is_empty() + } + + /// Attempts to obtain a "local" job -- typically this means + /// popping from the top of the stack, though if we are configured + /// for breadth-first execution, it would mean dequeuing from the + /// bottom. + #[inline] + pub(super) fn take_local_job(&self) -> Option<JobRef> { + let popped_job = self.worker.pop(); + + if popped_job.is_some() { + return popped_job; + } + + loop { + match self.stealer.steal() { + Steal::Success(job) => return Some(job), + Steal::Empty => return None, + Steal::Retry => {} + } + } + } + + fn has_injected_job(&self) -> bool { + !self.stealer.is_empty() || self.registry.has_injected_job() + } + + /// Wait until the latch is set. Try to keep busy by popping and + /// stealing tasks as necessary. + #[inline] + pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) { + let latch = latch.as_core_latch(); + if !latch.probe() { + self.wait_until_cold(latch); + } + } + + #[cold] + unsafe fn wait_until_cold(&self, latch: &CoreLatch) { + // the code below should swallow all panics and hence never + // unwind; but if something does wrong, we want to abort, + // because otherwise other code in rayon may assume that the + // latch has been signaled, and that can lead to random memory + // accesses, which would be *very bad* + let abort_guard = unwind::AbortIfPanic; + + 'outer: while !latch.probe() { + // Check for local work *before* we start marking ourself idle, + // especially to avoid modifying shared sleep state. + if let Some(job) = self.take_local_job() { + self.execute(job); + continue; + } + + let mut idle_state = self.registry.sleep.start_looking(self.index); + while !latch.probe() { + if let Some(job) = self.find_work() { + self.registry.sleep.work_found(); + self.execute(job); + // The job might have injected local work, so go back to the outer loop. + continue 'outer; + } else { + self.registry + .sleep + .no_work_found(&mut idle_state, latch, || self.has_injected_job()) + } + } + + // If we were sleepy, we are not anymore. We "found work" -- + // whatever the surrounding thread was doing before it had to wait. + self.registry.sleep.work_found(); + break; + } + + mem::forget(abort_guard); // successful execution, do not abort + } + + unsafe fn wait_until_out_of_work(&self) { + debug_assert_eq!(self as *const _, WorkerThread::current()); + let registry = &*self.registry; + let index = self.index; + + self.wait_until(®istry.thread_infos[index].terminate); + + // Should not be any work left in our queue. + debug_assert!(self.take_local_job().is_none()); + + // Let registry know we are done + Latch::set(®istry.thread_infos[index].stopped); + } + + fn find_work(&self) -> Option<JobRef> { + // Try to find some work to do. We give preference first + // to things in our local deque, then in other workers + // deques, and finally to injected jobs from the + // outside. The idea is to finish what we started before + // we take on something new. + self.take_local_job() + .or_else(|| self.steal()) + .or_else(|| self.registry.pop_injected_job()) + } + + pub(super) fn yield_now(&self) -> Yield { + match self.find_work() { + Some(job) => unsafe { + self.execute(job); + Yield::Executed + }, + None => Yield::Idle, + } + } + + pub(super) fn yield_local(&self) -> Yield { + match self.take_local_job() { + Some(job) => unsafe { + self.execute(job); + Yield::Executed + }, + None => Yield::Idle, + } + } + + #[inline] + pub(super) unsafe fn execute(&self, job: JobRef) { + job.execute(); + } + + /// Try to steal a single job and return it. + /// + /// This should only be done as a last resort, when there is no + /// local work to do. + fn steal(&self) -> Option<JobRef> { + // we only steal when we don't have any work to do locally + debug_assert!(self.local_deque_is_empty()); + + // otherwise, try to steal + let thread_infos = &self.registry.thread_infos.as_slice(); + let num_threads = thread_infos.len(); + if num_threads <= 1 { + return None; + } + + loop { + let mut retry = false; + let start = self.rng.next_usize(num_threads); + let job = (start..num_threads) + .chain(0..start) + .filter(move |&i| i != self.index) + .find_map(|victim_index| { + let victim = &thread_infos[victim_index]; + match victim.stealer.steal() { + Steal::Success(job) => Some(job), + Steal::Empty => None, + Steal::Retry => { + retry = true; + None + } + } + }); + if job.is_some() || !retry { + return job; + } + } + } +} + +/// //////////////////////////////////////////////////////////////////////// + +unsafe fn main_loop(thread: ThreadBuilder) { + let worker_thread = &WorkerThread::from(thread); + WorkerThread::set_current(worker_thread); + let registry = &*worker_thread.registry; + let index = worker_thread.index; + + // let registry know we are ready to do work + Latch::set(®istry.thread_infos[index].primed); + + // Worker threads should not panic. If they do, just abort, as the + // internal state of the threadpool is corrupted. Note that if + // **user code** panics, we should catch that and redirect. + let abort_guard = unwind::AbortIfPanic; + + // Inform a user callback that we started a thread. + if let Some(ref handler) = registry.start_handler { + registry.catch_unwind(|| handler(index)); + } + + worker_thread.wait_until_out_of_work(); + + // Normal termination, do not abort. + mem::forget(abort_guard); + + // Inform a user callback that we exited a thread. + if let Some(ref handler) = registry.exit_handler { + registry.catch_unwind(|| handler(index)); + // We're already exiting the thread, there's nothing else to do. + } +} + +/// If already in a worker-thread, just execute `op`. Otherwise, +/// execute `op` in the default thread-pool. Either way, block until +/// `op` completes and return its return value. If `op` panics, that +/// panic will be propagated as well. The second argument indicates +/// `true` if injection was performed, `false` if executed directly. +pub(super) fn in_worker<OP, R>(op: OP) -> R +where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, +{ + unsafe { + let owner_thread = WorkerThread::current(); + if !owner_thread.is_null() { + // Perfectly valid to give them a `&T`: this is the + // current thread, so we know the data structure won't be + // invalidated until we return. + op(&*owner_thread, false) + } else { + global_registry().in_worker(op) + } + } +} + +/// [xorshift*] is a fast pseudorandom number generator which will +/// even tolerate weak seeding, as long as it's not zero. +/// +/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift* +struct XorShift64Star { + state: Cell<u64>, +} + +impl XorShift64Star { + fn new() -> Self { + // Any non-zero seed will do -- this uses the hash of a global counter. + let mut seed = 0; + while seed == 0 { + let mut hasher = DefaultHasher::new(); + static COUNTER: AtomicUsize = AtomicUsize::new(0); + hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed)); + seed = hasher.finish(); + } + + XorShift64Star { + state: Cell::new(seed), + } + } + + fn next(&self) -> u64 { + let mut x = self.state.get(); + debug_assert_ne!(x, 0); + x ^= x >> 12; + x ^= x << 25; + x ^= x >> 27; + self.state.set(x); + x.wrapping_mul(0x2545_f491_4f6c_dd1d) + } + + /// Return a value from `0..n`. + fn next_usize(&self, n: usize) -> usize { + (self.next() % n as u64) as usize + } +} |