use crate::job::{JobFifo, JobRef, StackJob}; use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; use crate::sleep::Sleep; use crate::unwind; use crate::{ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, Yield, }; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use std::cell::Cell; use std::collections::hash_map::DefaultHasher; use std::fmt; use std::hash::Hasher; use std::io; use std::mem; use std::ptr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Once}; use std::thread; use std::usize; /// Thread builder used for customization via /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). pub struct ThreadBuilder { name: Option, stack_size: Option, worker: Worker, stealer: Stealer, registry: Arc, index: usize, } impl ThreadBuilder { /// Gets the index of this thread in the pool, within `0..num_threads`. pub fn index(&self) -> usize { self.index } /// Gets the string that was specified by `ThreadPoolBuilder::name()`. pub fn name(&self) -> Option<&str> { self.name.as_deref() } /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`. pub fn stack_size(&self) -> Option { self.stack_size } /// Executes the main loop for this thread. This will not return until the /// thread pool is dropped. pub fn run(self) { unsafe { main_loop(self) } } } impl fmt::Debug for ThreadBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ThreadBuilder") .field("pool", &self.registry.id()) .field("index", &self.index) .field("name", &self.name) .field("stack_size", &self.stack_size) .finish() } } /// Generalized trait for spawning a thread in the `Registry`. /// /// This trait is pub-in-private -- E0445 forces us to make it public, /// but we don't actually want to expose these details in the API. pub trait ThreadSpawn { private_decl! {} /// Spawn a thread with the `ThreadBuilder` parameters, and then /// call `ThreadBuilder::run()`. fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>; } /// Spawns a thread in the "normal" way with `std::thread::Builder`. /// /// This type is pub-in-private -- E0445 forces us to make it public, /// but we don't actually want to expose these details in the API. #[derive(Debug, Default)] pub struct DefaultSpawn; impl ThreadSpawn for DefaultSpawn { private_impl! {} fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { let mut b = thread::Builder::new(); if let Some(name) = thread.name() { b = b.name(name.to_owned()); } if let Some(stack_size) = thread.stack_size() { b = b.stack_size(stack_size); } b.spawn(|| thread.run())?; Ok(()) } } /// Spawns a thread with a user's custom callback. /// /// This type is pub-in-private -- E0445 forces us to make it public, /// but we don't actually want to expose these details in the API. #[derive(Debug)] pub struct CustomSpawn(F); impl CustomSpawn where F: FnMut(ThreadBuilder) -> io::Result<()>, { pub(super) fn new(spawn: F) -> Self { CustomSpawn(spawn) } } impl ThreadSpawn for CustomSpawn where F: FnMut(ThreadBuilder) -> io::Result<()>, { private_impl! {} #[inline] fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { (self.0)(thread) } } pub(super) struct Registry { thread_infos: Vec, sleep: Sleep, injected_jobs: Injector, broadcasts: Mutex>>, panic_handler: Option>, start_handler: Option>, exit_handler: Option>, // When this latch reaches 0, it means that all work on this // registry must be complete. This is ensured in the following ways: // // - if this is the global registry, there is a ref-count that never // gets released. // - if this is a user-created thread-pool, then so long as the thread-pool // exists, it holds a reference. // - when we inject a "blocking job" into the registry with `ThreadPool::install()`, // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't // return until the blocking job is complete, that ref will continue to be held. // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed. // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`) // and that job will keep the pool alive. terminate_count: AtomicUsize, } /// //////////////////////////////////////////////////////////////////////// /// Initialization static mut THE_REGISTRY: Option> = None; static THE_REGISTRY_SET: Once = Once::new(); /// Starts the worker threads (if that has not already happened). If /// initialization has not already occurred, use the default /// configuration. pub(super) fn global_registry() -> &'static Arc { set_global_registry(default_global_registry) .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) .expect("The global thread pool has not been initialized.") } /// Starts the worker threads (if that has not already happened) with /// the given builder. pub(super) fn init_global_registry( builder: ThreadPoolBuilder, ) -> Result<&'static Arc, ThreadPoolBuildError> where S: ThreadSpawn, { set_global_registry(|| Registry::new(builder)) } /// Starts the worker threads (if that has not already happened) /// by creating a registry with the given callback. fn set_global_registry(registry: F) -> Result<&'static Arc, ThreadPoolBuildError> where F: FnOnce() -> Result, ThreadPoolBuildError>, { let mut result = Err(ThreadPoolBuildError::new( ErrorKind::GlobalPoolAlreadyInitialized, )); THE_REGISTRY_SET.call_once(|| { result = registry() .map(|registry: Arc| unsafe { &*THE_REGISTRY.get_or_insert(registry) }) }); result } fn default_global_registry() -> Result, ThreadPoolBuildError> { let result = Registry::new(ThreadPoolBuilder::new()); // If we're running in an environment that doesn't support threads at all, we can fall back to // using the current thread alone. This is crude, and probably won't work for non-blocking // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine. // // Notably, this allows current WebAssembly targets to work even though their threading support // is stubbed out, and we won't have to change anything if they do add real threading. let unsupported = matches!(&result, Err(e) if e.is_unsupported()); if unsupported && WorkerThread::current().is_null() { let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread(); let fallback_result = Registry::new(builder); if fallback_result.is_ok() { return fallback_result; } } result } struct Terminator<'a>(&'a Arc); impl<'a> Drop for Terminator<'a> { fn drop(&mut self) { self.0.terminate() } } impl Registry { pub(super) fn new( mut builder: ThreadPoolBuilder, ) -> Result, ThreadPoolBuildError> where S: ThreadSpawn, { // Soft-limit the number of threads that we can actually support. let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads()); let breadth_first = builder.get_breadth_first(); let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) .map(|_| { let worker = if breadth_first { Worker::new_fifo() } else { Worker::new_lifo() }; let stealer = worker.stealer(); (worker, stealer) }) .unzip(); let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads) .map(|_| { let worker = Worker::new_fifo(); let stealer = worker.stealer(); (worker, stealer) }) .unzip(); let registry = Arc::new(Registry { thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), sleep: Sleep::new(n_threads), injected_jobs: Injector::new(), broadcasts: Mutex::new(broadcasts), terminate_count: AtomicUsize::new(1), panic_handler: builder.take_panic_handler(), start_handler: builder.take_start_handler(), exit_handler: builder.take_exit_handler(), }); // If we return early or panic, make sure to terminate existing threads. let t1000 = Terminator(®istry); for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() { let thread = ThreadBuilder { name: builder.get_thread_name(index), stack_size: builder.get_stack_size(), registry: Arc::clone(®istry), worker, stealer, index, }; if index == 0 && builder.use_current_thread { if !WorkerThread::current().is_null() { return Err(ThreadPoolBuildError::new( ErrorKind::CurrentThreadAlreadyInPool, )); } // Rather than starting a new thread, we're just taking over the current thread // *without* running the main loop, so we can still return from here. // The WorkerThread is leaked, but we never shutdown the global pool anyway. let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread))); unsafe { WorkerThread::set_current(worker_thread); Latch::set(®istry.thread_infos[index].primed); } continue; } if let Err(e) = builder.get_spawn_handler().spawn(thread) { return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); } } // Returning normally now, without termination. mem::forget(t1000); Ok(registry) } pub(super) fn current() -> Arc { unsafe { let worker_thread = WorkerThread::current(); let registry = if worker_thread.is_null() { global_registry() } else { &(*worker_thread).registry }; Arc::clone(registry) } } /// Returns the number of threads in the current registry. This /// is better than `Registry::current().num_threads()` because it /// avoids incrementing the `Arc`. pub(super) fn current_num_threads() -> usize { unsafe { let worker_thread = WorkerThread::current(); if worker_thread.is_null() { global_registry().num_threads() } else { (*worker_thread).registry.num_threads() } } } /// Returns the current `WorkerThread` if it's part of this `Registry`. pub(super) fn current_thread(&self) -> Option<&WorkerThread> { unsafe { let worker = WorkerThread::current().as_ref()?; if worker.registry().id() == self.id() { Some(worker) } else { None } } } /// Returns an opaque identifier for this registry. pub(super) fn id(&self) -> RegistryId { // We can rely on `self` not to change since we only ever create // registries that are boxed up in an `Arc` (see `new()` above). RegistryId { addr: self as *const Self as usize, } } pub(super) fn num_threads(&self) -> usize { self.thread_infos.len() } pub(super) fn catch_unwind(&self, f: impl FnOnce()) { if let Err(err) = unwind::halt_unwinding(f) { // If there is no handler, or if that handler itself panics, then we abort. let abort_guard = unwind::AbortIfPanic; if let Some(ref handler) = self.panic_handler { handler(err); mem::forget(abort_guard); } } } /// Waits for the worker threads to get up and running. This is /// meant to be used for benchmarking purposes, primarily, so that /// you can get more consistent numbers by having everything /// "ready to go". pub(super) fn wait_until_primed(&self) { for info in &self.thread_infos { info.primed.wait(); } } /// Waits for the worker threads to stop. This is used for testing /// -- so we can check that termination actually works. #[cfg(test)] pub(super) fn wait_until_stopped(&self) { for info in &self.thread_infos { info.stopped.wait(); } } /// //////////////////////////////////////////////////////////////////////// /// MAIN LOOP /// /// So long as all of the worker threads are hanging out in their /// top-level loop, there is no work to be done. /// Push a job into the given `registry`. If we are running on a /// worker thread for the registry, this will push onto the /// deque. Else, it will inject from the outside (which is slower). pub(super) fn inject_or_push(&self, job_ref: JobRef) { let worker_thread = WorkerThread::current(); unsafe { if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { (*worker_thread).push(job_ref); } else { self.inject(job_ref); } } } /// Push a job into the "external jobs" queue; it will be taken by /// whatever worker has nothing to do. Use this if you know that /// you are not on a worker of this registry. pub(super) fn inject(&self, injected_job: JobRef) { // It should not be possible for `state.terminate` to be true // here. It is only set to true when the user creates (and // drops) a `ThreadPool`; and, in that case, they cannot be // calling `inject()` later, since they dropped their // `ThreadPool`. debug_assert_ne!( self.terminate_count.load(Ordering::Acquire), 0, "inject() sees state.terminate as true" ); let queue_was_empty = self.injected_jobs.is_empty(); self.injected_jobs.push(injected_job); self.sleep.new_injected_jobs(1, queue_was_empty); } fn has_injected_job(&self) -> bool { !self.injected_jobs.is_empty() } fn pop_injected_job(&self) -> Option { loop { match self.injected_jobs.steal() { Steal::Success(job) => return Some(job), Steal::Empty => return None, Steal::Retry => {} } } } /// Push a job into each thread's own "external jobs" queue; it will be /// executed only on that thread, when it has nothing else to do locally, /// before it tries to steal other work. /// /// **Panics** if not given exactly as many jobs as there are threads. pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator) { assert_eq!(self.num_threads(), injected_jobs.len()); { let broadcasts = self.broadcasts.lock().unwrap(); // It should not be possible for `state.terminate` to be true // here. It is only set to true when the user creates (and // drops) a `ThreadPool`; and, in that case, they cannot be // calling `inject_broadcast()` later, since they dropped their // `ThreadPool`. debug_assert_ne!( self.terminate_count.load(Ordering::Acquire), 0, "inject_broadcast() sees state.terminate as true" ); assert_eq!(broadcasts.len(), injected_jobs.len()); for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) { worker.push(job_ref); } } for i in 0..self.num_threads() { self.sleep.notify_worker_latch_is_set(i); } } /// If already in a worker-thread of this registry, just execute `op`. /// Otherwise, inject `op` in this thread-pool. Either way, block until `op` /// completes and return its return value. If `op` panics, that panic will /// be propagated as well. The second argument indicates `true` if injection /// was performed, `false` if executed directly. pub(super) fn in_worker(&self, op: OP) -> R where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send, { unsafe { let worker_thread = WorkerThread::current(); if worker_thread.is_null() { self.in_worker_cold(op) } else if (*worker_thread).registry().id() != self.id() { self.in_worker_cross(&*worker_thread, op) } else { // Perfectly valid to give them a `&T`: this is the // current thread, so we know the data structure won't be // invalidated until we return. op(&*worker_thread, false) } } } #[cold] unsafe fn in_worker_cold(&self, op: OP) -> R where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send, { thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new()); LOCK_LATCH.with(|l| { // This thread isn't a member of *any* thread pool, so just block. debug_assert!(WorkerThread::current().is_null()); let job = StackJob::new( |injected| { let worker_thread = WorkerThread::current(); assert!(injected && !worker_thread.is_null()); op(&*worker_thread, true) }, LatchRef::new(l), ); self.inject(job.as_job_ref()); job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. job.into_result() }) } #[cold] unsafe fn in_worker_cross(&self, current_thread: &WorkerThread, op: OP) -> R where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send, { // This thread is a member of a different pool, so let it process // other work while waiting for this `op` to complete. debug_assert!(current_thread.registry().id() != self.id()); let latch = SpinLatch::cross(current_thread); let job = StackJob::new( |injected| { let worker_thread = WorkerThread::current(); assert!(injected && !worker_thread.is_null()); op(&*worker_thread, true) }, latch, ); self.inject(job.as_job_ref()); current_thread.wait_until(&job.latch); job.into_result() } /// Increments the terminate counter. This increment should be /// balanced by a call to `terminate`, which will decrement. This /// is used when spawning asynchronous work, which needs to /// prevent the registry from terminating so long as it is active. /// /// Note that blocking functions such as `join` and `scope` do not /// need to concern themselves with this fn; their context is /// responsible for ensuring the current thread-pool will not /// terminate until they return. /// /// The global thread-pool always has an outstanding reference /// (the initial one). Custom thread-pools have one outstanding /// reference that is dropped when the `ThreadPool` is dropped: /// since installing the thread-pool blocks until any joins/scopes /// complete, this ensures that joins/scopes are covered. /// /// The exception is `::spawn()`, which can create a job outside /// of any blocking scope. In that case, the job itself holds a /// terminate count and is responsible for invoking `terminate()` /// when finished. pub(super) fn increment_terminate_count(&self) { let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel); debug_assert!(previous != 0, "registry ref count incremented from zero"); assert!( previous != std::usize::MAX, "overflow in registry ref count" ); } /// Signals that the thread-pool which owns this registry has been /// dropped. The worker threads will gradually terminate, once any /// extant work is completed. pub(super) fn terminate(&self) { if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { for (i, thread_info) in self.thread_infos.iter().enumerate() { unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; } } } /// Notify the worker that the latch they are sleeping on has been "set". pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { self.sleep.notify_worker_latch_is_set(target_worker_index); } } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub(super) struct RegistryId { addr: usize, } struct ThreadInfo { /// Latch set once thread has started and we are entering into the /// main loop. Used to wait for worker threads to become primed, /// primarily of interest for benchmarking. primed: LockLatch, /// Latch is set once worker thread has completed. Used to wait /// until workers have stopped; only used for tests. stopped: LockLatch, /// The latch used to signal that terminated has been requested. /// This latch is *set* by the `terminate` method on the /// `Registry`, once the registry's main "terminate" counter /// reaches zero. terminate: OnceLatch, /// the "stealer" half of the worker's deque stealer: Stealer, } impl ThreadInfo { fn new(stealer: Stealer) -> ThreadInfo { ThreadInfo { primed: LockLatch::new(), stopped: LockLatch::new(), terminate: OnceLatch::new(), stealer, } } } /// //////////////////////////////////////////////////////////////////////// /// WorkerThread identifiers pub(super) struct WorkerThread { /// the "worker" half of our local deque worker: Worker, /// the "stealer" half of the worker's broadcast deque stealer: Stealer, /// local queue used for `spawn_fifo` indirection fifo: JobFifo, index: usize, /// A weak random number generator. rng: XorShift64Star, registry: Arc, } // This is a bit sketchy, but basically: the WorkerThread is // allocated on the stack of the worker on entry and stored into this // thread local variable. So it will remain valid at least until the // worker is fully unwound. Using an unsafe pointer avoids the need // for a RefCell etc. thread_local! { static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) }; } impl From for WorkerThread { fn from(thread: ThreadBuilder) -> Self { Self { worker: thread.worker, stealer: thread.stealer, fifo: JobFifo::new(), index: thread.index, rng: XorShift64Star::new(), registry: thread.registry, } } } impl Drop for WorkerThread { fn drop(&mut self) { // Undo `set_current` WORKER_THREAD_STATE.with(|t| { assert!(t.get().eq(&(self as *const _))); t.set(ptr::null()); }); } } impl WorkerThread { /// Gets the `WorkerThread` index for the current thread; returns /// NULL if this is not a worker thread. This pointer is valid /// anywhere on the current thread. #[inline] pub(super) fn current() -> *const WorkerThread { WORKER_THREAD_STATE.with(Cell::get) } /// Sets `self` as the worker thread index for the current thread. /// This is done during worker thread startup. unsafe fn set_current(thread: *const WorkerThread) { WORKER_THREAD_STATE.with(|t| { assert!(t.get().is_null()); t.set(thread); }); } /// Returns the registry that owns this worker thread. #[inline] pub(super) fn registry(&self) -> &Arc { &self.registry } /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). #[inline] pub(super) fn index(&self) -> usize { self.index } #[inline] pub(super) unsafe fn push(&self, job: JobRef) { let queue_was_empty = self.worker.is_empty(); self.worker.push(job); self.registry.sleep.new_internal_jobs(1, queue_was_empty); } #[inline] pub(super) unsafe fn push_fifo(&self, job: JobRef) { self.push(self.fifo.push(job)); } #[inline] pub(super) fn local_deque_is_empty(&self) -> bool { self.worker.is_empty() } /// Attempts to obtain a "local" job -- typically this means /// popping from the top of the stack, though if we are configured /// for breadth-first execution, it would mean dequeuing from the /// bottom. #[inline] pub(super) fn take_local_job(&self) -> Option { let popped_job = self.worker.pop(); if popped_job.is_some() { return popped_job; } loop { match self.stealer.steal() { Steal::Success(job) => return Some(job), Steal::Empty => return None, Steal::Retry => {} } } } fn has_injected_job(&self) -> bool { !self.stealer.is_empty() || self.registry.has_injected_job() } /// Wait until the latch is set. Try to keep busy by popping and /// stealing tasks as necessary. #[inline] pub(super) unsafe fn wait_until(&self, latch: &L) { let latch = latch.as_core_latch(); if !latch.probe() { self.wait_until_cold(latch); } } #[cold] unsafe fn wait_until_cold(&self, latch: &CoreLatch) { // the code below should swallow all panics and hence never // unwind; but if something does wrong, we want to abort, // because otherwise other code in rayon may assume that the // latch has been signaled, and that can lead to random memory // accesses, which would be *very bad* let abort_guard = unwind::AbortIfPanic; 'outer: while !latch.probe() { // Check for local work *before* we start marking ourself idle, // especially to avoid modifying shared sleep state. if let Some(job) = self.take_local_job() { self.execute(job); continue; } let mut idle_state = self.registry.sleep.start_looking(self.index); while !latch.probe() { if let Some(job) = self.find_work() { self.registry.sleep.work_found(); self.execute(job); // The job might have injected local work, so go back to the outer loop. continue 'outer; } else { self.registry .sleep .no_work_found(&mut idle_state, latch, || self.has_injected_job()) } } // If we were sleepy, we are not anymore. We "found work" -- // whatever the surrounding thread was doing before it had to wait. self.registry.sleep.work_found(); break; } mem::forget(abort_guard); // successful execution, do not abort } unsafe fn wait_until_out_of_work(&self) { debug_assert_eq!(self as *const _, WorkerThread::current()); let registry = &*self.registry; let index = self.index; self.wait_until(®istry.thread_infos[index].terminate); // Should not be any work left in our queue. debug_assert!(self.take_local_job().is_none()); // Let registry know we are done Latch::set(®istry.thread_infos[index].stopped); } fn find_work(&self) -> Option { // Try to find some work to do. We give preference first // to things in our local deque, then in other workers // deques, and finally to injected jobs from the // outside. The idea is to finish what we started before // we take on something new. self.take_local_job() .or_else(|| self.steal()) .or_else(|| self.registry.pop_injected_job()) } pub(super) fn yield_now(&self) -> Yield { match self.find_work() { Some(job) => unsafe { self.execute(job); Yield::Executed }, None => Yield::Idle, } } pub(super) fn yield_local(&self) -> Yield { match self.take_local_job() { Some(job) => unsafe { self.execute(job); Yield::Executed }, None => Yield::Idle, } } #[inline] pub(super) unsafe fn execute(&self, job: JobRef) { job.execute(); } /// Try to steal a single job and return it. /// /// This should only be done as a last resort, when there is no /// local work to do. fn steal(&self) -> Option { // we only steal when we don't have any work to do locally debug_assert!(self.local_deque_is_empty()); // otherwise, try to steal let thread_infos = &self.registry.thread_infos.as_slice(); let num_threads = thread_infos.len(); if num_threads <= 1 { return None; } loop { let mut retry = false; let start = self.rng.next_usize(num_threads); let job = (start..num_threads) .chain(0..start) .filter(move |&i| i != self.index) .find_map(|victim_index| { let victim = &thread_infos[victim_index]; match victim.stealer.steal() { Steal::Success(job) => Some(job), Steal::Empty => None, Steal::Retry => { retry = true; None } } }); if job.is_some() || !retry { return job; } } } } /// //////////////////////////////////////////////////////////////////////// unsafe fn main_loop(thread: ThreadBuilder) { let worker_thread = &WorkerThread::from(thread); WorkerThread::set_current(worker_thread); let registry = &*worker_thread.registry; let index = worker_thread.index; // let registry know we are ready to do work Latch::set(®istry.thread_infos[index].primed); // Worker threads should not panic. If they do, just abort, as the // internal state of the threadpool is corrupted. Note that if // **user code** panics, we should catch that and redirect. let abort_guard = unwind::AbortIfPanic; // Inform a user callback that we started a thread. if let Some(ref handler) = registry.start_handler { registry.catch_unwind(|| handler(index)); } worker_thread.wait_until_out_of_work(); // Normal termination, do not abort. mem::forget(abort_guard); // Inform a user callback that we exited a thread. if let Some(ref handler) = registry.exit_handler { registry.catch_unwind(|| handler(index)); // We're already exiting the thread, there's nothing else to do. } } /// If already in a worker-thread, just execute `op`. Otherwise, /// execute `op` in the default thread-pool. Either way, block until /// `op` completes and return its return value. If `op` panics, that /// panic will be propagated as well. The second argument indicates /// `true` if injection was performed, `false` if executed directly. pub(super) fn in_worker(op: OP) -> R where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send, { unsafe { let owner_thread = WorkerThread::current(); if !owner_thread.is_null() { // Perfectly valid to give them a `&T`: this is the // current thread, so we know the data structure won't be // invalidated until we return. op(&*owner_thread, false) } else { global_registry().in_worker(op) } } } /// [xorshift*] is a fast pseudorandom number generator which will /// even tolerate weak seeding, as long as it's not zero. /// /// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift* struct XorShift64Star { state: Cell, } impl XorShift64Star { fn new() -> Self { // Any non-zero seed will do -- this uses the hash of a global counter. let mut seed = 0; while seed == 0 { let mut hasher = DefaultHasher::new(); static COUNTER: AtomicUsize = AtomicUsize::new(0); hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed)); seed = hasher.finish(); } XorShift64Star { state: Cell::new(seed), } } fn next(&self) -> u64 { let mut x = self.state.get(); debug_assert_ne!(x, 0); x ^= x >> 12; x ^= x << 25; x ^= x >> 27; self.state.set(x); x.wrapping_mul(0x2545_f491_4f6c_dd1d) } /// Return a value from `0..n`. fn next_usize(&self, n: usize) -> usize { (self.next() % n as u64) as usize } }