diff options
Diffstat (limited to 'vendor/rayon-core/src')
27 files changed, 0 insertions, 7260 deletions
diff --git a/vendor/rayon-core/src/broadcast/mod.rs b/vendor/rayon-core/src/broadcast/mod.rs deleted file mode 100644 index 96611e4..0000000 --- a/vendor/rayon-core/src/broadcast/mod.rs +++ /dev/null @@ -1,150 +0,0 @@ -use crate::job::{ArcJob, StackJob}; -use crate::latch::{CountLatch, LatchRef}; -use crate::registry::{Registry, WorkerThread}; -use std::fmt; -use std::marker::PhantomData; -use std::sync::Arc; - -mod test; - -/// Executes `op` within every thread in the current threadpool. If this is -/// called from a non-Rayon thread, it will execute in the global threadpool. -/// Any attempts to use `join`, `scope`, or parallel iterators will then operate -/// within that threadpool. When the call has completed on each thread, returns -/// a vector containing all of their return values. -/// -/// For more information, see the [`ThreadPool::broadcast()`][m] method. -/// -/// [m]: struct.ThreadPool.html#method.broadcast -pub fn broadcast<OP, R>(op: OP) -> Vec<R> -where - OP: Fn(BroadcastContext<'_>) -> R + Sync, - R: Send, -{ - // We assert that current registry has not terminated. - unsafe { broadcast_in(op, &Registry::current()) } -} - -/// Spawns an asynchronous task on every thread in this thread-pool. This task -/// will run in the implicit, global scope, which means that it may outlast the -/// current stack frame -- therefore, it cannot capture any references onto the -/// stack (you will likely need a `move` closure). -/// -/// For more information, see the [`ThreadPool::spawn_broadcast()`][m] method. -/// -/// [m]: struct.ThreadPool.html#method.spawn_broadcast -pub fn spawn_broadcast<OP>(op: OP) -where - OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, -{ - // We assert that current registry has not terminated. - unsafe { spawn_broadcast_in(op, &Registry::current()) } -} - -/// Provides context to a closure called by `broadcast`. -pub struct BroadcastContext<'a> { - worker: &'a WorkerThread, - - /// Make sure to prevent auto-traits like `Send` and `Sync`. - _marker: PhantomData<&'a mut dyn Fn()>, -} - -impl<'a> BroadcastContext<'a> { - pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R { - let worker_thread = WorkerThread::current(); - assert!(!worker_thread.is_null()); - f(BroadcastContext { - worker: unsafe { &*worker_thread }, - _marker: PhantomData, - }) - } - - /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`). - #[inline] - pub fn index(&self) -> usize { - self.worker.index() - } - - /// The number of threads receiving the broadcast in the thread pool. - /// - /// # Future compatibility note - /// - /// Future versions of Rayon might vary the number of threads over time, but - /// this method will always return the number of threads which are actually - /// receiving your particular `broadcast` call. - #[inline] - pub fn num_threads(&self) -> usize { - self.worker.registry().num_threads() - } -} - -impl<'a> fmt::Debug for BroadcastContext<'a> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("BroadcastContext") - .field("index", &self.index()) - .field("num_threads", &self.num_threads()) - .field("pool_id", &self.worker.registry().id()) - .finish() - } -} - -/// Execute `op` on every thread in the pool. It will be executed on each -/// thread when they have nothing else to do locally, before they try to -/// steal work from other threads. This function will not return until all -/// threads have completed the `op`. -/// -/// Unsafe because `registry` must not yet have terminated. -pub(super) unsafe fn broadcast_in<OP, R>(op: OP, registry: &Arc<Registry>) -> Vec<R> -where - OP: Fn(BroadcastContext<'_>) -> R + Sync, - R: Send, -{ - let f = move |injected: bool| { - debug_assert!(injected); - BroadcastContext::with(&op) - }; - - let n_threads = registry.num_threads(); - let current_thread = WorkerThread::current().as_ref(); - let latch = CountLatch::with_count(n_threads, current_thread); - let jobs: Vec<_> = (0..n_threads) - .map(|_| StackJob::new(&f, LatchRef::new(&latch))) - .collect(); - let job_refs = jobs.iter().map(|job| job.as_job_ref()); - - registry.inject_broadcast(job_refs); - - // Wait for all jobs to complete, then collect the results, maybe propagating a panic. - latch.wait(current_thread); - jobs.into_iter().map(|job| job.into_result()).collect() -} - -/// Execute `op` on every thread in the pool. It will be executed on each -/// thread when they have nothing else to do locally, before they try to -/// steal work from other threads. This function returns immediately after -/// injecting the jobs. -/// -/// Unsafe because `registry` must not yet have terminated. -pub(super) unsafe fn spawn_broadcast_in<OP>(op: OP, registry: &Arc<Registry>) -where - OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, -{ - let job = ArcJob::new({ - let registry = Arc::clone(registry); - move || { - registry.catch_unwind(|| BroadcastContext::with(&op)); - registry.terminate(); // (*) permit registry to terminate now - } - }); - - let n_threads = registry.num_threads(); - let job_refs = (0..n_threads).map(|_| { - // Ensure that registry cannot terminate until this job has executed - // on each thread. This ref is decremented at the (*) above. - registry.increment_terminate_count(); - - ArcJob::as_static_job_ref(&job) - }); - - registry.inject_broadcast(job_refs); -} diff --git a/vendor/rayon-core/src/broadcast/test.rs b/vendor/rayon-core/src/broadcast/test.rs deleted file mode 100644 index 00ab4ad..0000000 --- a/vendor/rayon-core/src/broadcast/test.rs +++ /dev/null @@ -1,263 +0,0 @@ -#![cfg(test)] - -use crate::ThreadPoolBuilder; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::channel; -use std::sync::Arc; -use std::{thread, time}; - -#[test] -fn broadcast_global() { - let v = crate::broadcast(|ctx| ctx.index()); - assert!(v.into_iter().eq(0..crate::current_num_threads())); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_broadcast_global() { - let (tx, rx) = channel(); - crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); - - let mut v: Vec<_> = rx.into_iter().collect(); - v.sort_unstable(); - assert!(v.into_iter().eq(0..crate::current_num_threads())); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn broadcast_pool() { - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - let v = pool.broadcast(|ctx| ctx.index()); - assert!(v.into_iter().eq(0..7)); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_broadcast_pool() { - let (tx, rx) = channel(); - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); - - let mut v: Vec<_> = rx.into_iter().collect(); - v.sort_unstable(); - assert!(v.into_iter().eq(0..7)); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn broadcast_self() { - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - let v = pool.install(|| crate::broadcast(|ctx| ctx.index())); - assert!(v.into_iter().eq(0..7)); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_broadcast_self() { - let (tx, rx) = channel(); - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap())); - - let mut v: Vec<_> = rx.into_iter().collect(); - v.sort_unstable(); - assert!(v.into_iter().eq(0..7)); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn broadcast_mutual() { - let count = AtomicUsize::new(0); - let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); - let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - pool1.install(|| { - pool2.broadcast(|_| { - pool1.broadcast(|_| { - count.fetch_add(1, Ordering::Relaxed); - }) - }) - }); - assert_eq!(count.into_inner(), 3 * 7); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_broadcast_mutual() { - let (tx, rx) = channel(); - let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); - let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - pool1.spawn({ - let pool1 = Arc::clone(&pool1); - move || { - pool2.spawn_broadcast(move |_| { - let tx = tx.clone(); - pool1.spawn_broadcast(move |_| tx.send(()).unwrap()) - }) - } - }); - assert_eq!(rx.into_iter().count(), 3 * 7); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn broadcast_mutual_sleepy() { - let count = AtomicUsize::new(0); - let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); - let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - pool1.install(|| { - thread::sleep(time::Duration::from_secs(1)); - pool2.broadcast(|_| { - thread::sleep(time::Duration::from_secs(1)); - pool1.broadcast(|_| { - thread::sleep(time::Duration::from_millis(100)); - count.fetch_add(1, Ordering::Relaxed); - }) - }) - }); - assert_eq!(count.into_inner(), 3 * 7); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_broadcast_mutual_sleepy() { - let (tx, rx) = channel(); - let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); - let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - pool1.spawn({ - let pool1 = Arc::clone(&pool1); - move || { - thread::sleep(time::Duration::from_secs(1)); - pool2.spawn_broadcast(move |_| { - let tx = tx.clone(); - thread::sleep(time::Duration::from_secs(1)); - pool1.spawn_broadcast(move |_| { - thread::sleep(time::Duration::from_millis(100)); - tx.send(()).unwrap(); - }) - }) - } - }); - assert_eq!(rx.into_iter().count(), 3 * 7); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn broadcast_panic_one() { - let count = AtomicUsize::new(0); - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - let result = crate::unwind::halt_unwinding(|| { - pool.broadcast(|ctx| { - count.fetch_add(1, Ordering::Relaxed); - if ctx.index() == 3 { - panic!("Hello, world!"); - } - }) - }); - assert_eq!(count.into_inner(), 7); - assert!(result.is_err(), "broadcast panic should propagate!"); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn spawn_broadcast_panic_one() { - let (tx, rx) = channel(); - let (panic_tx, panic_rx) = channel(); - let pool = ThreadPoolBuilder::new() - .num_threads(7) - .panic_handler(move |e| panic_tx.send(e).unwrap()) - .build() - .unwrap(); - pool.spawn_broadcast(move |ctx| { - tx.send(()).unwrap(); - if ctx.index() == 3 { - panic!("Hello, world!"); - } - }); - drop(pool); // including panic_tx - assert_eq!(rx.into_iter().count(), 7); - assert_eq!(panic_rx.into_iter().count(), 1); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn broadcast_panic_many() { - let count = AtomicUsize::new(0); - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - let result = crate::unwind::halt_unwinding(|| { - pool.broadcast(|ctx| { - count.fetch_add(1, Ordering::Relaxed); - if ctx.index() % 2 == 0 { - panic!("Hello, world!"); - } - }) - }); - assert_eq!(count.into_inner(), 7); - assert!(result.is_err(), "broadcast panic should propagate!"); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn spawn_broadcast_panic_many() { - let (tx, rx) = channel(); - let (panic_tx, panic_rx) = channel(); - let pool = ThreadPoolBuilder::new() - .num_threads(7) - .panic_handler(move |e| panic_tx.send(e).unwrap()) - .build() - .unwrap(); - pool.spawn_broadcast(move |ctx| { - tx.send(()).unwrap(); - if ctx.index() % 2 == 0 { - panic!("Hello, world!"); - } - }); - drop(pool); // including panic_tx - assert_eq!(rx.into_iter().count(), 7); - assert_eq!(panic_rx.into_iter().count(), 4); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn broadcast_sleep_race() { - let test_duration = time::Duration::from_secs(1); - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - let start = time::Instant::now(); - while start.elapsed() < test_duration { - pool.broadcast(|ctx| { - // A slight spread of sleep duration increases the chance that one - // of the threads will race in the pool's idle sleep afterward. - thread::sleep(time::Duration::from_micros(ctx.index() as u64)); - }); - } -} - -#[test] -fn broadcast_after_spawn_broadcast() { - let (tx, rx) = channel(); - - // Queue a non-blocking spawn_broadcast. - crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); - - // This blocking broadcast runs after all prior broadcasts. - crate::broadcast(|_| {}); - - // The spawn_broadcast **must** have run by now on all threads. - let mut v: Vec<_> = rx.try_iter().collect(); - v.sort_unstable(); - assert!(v.into_iter().eq(0..crate::current_num_threads())); -} - -#[test] -fn broadcast_after_spawn() { - let (tx, rx) = channel(); - - // Queue a regular spawn on a thread-local deque. - crate::registry::in_worker(move |_, _| { - crate::spawn(move || tx.send(22).unwrap()); - }); - - // Broadcast runs after the local deque is empty. - crate::broadcast(|_| {}); - - // The spawn **must** have run by now. - assert_eq!(22, rx.try_recv().unwrap()); -} diff --git a/vendor/rayon-core/src/compile_fail/mod.rs b/vendor/rayon-core/src/compile_fail/mod.rs deleted file mode 100644 index f2ec646..0000000 --- a/vendor/rayon-core/src/compile_fail/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -// These modules contain `compile_fail` doc tests. -mod quicksort_race1; -mod quicksort_race2; -mod quicksort_race3; -mod rc_return; -mod rc_upvar; -mod scope_join_bad; diff --git a/vendor/rayon-core/src/compile_fail/quicksort_race1.rs b/vendor/rayon-core/src/compile_fail/quicksort_race1.rs deleted file mode 100644 index 5615033..0000000 --- a/vendor/rayon-core/src/compile_fail/quicksort_race1.rs +++ /dev/null @@ -1,28 +0,0 @@ -/*! ```compile_fail,E0524 - -fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { - if v.len() <= 1 { - return; - } - - let mid = partition(v); - let (lo, _hi) = v.split_at_mut(mid); - rayon_core::join(|| quick_sort(lo), || quick_sort(lo)); //~ ERROR -} - -fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { - let pivot = v.len() - 1; - let mut i = 0; - for j in 0..pivot { - if v[j] <= v[pivot] { - v.swap(i, j); - i += 1; - } - } - v.swap(i, pivot); - i -} - -fn main() { } - -``` */ diff --git a/vendor/rayon-core/src/compile_fail/quicksort_race2.rs b/vendor/rayon-core/src/compile_fail/quicksort_race2.rs deleted file mode 100644 index 020589c..0000000 --- a/vendor/rayon-core/src/compile_fail/quicksort_race2.rs +++ /dev/null @@ -1,28 +0,0 @@ -/*! ```compile_fail,E0500 - -fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { - if v.len() <= 1 { - return; - } - - let mid = partition(v); - let (lo, _hi) = v.split_at_mut(mid); - rayon_core::join(|| quick_sort(lo), || quick_sort(v)); //~ ERROR -} - -fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { - let pivot = v.len() - 1; - let mut i = 0; - for j in 0..pivot { - if v[j] <= v[pivot] { - v.swap(i, j); - i += 1; - } - } - v.swap(i, pivot); - i -} - -fn main() { } - -``` */ diff --git a/vendor/rayon-core/src/compile_fail/quicksort_race3.rs b/vendor/rayon-core/src/compile_fail/quicksort_race3.rs deleted file mode 100644 index 16fbf3b..0000000 --- a/vendor/rayon-core/src/compile_fail/quicksort_race3.rs +++ /dev/null @@ -1,28 +0,0 @@ -/*! ```compile_fail,E0524 - -fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { - if v.len() <= 1 { - return; - } - - let mid = partition(v); - let (_lo, hi) = v.split_at_mut(mid); - rayon_core::join(|| quick_sort(hi), || quick_sort(hi)); //~ ERROR -} - -fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { - let pivot = v.len() - 1; - let mut i = 0; - for j in 0..pivot { - if v[j] <= v[pivot] { - v.swap(i, j); - i += 1; - } - } - v.swap(i, pivot); - i -} - -fn main() { } - -``` */ diff --git a/vendor/rayon-core/src/compile_fail/rc_return.rs b/vendor/rayon-core/src/compile_fail/rc_return.rs deleted file mode 100644 index 93e3a60..0000000 --- a/vendor/rayon-core/src/compile_fail/rc_return.rs +++ /dev/null @@ -1,17 +0,0 @@ -/** ```compile_fail,E0277 - -use std::rc::Rc; - -rayon_core::join(|| Rc::new(22), || ()); //~ ERROR - -``` */ -mod left {} - -/** ```compile_fail,E0277 - -use std::rc::Rc; - -rayon_core::join(|| (), || Rc::new(23)); //~ ERROR - -``` */ -mod right {} diff --git a/vendor/rayon-core/src/compile_fail/rc_upvar.rs b/vendor/rayon-core/src/compile_fail/rc_upvar.rs deleted file mode 100644 index d8aebcf..0000000 --- a/vendor/rayon-core/src/compile_fail/rc_upvar.rs +++ /dev/null @@ -1,9 +0,0 @@ -/*! ```compile_fail,E0277 - -use std::rc::Rc; - -let r = Rc::new(22); -rayon_core::join(|| r.clone(), || r.clone()); -//~^ ERROR - -``` */ diff --git a/vendor/rayon-core/src/compile_fail/scope_join_bad.rs b/vendor/rayon-core/src/compile_fail/scope_join_bad.rs deleted file mode 100644 index 75e4c5c..0000000 --- a/vendor/rayon-core/src/compile_fail/scope_join_bad.rs +++ /dev/null @@ -1,24 +0,0 @@ -/*! ```compile_fail,E0373 - -fn bad_scope<F>(f: F) - where F: FnOnce(&i32) + Send, -{ - rayon_core::scope(|s| { - let x = 22; - s.spawn(|_| f(&x)); //~ ERROR `x` does not live long enough - }); -} - -fn good_scope<F>(f: F) - where F: FnOnce(&i32) + Send, -{ - let x = 22; - rayon_core::scope(|s| { - s.spawn(|_| f(&x)); - }); -} - -fn main() { -} - -``` */ diff --git a/vendor/rayon-core/src/job.rs b/vendor/rayon-core/src/job.rs deleted file mode 100644 index 5664bb3..0000000 --- a/vendor/rayon-core/src/job.rs +++ /dev/null @@ -1,270 +0,0 @@ -use crate::latch::Latch; -use crate::unwind; -use crossbeam_deque::{Injector, Steal}; -use std::any::Any; -use std::cell::UnsafeCell; -use std::mem; -use std::sync::Arc; - -pub(super) enum JobResult<T> { - None, - Ok(T), - Panic(Box<dyn Any + Send>), -} - -/// A `Job` is used to advertise work for other threads that they may -/// want to steal. In accordance with time honored tradition, jobs are -/// arranged in a deque, so that thieves can take from the top of the -/// deque while the main worker manages the bottom of the deque. This -/// deque is managed by the `thread_pool` module. -pub(super) trait Job { - /// Unsafe: this may be called from a different thread than the one - /// which scheduled the job, so the implementer must ensure the - /// appropriate traits are met, whether `Send`, `Sync`, or both. - unsafe fn execute(this: *const ()); -} - -/// Effectively a Job trait object. Each JobRef **must** be executed -/// exactly once, or else data may leak. -/// -/// Internally, we store the job's data in a `*const ()` pointer. The -/// true type is something like `*const StackJob<...>`, but we hide -/// it. We also carry the "execute fn" from the `Job` trait. -pub(super) struct JobRef { - pointer: *const (), - execute_fn: unsafe fn(*const ()), -} - -unsafe impl Send for JobRef {} -unsafe impl Sync for JobRef {} - -impl JobRef { - /// Unsafe: caller asserts that `data` will remain valid until the - /// job is executed. - pub(super) unsafe fn new<T>(data: *const T) -> JobRef - where - T: Job, - { - // erase types: - JobRef { - pointer: data as *const (), - execute_fn: <T as Job>::execute, - } - } - - /// Returns an opaque handle that can be saved and compared, - /// without making `JobRef` itself `Copy + Eq`. - #[inline] - pub(super) fn id(&self) -> impl Eq { - (self.pointer, self.execute_fn) - } - - #[inline] - pub(super) unsafe fn execute(self) { - (self.execute_fn)(self.pointer) - } -} - -/// A job that will be owned by a stack slot. This means that when it -/// executes it need not free any heap data, the cleanup occurs when -/// the stack frame is later popped. The function parameter indicates -/// `true` if the job was stolen -- executed on a different thread. -pub(super) struct StackJob<L, F, R> -where - L: Latch + Sync, - F: FnOnce(bool) -> R + Send, - R: Send, -{ - pub(super) latch: L, - func: UnsafeCell<Option<F>>, - result: UnsafeCell<JobResult<R>>, -} - -impl<L, F, R> StackJob<L, F, R> -where - L: Latch + Sync, - F: FnOnce(bool) -> R + Send, - R: Send, -{ - pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> { - StackJob { - latch, - func: UnsafeCell::new(Some(func)), - result: UnsafeCell::new(JobResult::None), - } - } - - pub(super) unsafe fn as_job_ref(&self) -> JobRef { - JobRef::new(self) - } - - pub(super) unsafe fn run_inline(self, stolen: bool) -> R { - self.func.into_inner().unwrap()(stolen) - } - - pub(super) unsafe fn into_result(self) -> R { - self.result.into_inner().into_return_value() - } -} - -impl<L, F, R> Job for StackJob<L, F, R> -where - L: Latch + Sync, - F: FnOnce(bool) -> R + Send, - R: Send, -{ - unsafe fn execute(this: *const ()) { - let this = &*(this as *const Self); - let abort = unwind::AbortIfPanic; - let func = (*this.func.get()).take().unwrap(); - (*this.result.get()) = JobResult::call(func); - Latch::set(&this.latch); - mem::forget(abort); - } -} - -/// Represents a job stored in the heap. Used to implement -/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply -/// invokes a closure, which then triggers the appropriate logic to -/// signal that the job executed. -/// -/// (Probably `StackJob` should be refactored in a similar fashion.) -pub(super) struct HeapJob<BODY> -where - BODY: FnOnce() + Send, -{ - job: BODY, -} - -impl<BODY> HeapJob<BODY> -where - BODY: FnOnce() + Send, -{ - pub(super) fn new(job: BODY) -> Box<Self> { - Box::new(HeapJob { job }) - } - - /// Creates a `JobRef` from this job -- note that this hides all - /// lifetimes, so it is up to you to ensure that this JobRef - /// doesn't outlive any data that it closes over. - pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef { - JobRef::new(Box::into_raw(self)) - } - - /// Creates a static `JobRef` from this job. - pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef - where - BODY: 'static, - { - unsafe { self.into_job_ref() } - } -} - -impl<BODY> Job for HeapJob<BODY> -where - BODY: FnOnce() + Send, -{ - unsafe fn execute(this: *const ()) { - let this = Box::from_raw(this as *mut Self); - (this.job)(); - } -} - -/// Represents a job stored in an `Arc` -- like `HeapJob`, but may -/// be turned into multiple `JobRef`s and called multiple times. -pub(super) struct ArcJob<BODY> -where - BODY: Fn() + Send + Sync, -{ - job: BODY, -} - -impl<BODY> ArcJob<BODY> -where - BODY: Fn() + Send + Sync, -{ - pub(super) fn new(job: BODY) -> Arc<Self> { - Arc::new(ArcJob { job }) - } - - /// Creates a `JobRef` from this job -- note that this hides all - /// lifetimes, so it is up to you to ensure that this JobRef - /// doesn't outlive any data that it closes over. - pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef { - JobRef::new(Arc::into_raw(Arc::clone(this))) - } - - /// Creates a static `JobRef` from this job. - pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef - where - BODY: 'static, - { - unsafe { Self::as_job_ref(this) } - } -} - -impl<BODY> Job for ArcJob<BODY> -where - BODY: Fn() + Send + Sync, -{ - unsafe fn execute(this: *const ()) { - let this = Arc::from_raw(this as *mut Self); - (this.job)(); - } -} - -impl<T> JobResult<T> { - fn call(func: impl FnOnce(bool) -> T) -> Self { - match unwind::halt_unwinding(|| func(true)) { - Ok(x) => JobResult::Ok(x), - Err(x) => JobResult::Panic(x), - } - } - - /// Convert the `JobResult` for a job that has finished (and hence - /// its JobResult is populated) into its return value. - /// - /// NB. This will panic if the job panicked. - pub(super) fn into_return_value(self) -> T { - match self { - JobResult::None => unreachable!(), - JobResult::Ok(x) => x, - JobResult::Panic(x) => unwind::resume_unwinding(x), - } - } -} - -/// Indirect queue to provide FIFO job priority. -pub(super) struct JobFifo { - inner: Injector<JobRef>, -} - -impl JobFifo { - pub(super) fn new() -> Self { - JobFifo { - inner: Injector::new(), - } - } - - pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { - // A little indirection ensures that spawns are always prioritized in FIFO order. The - // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front - // (FIFO), but either way they will end up popping from the front of this queue. - self.inner.push(job_ref); - JobRef::new(self) - } -} - -impl Job for JobFifo { - unsafe fn execute(this: *const ()) { - // We "execute" a queue by executing its first job, FIFO. - let this = &*(this as *const Self); - loop { - match this.inner.steal() { - Steal::Success(job_ref) => break job_ref.execute(), - Steal::Empty => panic!("FIFO is empty"), - Steal::Retry => {} - } - } - } -} diff --git a/vendor/rayon-core/src/join/mod.rs b/vendor/rayon-core/src/join/mod.rs deleted file mode 100644 index 5ab9f6b..0000000 --- a/vendor/rayon-core/src/join/mod.rs +++ /dev/null @@ -1,188 +0,0 @@ -use crate::job::StackJob; -use crate::latch::SpinLatch; -use crate::registry::{self, WorkerThread}; -use crate::unwind; -use std::any::Any; - -use crate::FnContext; - -#[cfg(test)] -mod test; - -/// Takes two closures and *potentially* runs them in parallel. It -/// returns a pair of the results from those closures. -/// -/// Conceptually, calling `join()` is similar to spawning two threads, -/// one executing each of the two closures. However, the -/// implementation is quite different and incurs very low -/// overhead. The underlying technique is called "work stealing": the -/// Rayon runtime uses a fixed pool of worker threads and attempts to -/// only execute code in parallel when there are idle CPUs to handle -/// it. -/// -/// When `join` is called from outside the thread pool, the calling -/// thread will block while the closures execute in the pool. When -/// `join` is called within the pool, the calling thread still actively -/// participates in the thread pool. It will begin by executing closure -/// A (on the current thread). While it is doing that, it will advertise -/// closure B as being available for other threads to execute. Once closure A -/// has completed, the current thread will try to execute closure B; -/// if however closure B has been stolen, then it will look for other work -/// while waiting for the thief to fully execute closure B. (This is the -/// typical work-stealing strategy). -/// -/// # Examples -/// -/// This example uses join to perform a quick-sort (note this is not a -/// particularly optimized implementation: if you **actually** want to -/// sort for real, you should prefer [the `par_sort` method] offered -/// by Rayon). -/// -/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort -/// -/// ```rust -/// # use rayon_core as rayon; -/// let mut v = vec![5, 1, 8, 22, 0, 44]; -/// quick_sort(&mut v); -/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]); -/// -/// fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { -/// if v.len() > 1 { -/// let mid = partition(v); -/// let (lo, hi) = v.split_at_mut(mid); -/// rayon::join(|| quick_sort(lo), -/// || quick_sort(hi)); -/// } -/// } -/// -/// // Partition rearranges all items `<=` to the pivot -/// // item (arbitrary selected to be the last item in the slice) -/// // to the first half of the slice. It then returns the -/// // "dividing point" where the pivot is placed. -/// fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { -/// let pivot = v.len() - 1; -/// let mut i = 0; -/// for j in 0..pivot { -/// if v[j] <= v[pivot] { -/// v.swap(i, j); -/// i += 1; -/// } -/// } -/// v.swap(i, pivot); -/// i -/// } -/// ``` -/// -/// # Warning about blocking I/O -/// -/// The assumption is that the closures given to `join()` are -/// CPU-bound tasks that do not perform I/O or other blocking -/// operations. If you do perform I/O, and that I/O should block -/// (e.g., waiting for a network request), the overall performance may -/// be poor. Moreover, if you cause one closure to be blocked waiting -/// on another (for example, using a channel), that could lead to a -/// deadlock. -/// -/// # Panics -/// -/// No matter what happens, both closures will always be executed. If -/// a single closure panics, whether it be the first or second -/// closure, that panic will be propagated and hence `join()` will -/// panic with the same panic value. If both closures panic, `join()` -/// will panic with the panic value from the first closure. -pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, - RA: Send, - RB: Send, -{ - #[inline] - fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R { - move |_| f() - } - - join_context(call(oper_a), call(oper_b)) -} - -/// Identical to `join`, except that the closures have a parameter -/// that provides context for the way the closure has been called, -/// especially indicating whether they're executing on a different -/// thread than where `join_context` was called. This will occur if -/// the second job is stolen by a different thread, or if -/// `join_context` was called from outside the thread pool to begin -/// with. -pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce(FnContext) -> RA + Send, - B: FnOnce(FnContext) -> RB + Send, - RA: Send, - RB: Send, -{ - #[inline] - fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R { - move || f(FnContext::new(injected)) - } - - #[inline] - fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R { - move |migrated| f(FnContext::new(migrated)) - } - - registry::in_worker(|worker_thread, injected| unsafe { - // Create virtual wrapper for task b; this all has to be - // done here so that the stack frame can keep it all live - // long enough. - let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread)); - let job_b_ref = job_b.as_job_ref(); - let job_b_id = job_b_ref.id(); - worker_thread.push(job_b_ref); - - // Execute task a; hopefully b gets stolen in the meantime. - let status_a = unwind::halt_unwinding(call_a(oper_a, injected)); - let result_a = match status_a { - Ok(v) => v, - Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err), - }; - - // Now that task A has finished, try to pop job B from the - // local stack. It may already have been popped by job A; it - // may also have been stolen. There may also be some tasks - // pushed on top of it in the stack, and we will have to pop - // those off to get to it. - while !job_b.latch.probe() { - if let Some(job) = worker_thread.take_local_job() { - if job_b_id == job.id() { - // Found it! Let's run it. - // - // Note that this could panic, but it's ok if we unwind here. - let result_b = job_b.run_inline(injected); - return (result_a, result_b); - } else { - worker_thread.execute(job); - } - } else { - // Local deque is empty. Time to steal from other - // threads. - worker_thread.wait_until(&job_b.latch); - debug_assert!(job_b.latch.probe()); - break; - } - } - - (result_a, job_b.into_result()) - }) -} - -/// If job A panics, we still cannot return until we are sure that job -/// B is complete. This is because it may contain references into the -/// enclosing stack frame(s). -#[cold] // cold path -unsafe fn join_recover_from_panic( - worker_thread: &WorkerThread, - job_b_latch: &SpinLatch<'_>, - err: Box<dyn Any + Send>, -) -> ! { - worker_thread.wait_until(job_b_latch); - unwind::resume_unwinding(err) -} diff --git a/vendor/rayon-core/src/join/test.rs b/vendor/rayon-core/src/join/test.rs deleted file mode 100644 index b303dbc..0000000 --- a/vendor/rayon-core/src/join/test.rs +++ /dev/null @@ -1,151 +0,0 @@ -//! Tests for the join code. - -use crate::join::*; -use crate::unwind; -use crate::ThreadPoolBuilder; -use rand::distributions::Standard; -use rand::{Rng, SeedableRng}; -use rand_xorshift::XorShiftRng; - -fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) { - if v.len() <= 1 { - return; - } - - let mid = partition(v); - let (lo, hi) = v.split_at_mut(mid); - join(|| quick_sort(lo), || quick_sort(hi)); -} - -fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize { - let pivot = v.len() - 1; - let mut i = 0; - for j in 0..pivot { - if v[j] <= v[pivot] { - v.swap(i, j); - i += 1; - } - } - v.swap(i, pivot); - i -} - -fn seeded_rng() -> XorShiftRng { - let mut seed = <XorShiftRng as SeedableRng>::Seed::default(); - (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i); - XorShiftRng::from_seed(seed) -} - -#[test] -fn sort() { - let rng = seeded_rng(); - let mut data: Vec<u32> = rng.sample_iter(&Standard).take(6 * 1024).collect(); - let mut sorted_data = data.clone(); - sorted_data.sort(); - quick_sort(&mut data); - assert_eq!(data, sorted_data); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn sort_in_pool() { - let rng = seeded_rng(); - let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect(); - - let pool = ThreadPoolBuilder::new().build().unwrap(); - let mut sorted_data = data.clone(); - sorted_data.sort(); - pool.install(|| quick_sort(&mut data)); - assert_eq!(data, sorted_data); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_a() { - join(|| panic!("Hello, world!"), || ()); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_b() { - join(|| (), || panic!("Hello, world!")); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_both() { - join(|| panic!("Hello, world!"), || panic!("Goodbye, world!")); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_b_still_executes() { - let mut x = false; - match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) { - Ok(_) => panic!("failed to propagate panic from closure A,"), - Err(_) => assert!(x, "closure b failed to execute"), - } -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn join_context_both() { - // If we're not in a pool, both should be marked stolen as they're injected. - let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated()); - assert!(a_migrated); - assert!(b_migrated); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn join_context_neither() { - // If we're already in a 1-thread pool, neither job should be stolen. - let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let (a_migrated, b_migrated) = - pool.install(|| join_context(|a| a.migrated(), |b| b.migrated())); - assert!(!a_migrated); - assert!(!b_migrated); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn join_context_second() { - use std::sync::Barrier; - - // If we're already in a 2-thread pool, the second job should be stolen. - let barrier = Barrier::new(2); - let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); - let (a_migrated, b_migrated) = pool.install(|| { - join_context( - |a| { - barrier.wait(); - a.migrated() - }, - |b| { - barrier.wait(); - b.migrated() - }, - ) - }); - assert!(!a_migrated); - assert!(b_migrated); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn join_counter_overflow() { - const MAX: u32 = 500_000; - - let mut i = 0; - let mut j = 0; - let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); - - // Hammer on join a bunch of times -- used to hit overflow debug-assertions - // in JEC on 32-bit targets: https://github.com/rayon-rs/rayon/issues/797 - for _ in 0..MAX { - pool.join(|| i += 1, || j += 1); - } - - assert_eq!(i, MAX); - assert_eq!(j, MAX); -} diff --git a/vendor/rayon-core/src/latch.rs b/vendor/rayon-core/src/latch.rs deleted file mode 100644 index b0cbbd8..0000000 --- a/vendor/rayon-core/src/latch.rs +++ /dev/null @@ -1,460 +0,0 @@ -use std::marker::PhantomData; -use std::ops::Deref; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Condvar, Mutex}; -use std::usize; - -use crate::registry::{Registry, WorkerThread}; - -/// We define various kinds of latches, which are all a primitive signaling -/// mechanism. A latch starts as false. Eventually someone calls `set()` and -/// it becomes true. You can test if it has been set by calling `probe()`. -/// -/// Some kinds of latches, but not all, support a `wait()` operation -/// that will wait until the latch is set, blocking efficiently. That -/// is not part of the trait since it is not possibly to do with all -/// latches. -/// -/// The intention is that `set()` is called once, but `probe()` may be -/// called any number of times. Once `probe()` returns true, the memory -/// effects that occurred before `set()` become visible. -/// -/// It'd probably be better to refactor the API into two paired types, -/// but that's a bit of work, and this is not a public API. -/// -/// ## Memory ordering -/// -/// Latches need to guarantee two things: -/// -/// - Once `probe()` returns true, all memory effects from the `set()` -/// are visible (in other words, the set should synchronize-with -/// the probe). -/// - Once `set()` occurs, the next `probe()` *will* observe it. This -/// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep -/// README](/src/sleep/README.md#tickle-then-get-sleepy) for details. -pub(super) trait Latch { - /// Set the latch, signalling others. - /// - /// # WARNING - /// - /// Setting a latch triggers other threads to wake up and (in some - /// cases) complete. This may, in turn, cause memory to be - /// deallocated and so forth. One must be very careful about this, - /// and it's typically better to read all the fields you will need - /// to access *before* a latch is set! - /// - /// This function operates on `*const Self` instead of `&self` to allow it - /// to become dangling during this call. The caller must ensure that the - /// pointer is valid upon entry, and not invalidated during the call by any - /// actions other than `set` itself. - unsafe fn set(this: *const Self); -} - -pub(super) trait AsCoreLatch { - fn as_core_latch(&self) -> &CoreLatch; -} - -/// Latch is not set, owning thread is awake -const UNSET: usize = 0; - -/// Latch is not set, owning thread is going to sleep on this latch -/// (but has not yet fallen asleep). -const SLEEPY: usize = 1; - -/// Latch is not set, owning thread is asleep on this latch and -/// must be awoken. -const SLEEPING: usize = 2; - -/// Latch is set. -const SET: usize = 3; - -/// Spin latches are the simplest, most efficient kind, but they do -/// not support a `wait()` operation. They just have a boolean flag -/// that becomes true when `set()` is called. -#[derive(Debug)] -pub(super) struct CoreLatch { - state: AtomicUsize, -} - -impl CoreLatch { - #[inline] - fn new() -> Self { - Self { - state: AtomicUsize::new(0), - } - } - - /// Invoked by owning thread as it prepares to sleep. Returns true - /// if the owning thread may proceed to fall asleep, false if the - /// latch was set in the meantime. - #[inline] - pub(super) fn get_sleepy(&self) -> bool { - self.state - .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - } - - /// Invoked by owning thread as it falls asleep sleep. Returns - /// true if the owning thread should block, or false if the latch - /// was set in the meantime. - #[inline] - pub(super) fn fall_asleep(&self) -> bool { - self.state - .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - } - - /// Invoked by owning thread as it falls asleep sleep. Returns - /// true if the owning thread should block, or false if the latch - /// was set in the meantime. - #[inline] - pub(super) fn wake_up(&self) { - if !self.probe() { - let _ = - self.state - .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); - } - } - - /// Set the latch. If this returns true, the owning thread was sleeping - /// and must be awoken. - /// - /// This is private because, typically, setting a latch involves - /// doing some wakeups; those are encapsulated in the surrounding - /// latch code. - #[inline] - unsafe fn set(this: *const Self) -> bool { - let old_state = (*this).state.swap(SET, Ordering::AcqRel); - old_state == SLEEPING - } - - /// Test if this latch has been set. - #[inline] - pub(super) fn probe(&self) -> bool { - self.state.load(Ordering::Acquire) == SET - } -} - -impl AsCoreLatch for CoreLatch { - #[inline] - fn as_core_latch(&self) -> &CoreLatch { - self - } -} - -/// Spin latches are the simplest, most efficient kind, but they do -/// not support a `wait()` operation. They just have a boolean flag -/// that becomes true when `set()` is called. -pub(super) struct SpinLatch<'r> { - core_latch: CoreLatch, - registry: &'r Arc<Registry>, - target_worker_index: usize, - cross: bool, -} - -impl<'r> SpinLatch<'r> { - /// Creates a new spin latch that is owned by `thread`. This means - /// that `thread` is the only thread that should be blocking on - /// this latch -- it also means that when the latch is set, we - /// will wake `thread` if it is sleeping. - #[inline] - pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { - SpinLatch { - core_latch: CoreLatch::new(), - registry: thread.registry(), - target_worker_index: thread.index(), - cross: false, - } - } - - /// Creates a new spin latch for cross-threadpool blocking. Notably, we - /// need to make sure the registry is kept alive after setting, so we can - /// safely call the notification. - #[inline] - pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { - SpinLatch { - cross: true, - ..SpinLatch::new(thread) - } - } - - #[inline] - pub(super) fn probe(&self) -> bool { - self.core_latch.probe() - } -} - -impl<'r> AsCoreLatch for SpinLatch<'r> { - #[inline] - fn as_core_latch(&self) -> &CoreLatch { - &self.core_latch - } -} - -impl<'r> Latch for SpinLatch<'r> { - #[inline] - unsafe fn set(this: *const Self) { - let cross_registry; - - let registry: &Registry = if (*this).cross { - // Ensure the registry stays alive while we notify it. - // Otherwise, it would be possible that we set the spin - // latch and the other thread sees it and exits, causing - // the registry to be deallocated, all before we get a - // chance to invoke `registry.notify_worker_latch_is_set`. - cross_registry = Arc::clone((*this).registry); - &cross_registry - } else { - // If this is not a "cross-registry" spin-latch, then the - // thread which is performing `set` is itself ensuring - // that the registry stays alive. However, that doesn't - // include this *particular* `Arc` handle if the waiting - // thread then exits, so we must completely dereference it. - (*this).registry - }; - let target_worker_index = (*this).target_worker_index; - - // NOTE: Once we `set`, the target may proceed and invalidate `this`! - if CoreLatch::set(&(*this).core_latch) { - // Subtle: at this point, we can no longer read from - // `self`, because the thread owning this spin latch may - // have awoken and deallocated the latch. Therefore, we - // only use fields whose values we already read. - registry.notify_worker_latch_is_set(target_worker_index); - } - } -} - -/// A Latch starts as false and eventually becomes true. You can block -/// until it becomes true. -#[derive(Debug)] -pub(super) struct LockLatch { - m: Mutex<bool>, - v: Condvar, -} - -impl LockLatch { - #[inline] - pub(super) fn new() -> LockLatch { - LockLatch { - m: Mutex::new(false), - v: Condvar::new(), - } - } - - /// Block until latch is set, then resets this lock latch so it can be reused again. - pub(super) fn wait_and_reset(&self) { - let mut guard = self.m.lock().unwrap(); - while !*guard { - guard = self.v.wait(guard).unwrap(); - } - *guard = false; - } - - /// Block until latch is set. - pub(super) fn wait(&self) { - let mut guard = self.m.lock().unwrap(); - while !*guard { - guard = self.v.wait(guard).unwrap(); - } - } -} - -impl Latch for LockLatch { - #[inline] - unsafe fn set(this: *const Self) { - let mut guard = (*this).m.lock().unwrap(); - *guard = true; - (*this).v.notify_all(); - } -} - -/// Once latches are used to implement one-time blocking, primarily -/// for the termination flag of the threads in the pool. -/// -/// Note: like a `SpinLatch`, once-latches are always associated with -/// some registry that is probing them, which must be tickled when -/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a -/// reference to that registry. This is because in some cases the -/// registry owns the once-latch, and that would create a cycle. So a -/// `OnceLatch` must be given a reference to its owning registry when -/// it is set. For this reason, it does not implement the `Latch` -/// trait (but it doesn't have to, as it is not used in those generic -/// contexts). -#[derive(Debug)] -pub(super) struct OnceLatch { - core_latch: CoreLatch, -} - -impl OnceLatch { - #[inline] - pub(super) fn new() -> OnceLatch { - Self { - core_latch: CoreLatch::new(), - } - } - - /// Set the latch, then tickle the specific worker thread, - /// which should be the one that owns this latch. - #[inline] - pub(super) unsafe fn set_and_tickle_one( - this: *const Self, - registry: &Registry, - target_worker_index: usize, - ) { - if CoreLatch::set(&(*this).core_latch) { - registry.notify_worker_latch_is_set(target_worker_index); - } - } -} - -impl AsCoreLatch for OnceLatch { - #[inline] - fn as_core_latch(&self) -> &CoreLatch { - &self.core_latch - } -} - -/// Counting latches are used to implement scopes. They track a -/// counter. Unlike other latches, calling `set()` does not -/// necessarily make the latch be considered `set()`; instead, it just -/// decrements the counter. The latch is only "set" (in the sense that -/// `probe()` returns true) once the counter reaches zero. -#[derive(Debug)] -pub(super) struct CountLatch { - counter: AtomicUsize, - kind: CountLatchKind, -} - -enum CountLatchKind { - /// A latch for scopes created on a rayon thread which will participate in work- - /// stealing while it waits for completion. This thread is not necessarily part - /// of the same registry as the scope itself! - Stealing { - latch: CoreLatch, - /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool - /// with registry B, when a job completes in a thread of registry B, we may - /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A. - /// That means we need a reference to registry A (since at that point we will - /// only have a reference to registry B), so we stash it here. - registry: Arc<Registry>, - /// The index of the worker to wake in `registry` - worker_index: usize, - }, - - /// A latch for scopes created on a non-rayon thread which will block to wait. - Blocking { latch: LockLatch }, -} - -impl std::fmt::Debug for CountLatchKind { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - CountLatchKind::Stealing { latch, .. } => { - f.debug_tuple("Stealing").field(latch).finish() - } - CountLatchKind::Blocking { latch, .. } => { - f.debug_tuple("Blocking").field(latch).finish() - } - } - } -} - -impl CountLatch { - pub(super) fn new(owner: Option<&WorkerThread>) -> Self { - Self::with_count(1, owner) - } - - pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { - Self { - counter: AtomicUsize::new(count), - kind: match owner { - Some(owner) => CountLatchKind::Stealing { - latch: CoreLatch::new(), - registry: Arc::clone(owner.registry()), - worker_index: owner.index(), - }, - None => CountLatchKind::Blocking { - latch: LockLatch::new(), - }, - }, - } - } - - #[inline] - pub(super) fn increment(&self) { - let old_counter = self.counter.fetch_add(1, Ordering::Relaxed); - debug_assert!(old_counter != 0); - } - - pub(super) fn wait(&self, owner: Option<&WorkerThread>) { - match &self.kind { - CountLatchKind::Stealing { - latch, - registry, - worker_index, - } => unsafe { - let owner = owner.expect("owner thread"); - debug_assert_eq!(registry.id(), owner.registry().id()); - debug_assert_eq!(*worker_index, owner.index()); - owner.wait_until(latch); - }, - CountLatchKind::Blocking { latch } => latch.wait(), - } - } -} - -impl Latch for CountLatch { - #[inline] - unsafe fn set(this: *const Self) { - if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { - // NOTE: Once we call `set` on the internal `latch`, - // the target may proceed and invalidate `this`! - match (*this).kind { - CountLatchKind::Stealing { - ref latch, - ref registry, - worker_index, - } => { - let registry = Arc::clone(registry); - if CoreLatch::set(latch) { - registry.notify_worker_latch_is_set(worker_index); - } - } - CountLatchKind::Blocking { ref latch } => LockLatch::set(latch), - } - } - } -} - -/// `&L` without any implication of `dereferenceable` for `Latch::set` -pub(super) struct LatchRef<'a, L> { - inner: *const L, - marker: PhantomData<&'a L>, -} - -impl<L> LatchRef<'_, L> { - pub(super) fn new(inner: &L) -> LatchRef<'_, L> { - LatchRef { - inner, - marker: PhantomData, - } - } -} - -unsafe impl<L: Sync> Sync for LatchRef<'_, L> {} - -impl<L> Deref for LatchRef<'_, L> { - type Target = L; - - fn deref(&self) -> &L { - // SAFETY: if we have &self, the inner latch is still alive - unsafe { &*self.inner } - } -} - -impl<L: Latch> Latch for LatchRef<'_, L> { - #[inline] - unsafe fn set(this: *const Self) { - L::set((*this).inner); - } -} diff --git a/vendor/rayon-core/src/lib.rs b/vendor/rayon-core/src/lib.rs deleted file mode 100644 index 7001c8c..0000000 --- a/vendor/rayon-core/src/lib.rs +++ /dev/null @@ -1,869 +0,0 @@ -//! Rayon-core houses the core stable APIs of Rayon. -//! -//! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there. -//! -//! [`join`] is used to take two closures and potentially run them in parallel. -//! - It will run in parallel if task B gets stolen before task A can finish. -//! - It will run sequentially if task A finishes before task B is stolen and can continue on task B. -//! -//! [`scope`] creates a scope in which you can run any number of parallel tasks. -//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed. -//! The scope will exist until all tasks spawned within the scope have been completed. -//! -//! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function. -//! -//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one. -//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque, -//! where it becomes available for work stealing from other threads in the local threadpool. -//! -//! [`join`]: fn.join.html -//! [`scope`]: fn.scope.html -//! [`scope()`]: fn.scope.html -//! [`spawn`]: fn.spawn.html -//! [`ThreadPool`]: struct.threadpool.html -//! [`install()`]: struct.ThreadPool.html#method.install -//! [`spawn()`]: struct.ThreadPool.html#method.spawn -//! [`join()`]: struct.ThreadPool.html#method.join -//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html -//! -//! # Global fallback when threading is unsupported -//! -//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that -//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi` -//! targets are notable examples of this. Rather than panicking on the unsupported error when -//! creating the implicit global threadpool, Rayon configures a fallback mode instead. -//! -//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting -//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since -//! there is no other thread to share the work. However, since the pool is not running independent -//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower- -//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate -//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local` -//! can also volunteer execution time. -//! -//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback. -//! -//! # Restricting multiple versions -//! -//! In order to ensure proper coordination between threadpools, and especially -//! to make sure there's only one global threadpool, `rayon-core` is actively -//! restricted from building multiple versions of itself into a single target. -//! You may see a build error like this in violation: -//! -//! ```text -//! error: native library `rayon-core` is being linked to by more -//! than one package, and can only be linked to by one package -//! ``` -//! -//! While we strive to keep `rayon-core` semver-compatible, it's still -//! possible to arrive at this situation if different crates have overly -//! restrictive tilde or inequality requirements for `rayon-core`. The -//! conflicting requirements will need to be resolved before the build will -//! succeed. - -#![deny(missing_debug_implementations)] -#![deny(missing_docs)] -#![deny(unreachable_pub)] -#![warn(rust_2018_idioms)] - -use std::any::Any; -use std::env; -use std::error::Error; -use std::fmt; -use std::io; -use std::marker::PhantomData; -use std::str::FromStr; -use std::thread; - -#[macro_use] -mod private; - -mod broadcast; -mod job; -mod join; -mod latch; -mod registry; -mod scope; -mod sleep; -mod spawn; -mod thread_pool; -mod unwind; - -mod compile_fail; -mod test; - -pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext}; -pub use self::join::{join, join_context}; -pub use self::registry::ThreadBuilder; -pub use self::scope::{in_place_scope, scope, Scope}; -pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo}; -pub use self::spawn::{spawn, spawn_fifo}; -pub use self::thread_pool::current_thread_has_pending_tasks; -pub use self::thread_pool::current_thread_index; -pub use self::thread_pool::ThreadPool; -pub use self::thread_pool::{yield_local, yield_now, Yield}; - -use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; - -/// Returns the maximum number of threads that Rayon supports in a single thread-pool. -/// -/// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting -/// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum. -/// -/// The value may vary between different targets, and is subject to change in new Rayon versions. -pub fn max_num_threads() -> usize { - // We are limited by the bits available in the sleep counter's `AtomicUsize`. - crate::sleep::THREADS_MAX -} - -/// Returns the number of threads in the current registry. If this -/// code is executing within a Rayon thread-pool, then this will be -/// the number of threads for the thread-pool of the current -/// thread. Otherwise, it will be the number of threads for the global -/// thread-pool. -/// -/// This can be useful when trying to judge how many times to split -/// parallel work (the parallel iterator traits use this value -/// internally for this purpose). -/// -/// # Future compatibility note -/// -/// Note that unless this thread-pool was created with a -/// builder that specifies the number of threads, then this -/// number may vary over time in future versions (see [the -/// `num_threads()` method for details][snt]). -/// -/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads -pub fn current_num_threads() -> usize { - crate::registry::Registry::current_num_threads() -} - -/// Error when initializing a thread pool. -#[derive(Debug)] -pub struct ThreadPoolBuildError { - kind: ErrorKind, -} - -#[derive(Debug)] -enum ErrorKind { - GlobalPoolAlreadyInitialized, - CurrentThreadAlreadyInPool, - IOError(io::Error), -} - -/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool. -/// ## Creating a ThreadPool -/// The following creates a thread pool with 22 threads. -/// -/// ```rust -/// # use rayon_core as rayon; -/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap(); -/// ``` -/// -/// To instead configure the global thread pool, use [`build_global()`]: -/// -/// ```rust -/// # use rayon_core as rayon; -/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap(); -/// ``` -/// -/// [`ThreadPool`]: struct.ThreadPool.html -/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global -pub struct ThreadPoolBuilder<S = DefaultSpawn> { - /// The number of threads in the rayon thread pool. - /// If zero will use the RAYON_NUM_THREADS environment variable. - /// If RAYON_NUM_THREADS is invalid or zero will use the default. - num_threads: usize, - - /// The thread we're building *from* will also be part of the pool. - use_current_thread: bool, - - /// Custom closure, if any, to handle a panic that we cannot propagate - /// anywhere else. - panic_handler: Option<Box<PanicHandler>>, - - /// Closure to compute the name of a thread. - get_thread_name: Option<Box<dyn FnMut(usize) -> String>>, - - /// The stack size for the created worker threads - stack_size: Option<usize>, - - /// Closure invoked on worker thread start. - start_handler: Option<Box<StartHandler>>, - - /// Closure invoked on worker thread exit. - exit_handler: Option<Box<ExitHandler>>, - - /// Closure invoked to spawn threads. - spawn_handler: S, - - /// If false, worker threads will execute spawned jobs in a - /// "depth-first" fashion. If true, they will do a "breadth-first" - /// fashion. Depth-first is the default. - breadth_first: bool, -} - -/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead. -/// -/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html -#[deprecated(note = "Use `ThreadPoolBuilder`")] -#[derive(Default)] -pub struct Configuration { - builder: ThreadPoolBuilder, -} - -/// The type for a panic handling closure. Note that this same closure -/// may be invoked multiple times in parallel. -type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync; - -/// The type for a closure that gets invoked when a thread starts. The -/// closure is passed the index of the thread on which it is invoked. -/// Note that this same closure may be invoked multiple times in parallel. -type StartHandler = dyn Fn(usize) + Send + Sync; - -/// The type for a closure that gets invoked when a thread exits. The -/// closure is passed the index of the thread on which is is invoked. -/// Note that this same closure may be invoked multiple times in parallel. -type ExitHandler = dyn Fn(usize) + Send + Sync; - -// NB: We can't `#[derive(Default)]` because `S` is left ambiguous. -impl Default for ThreadPoolBuilder { - fn default() -> Self { - ThreadPoolBuilder { - num_threads: 0, - use_current_thread: false, - panic_handler: None, - get_thread_name: None, - stack_size: None, - start_handler: None, - exit_handler: None, - spawn_handler: DefaultSpawn, - breadth_first: false, - } - } -} - -impl ThreadPoolBuilder { - /// Creates and returns a valid rayon thread pool builder, but does not initialize it. - pub fn new() -> Self { - Self::default() - } -} - -/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the -/// default spawn and those set by [`spawn_handler`](#method.spawn_handler). -impl<S> ThreadPoolBuilder<S> -where - S: ThreadSpawn, -{ - /// Creates a new `ThreadPool` initialized using this configuration. - pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> { - ThreadPool::build(self) - } - - /// Initializes the global thread pool. This initialization is - /// **optional**. If you do not call this function, the thread pool - /// will be automatically initialized with the default - /// configuration. Calling `build_global` is not recommended, except - /// in two scenarios: - /// - /// - You wish to change the default configuration. - /// - You are running a benchmark, in which case initializing may - /// yield slightly more consistent results, since the worker threads - /// will already be ready to go even in the first iteration. But - /// this cost is minimal. - /// - /// Initialization of the global thread pool happens exactly - /// once. Once started, the configuration cannot be - /// changed. Therefore, if you call `build_global` a second time, it - /// will return an error. An `Ok` result indicates that this - /// is the first initialization of the thread pool. - pub fn build_global(self) -> Result<(), ThreadPoolBuildError> { - let registry = registry::init_global_registry(self)?; - registry.wait_until_primed(); - Ok(()) - } -} - -impl ThreadPoolBuilder { - /// Creates a scoped `ThreadPool` initialized using this configuration. - /// - /// This is a convenience function for building a pool using [`std::thread::scope`] - /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). - /// The threads in this pool will start by calling `wrapper`, which should - /// do initialization and continue by calling `ThreadBuilder::run()`. - /// - /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html - /// - /// # Examples - /// - /// A scoped pool may be useful in combination with scoped thread-local variables. - /// - /// ``` - /// # use rayon_core as rayon; - /// - /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>); - /// - /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { - /// let pool_data = vec![1, 2, 3]; - /// - /// // We haven't assigned any TLS data yet. - /// assert!(!POOL_DATA.is_set()); - /// - /// rayon::ThreadPoolBuilder::new() - /// .build_scoped( - /// // Borrow `pool_data` in TLS for each thread. - /// |thread| POOL_DATA.set(&pool_data, || thread.run()), - /// // Do some work that needs the TLS data. - /// |pool| pool.install(|| assert!(POOL_DATA.is_set())), - /// )?; - /// - /// // Once we've returned, `pool_data` is no longer borrowed. - /// drop(pool_data); - /// Ok(()) - /// } - /// ``` - pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError> - where - W: Fn(ThreadBuilder) + Sync, // expected to call `run()` - F: FnOnce(&ThreadPool) -> R, - { - std::thread::scope(|scope| { - let pool = self - .spawn_handler(|thread| { - let mut builder = std::thread::Builder::new(); - if let Some(name) = thread.name() { - builder = builder.name(name.to_string()); - } - if let Some(size) = thread.stack_size() { - builder = builder.stack_size(size); - } - builder.spawn_scoped(scope, || wrapper(thread))?; - Ok(()) - }) - .build()?; - Ok(with_pool(&pool)) - }) - } -} - -impl<S> ThreadPoolBuilder<S> { - /// Sets a custom function for spawning threads. - /// - /// Note that the threads will not exit until after the pool is dropped. It - /// is up to the caller to wait for thread termination if that is important - /// for any invariants. For instance, threads created in [`std::thread::scope`] - /// will be joined before that scope returns, and this will block indefinitely - /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate - /// until the entire process exits! - /// - /// # Examples - /// - /// A minimal spawn handler just needs to call `run()` from an independent thread. - /// - /// ``` - /// # use rayon_core as rayon; - /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { - /// let pool = rayon::ThreadPoolBuilder::new() - /// .spawn_handler(|thread| { - /// std::thread::spawn(|| thread.run()); - /// Ok(()) - /// }) - /// .build()?; - /// - /// pool.install(|| println!("Hello from my custom thread!")); - /// Ok(()) - /// } - /// ``` - /// - /// The default spawn handler sets the name and stack size if given, and propagates - /// any errors from the thread builder. - /// - /// ``` - /// # use rayon_core as rayon; - /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { - /// let pool = rayon::ThreadPoolBuilder::new() - /// .spawn_handler(|thread| { - /// let mut b = std::thread::Builder::new(); - /// if let Some(name) = thread.name() { - /// b = b.name(name.to_owned()); - /// } - /// if let Some(stack_size) = thread.stack_size() { - /// b = b.stack_size(stack_size); - /// } - /// b.spawn(|| thread.run())?; - /// Ok(()) - /// }) - /// .build()?; - /// - /// pool.install(|| println!("Hello from my fully custom thread!")); - /// Ok(()) - /// } - /// ``` - /// - /// This can also be used for a pool of scoped threads like [`crossbeam::scope`], - /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in - /// [`build_scoped`](#method.build_scoped). - /// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html - /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html - /// - /// ``` - /// # use rayon_core as rayon; - /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { - /// std::thread::scope(|scope| { - /// let pool = rayon::ThreadPoolBuilder::new() - /// .spawn_handler(|thread| { - /// let mut builder = std::thread::Builder::new(); - /// if let Some(name) = thread.name() { - /// builder = builder.name(name.to_string()); - /// } - /// if let Some(size) = thread.stack_size() { - /// builder = builder.stack_size(size); - /// } - /// builder.spawn_scoped(scope, || { - /// // Add any scoped initialization here, then run! - /// thread.run() - /// })?; - /// Ok(()) - /// }) - /// .build()?; - /// - /// pool.install(|| println!("Hello from my custom scoped thread!")); - /// Ok(()) - /// }) - /// } - /// ``` - pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>> - where - F: FnMut(ThreadBuilder) -> io::Result<()>, - { - ThreadPoolBuilder { - spawn_handler: CustomSpawn::new(spawn), - // ..self - num_threads: self.num_threads, - use_current_thread: self.use_current_thread, - panic_handler: self.panic_handler, - get_thread_name: self.get_thread_name, - stack_size: self.stack_size, - start_handler: self.start_handler, - exit_handler: self.exit_handler, - breadth_first: self.breadth_first, - } - } - - /// Returns a reference to the current spawn handler. - fn get_spawn_handler(&mut self) -> &mut S { - &mut self.spawn_handler - } - - /// Get the number of threads that will be used for the thread - /// pool. See `num_threads()` for more information. - fn get_num_threads(&self) -> usize { - if self.num_threads > 0 { - self.num_threads - } else { - let default = || { - thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1) - }; - - match env::var("RAYON_NUM_THREADS") - .ok() - .and_then(|s| usize::from_str(&s).ok()) - { - Some(x @ 1..) => return x, - Some(0) => return default(), - _ => {} - } - - // Support for deprecated `RAYON_RS_NUM_CPUS`. - match env::var("RAYON_RS_NUM_CPUS") - .ok() - .and_then(|s| usize::from_str(&s).ok()) - { - Some(x @ 1..) => x, - _ => default(), - } - } - } - - /// Get the thread name for the thread with the given index. - fn get_thread_name(&mut self, index: usize) -> Option<String> { - let f = self.get_thread_name.as_mut()?; - Some(f(index)) - } - - /// Sets a closure which takes a thread index and returns - /// the thread's name. - pub fn thread_name<F>(mut self, closure: F) -> Self - where - F: FnMut(usize) -> String + 'static, - { - self.get_thread_name = Some(Box::new(closure)); - self - } - - /// Sets the number of threads to be used in the rayon threadpool. - /// - /// If you specify a non-zero number of threads using this - /// function, then the resulting thread-pools are guaranteed to - /// start at most this number of threads. - /// - /// If `num_threads` is 0, or you do not call this function, then - /// the Rayon runtime will select the number of threads - /// automatically. At present, this is based on the - /// `RAYON_NUM_THREADS` environment variable (if set), - /// or the number of logical CPUs (otherwise). - /// In the future, however, the default behavior may - /// change to dynamically add or remove threads as needed. - /// - /// **Future compatibility warning:** Given the default behavior - /// may change in the future, if you wish to rely on a fixed - /// number of threads, you should use this function to specify - /// that number. To reproduce the current default behavior, you - /// may wish to use [`std::thread::available_parallelism`] - /// to query the number of CPUs dynamically. - /// - /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one - /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment - /// variable. If both variables are specified, `RAYON_NUM_THREADS` will - /// be preferred. - pub fn num_threads(mut self, num_threads: usize) -> Self { - self.num_threads = num_threads; - self - } - - /// Use the current thread as one of the threads in the pool. - /// - /// The current thread is guaranteed to be at index 0, and since the thread is not managed by - /// rayon, the spawn and exit handlers do not run for that thread. - /// - /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into - /// the thread-pool will generally not be picked up automatically by this thread unless you - /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`]. - /// - /// # Local thread-pools - /// - /// Using this in a local thread-pool means the registry will be leaked. In future versions - /// there might be a way of cleaning up the current-thread state. - pub fn use_current_thread(mut self) -> Self { - self.use_current_thread = true; - self - } - - /// Returns a copy of the current panic handler. - fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> { - self.panic_handler.take() - } - - /// Normally, whenever Rayon catches a panic, it tries to - /// propagate it to someplace sensible, to try and reflect the - /// semantics of sequential execution. But in some cases, - /// particularly with the `spawn()` APIs, there is no - /// obvious place where we should propagate the panic to. - /// In that case, this panic handler is invoked. - /// - /// If no panic handler is set, the default is to abort the - /// process, under the principle that panics should not go - /// unobserved. - /// - /// If the panic handler itself panics, this will abort the - /// process. To prevent this, wrap the body of your panic handler - /// in a call to `std::panic::catch_unwind()`. - pub fn panic_handler<H>(mut self, panic_handler: H) -> Self - where - H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, - { - self.panic_handler = Some(Box::new(panic_handler)); - self - } - - /// Get the stack size of the worker threads - fn get_stack_size(&self) -> Option<usize> { - self.stack_size - } - - /// Sets the stack size of the worker threads - pub fn stack_size(mut self, stack_size: usize) -> Self { - self.stack_size = Some(stack_size); - self - } - - /// **(DEPRECATED)** Suggest to worker threads that they execute - /// spawned jobs in a "breadth-first" fashion. - /// - /// Typically, when a worker thread is idle or blocked, it will - /// attempt to execute the job from the *top* of its local deque of - /// work (i.e., the job most recently spawned). If this flag is set - /// to true, however, workers will prefer to execute in a - /// *breadth-first* fashion -- that is, they will search for jobs at - /// the *bottom* of their local deque. (At present, workers *always* - /// steal from the bottom of other workers' deques, regardless of - /// the setting of this flag.) - /// - /// If you think of the tasks as a tree, where a parent task - /// spawns its children in the tree, then this flag loosely - /// corresponds to doing a breadth-first traversal of the tree, - /// whereas the default would be to do a depth-first traversal. - /// - /// **Note that this is an "execution hint".** Rayon's task - /// execution is highly dynamic and the precise order in which - /// independent tasks are executed is not intended to be - /// guaranteed. - /// - /// This `breadth_first()` method is now deprecated per [RFC #1], - /// and in the future its effect may be removed. Consider using - /// [`scope_fifo()`] for a similar effect. - /// - /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md - /// [`scope_fifo()`]: fn.scope_fifo.html - #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")] - pub fn breadth_first(mut self) -> Self { - self.breadth_first = true; - self - } - - fn get_breadth_first(&self) -> bool { - self.breadth_first - } - - /// Takes the current thread start callback, leaving `None`. - fn take_start_handler(&mut self) -> Option<Box<StartHandler>> { - self.start_handler.take() - } - - /// Sets a callback to be invoked on thread start. - /// - /// The closure is passed the index of the thread on which it is invoked. - /// Note that this same closure may be invoked multiple times in parallel. - /// If this closure panics, the panic will be passed to the panic handler. - /// If that handler returns, then startup will continue normally. - pub fn start_handler<H>(mut self, start_handler: H) -> Self - where - H: Fn(usize) + Send + Sync + 'static, - { - self.start_handler = Some(Box::new(start_handler)); - self - } - - /// Returns a current thread exit callback, leaving `None`. - fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> { - self.exit_handler.take() - } - - /// Sets a callback to be invoked on thread exit. - /// - /// The closure is passed the index of the thread on which it is invoked. - /// Note that this same closure may be invoked multiple times in parallel. - /// If this closure panics, the panic will be passed to the panic handler. - /// If that handler returns, then the thread will exit normally. - pub fn exit_handler<H>(mut self, exit_handler: H) -> Self - where - H: Fn(usize) + Send + Sync + 'static, - { - self.exit_handler = Some(Box::new(exit_handler)); - self - } -} - -#[allow(deprecated)] -impl Configuration { - /// Creates and return a valid rayon thread pool configuration, but does not initialize it. - pub fn new() -> Configuration { - Configuration { - builder: ThreadPoolBuilder::new(), - } - } - - /// Deprecated in favor of `ThreadPoolBuilder::build`. - pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> { - self.builder.build().map_err(Box::from) - } - - /// Deprecated in favor of `ThreadPoolBuilder::thread_name`. - pub fn thread_name<F>(mut self, closure: F) -> Self - where - F: FnMut(usize) -> String + 'static, - { - self.builder = self.builder.thread_name(closure); - self - } - - /// Deprecated in favor of `ThreadPoolBuilder::num_threads`. - pub fn num_threads(mut self, num_threads: usize) -> Configuration { - self.builder = self.builder.num_threads(num_threads); - self - } - - /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`. - pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration - where - H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, - { - self.builder = self.builder.panic_handler(panic_handler); - self - } - - /// Deprecated in favor of `ThreadPoolBuilder::stack_size`. - pub fn stack_size(mut self, stack_size: usize) -> Self { - self.builder = self.builder.stack_size(stack_size); - self - } - - /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`. - pub fn breadth_first(mut self) -> Self { - self.builder = self.builder.breadth_first(); - self - } - - /// Deprecated in favor of `ThreadPoolBuilder::start_handler`. - pub fn start_handler<H>(mut self, start_handler: H) -> Configuration - where - H: Fn(usize) + Send + Sync + 'static, - { - self.builder = self.builder.start_handler(start_handler); - self - } - - /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`. - pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration - where - H: Fn(usize) + Send + Sync + 'static, - { - self.builder = self.builder.exit_handler(exit_handler); - self - } - - /// Returns a ThreadPoolBuilder with identical parameters. - fn into_builder(self) -> ThreadPoolBuilder { - self.builder - } -} - -impl ThreadPoolBuildError { - fn new(kind: ErrorKind) -> ThreadPoolBuildError { - ThreadPoolBuildError { kind } - } - - fn is_unsupported(&self) -> bool { - matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported) - } -} - -const GLOBAL_POOL_ALREADY_INITIALIZED: &str = - "The global thread pool has already been initialized."; - -const CURRENT_THREAD_ALREADY_IN_POOL: &str = - "The current thread is already part of another thread pool."; - -impl Error for ThreadPoolBuildError { - #[allow(deprecated)] - fn description(&self) -> &str { - match self.kind { - ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED, - ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL, - ErrorKind::IOError(ref e) => e.description(), - } - } - - fn source(&self) -> Option<&(dyn Error + 'static)> { - match &self.kind { - ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None, - ErrorKind::IOError(e) => Some(e), - } - } -} - -impl fmt::Display for ThreadPoolBuildError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.kind { - ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f), - ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f), - ErrorKind::IOError(e) => e.fmt(f), - } - } -} - -/// Deprecated in favor of `ThreadPoolBuilder::build_global`. -#[deprecated(note = "use `ThreadPoolBuilder::build_global`")] -#[allow(deprecated)] -pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> { - config.into_builder().build_global().map_err(Box::from) -} - -impl<S> fmt::Debug for ThreadPoolBuilder<S> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let ThreadPoolBuilder { - ref num_threads, - ref use_current_thread, - ref get_thread_name, - ref panic_handler, - ref stack_size, - ref start_handler, - ref exit_handler, - spawn_handler: _, - ref breadth_first, - } = *self; - - // Just print `Some(<closure>)` or `None` to the debug - // output. - struct ClosurePlaceholder; - impl fmt::Debug for ClosurePlaceholder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("<closure>") - } - } - let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder); - let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder); - let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder); - let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder); - - f.debug_struct("ThreadPoolBuilder") - .field("num_threads", num_threads) - .field("use_current_thread", use_current_thread) - .field("get_thread_name", &get_thread_name) - .field("panic_handler", &panic_handler) - .field("stack_size", &stack_size) - .field("start_handler", &start_handler) - .field("exit_handler", &exit_handler) - .field("breadth_first", &breadth_first) - .finish() - } -} - -#[allow(deprecated)] -impl fmt::Debug for Configuration { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.builder.fmt(f) - } -} - -/// Provides the calling context to a closure called by `join_context`. -#[derive(Debug)] -pub struct FnContext { - migrated: bool, - - /// disable `Send` and `Sync`, just for a little future-proofing. - _marker: PhantomData<*mut ()>, -} - -impl FnContext { - #[inline] - fn new(migrated: bool) -> Self { - FnContext { - migrated, - _marker: PhantomData, - } - } -} - -impl FnContext { - /// Returns `true` if the closure was called from a different thread - /// than it was provided from. - #[inline] - pub fn migrated(&self) -> bool { - self.migrated - } -} diff --git a/vendor/rayon-core/src/private.rs b/vendor/rayon-core/src/private.rs deleted file mode 100644 index c85e77b..0000000 --- a/vendor/rayon-core/src/private.rs +++ /dev/null @@ -1,26 +0,0 @@ -//! The public parts of this private module are used to create traits -//! that cannot be implemented outside of our own crate. This way we -//! can feel free to extend those traits without worrying about it -//! being a breaking change for other implementations. - -/// If this type is pub but not publicly reachable, third parties -/// can't name it and can't implement traits using it. -#[allow(missing_debug_implementations)] -pub struct PrivateMarker; - -macro_rules! private_decl { - () => { - /// This trait is private; this method exists to make it - /// impossible to implement outside the crate. - #[doc(hidden)] - fn __rayon_private__(&self) -> crate::private::PrivateMarker; - }; -} - -macro_rules! private_impl { - () => { - fn __rayon_private__(&self) -> crate::private::PrivateMarker { - crate::private::PrivateMarker - } - }; -} diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs deleted file mode 100644 index e4f2ac7..0000000 --- a/vendor/rayon-core/src/registry.rs +++ /dev/null @@ -1,995 +0,0 @@ -use crate::job::{JobFifo, JobRef, StackJob}; -use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; -use crate::sleep::Sleep; -use crate::unwind; -use crate::{ - ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, - Yield, -}; -use crossbeam_deque::{Injector, Steal, Stealer, Worker}; -use std::cell::Cell; -use std::collections::hash_map::DefaultHasher; -use std::fmt; -use std::hash::Hasher; -use std::io; -use std::mem; -use std::ptr; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, Once}; -use std::thread; -use std::usize; - -/// Thread builder used for customization via -/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). -pub struct ThreadBuilder { - name: Option<String>, - stack_size: Option<usize>, - worker: Worker<JobRef>, - stealer: Stealer<JobRef>, - registry: Arc<Registry>, - index: usize, -} - -impl ThreadBuilder { - /// Gets the index of this thread in the pool, within `0..num_threads`. - pub fn index(&self) -> usize { - self.index - } - - /// Gets the string that was specified by `ThreadPoolBuilder::name()`. - pub fn name(&self) -> Option<&str> { - self.name.as_deref() - } - - /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`. - pub fn stack_size(&self) -> Option<usize> { - self.stack_size - } - - /// Executes the main loop for this thread. This will not return until the - /// thread pool is dropped. - pub fn run(self) { - unsafe { main_loop(self) } - } -} - -impl fmt::Debug for ThreadBuilder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ThreadBuilder") - .field("pool", &self.registry.id()) - .field("index", &self.index) - .field("name", &self.name) - .field("stack_size", &self.stack_size) - .finish() - } -} - -/// Generalized trait for spawning a thread in the `Registry`. -/// -/// This trait is pub-in-private -- E0445 forces us to make it public, -/// but we don't actually want to expose these details in the API. -pub trait ThreadSpawn { - private_decl! {} - - /// Spawn a thread with the `ThreadBuilder` parameters, and then - /// call `ThreadBuilder::run()`. - fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>; -} - -/// Spawns a thread in the "normal" way with `std::thread::Builder`. -/// -/// This type is pub-in-private -- E0445 forces us to make it public, -/// but we don't actually want to expose these details in the API. -#[derive(Debug, Default)] -pub struct DefaultSpawn; - -impl ThreadSpawn for DefaultSpawn { - private_impl! {} - - fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { - let mut b = thread::Builder::new(); - if let Some(name) = thread.name() { - b = b.name(name.to_owned()); - } - if let Some(stack_size) = thread.stack_size() { - b = b.stack_size(stack_size); - } - b.spawn(|| thread.run())?; - Ok(()) - } -} - -/// Spawns a thread with a user's custom callback. -/// -/// This type is pub-in-private -- E0445 forces us to make it public, -/// but we don't actually want to expose these details in the API. -#[derive(Debug)] -pub struct CustomSpawn<F>(F); - -impl<F> CustomSpawn<F> -where - F: FnMut(ThreadBuilder) -> io::Result<()>, -{ - pub(super) fn new(spawn: F) -> Self { - CustomSpawn(spawn) - } -} - -impl<F> ThreadSpawn for CustomSpawn<F> -where - F: FnMut(ThreadBuilder) -> io::Result<()>, -{ - private_impl! {} - - #[inline] - fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { - (self.0)(thread) - } -} - -pub(super) struct Registry { - thread_infos: Vec<ThreadInfo>, - sleep: Sleep, - injected_jobs: Injector<JobRef>, - broadcasts: Mutex<Vec<Worker<JobRef>>>, - panic_handler: Option<Box<PanicHandler>>, - start_handler: Option<Box<StartHandler>>, - exit_handler: Option<Box<ExitHandler>>, - - // When this latch reaches 0, it means that all work on this - // registry must be complete. This is ensured in the following ways: - // - // - if this is the global registry, there is a ref-count that never - // gets released. - // - if this is a user-created thread-pool, then so long as the thread-pool - // exists, it holds a reference. - // - when we inject a "blocking job" into the registry with `ThreadPool::install()`, - // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't - // return until the blocking job is complete, that ref will continue to be held. - // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed. - // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`) - // and that job will keep the pool alive. - terminate_count: AtomicUsize, -} - -/// //////////////////////////////////////////////////////////////////////// -/// Initialization - -static mut THE_REGISTRY: Option<Arc<Registry>> = None; -static THE_REGISTRY_SET: Once = Once::new(); - -/// Starts the worker threads (if that has not already happened). If -/// initialization has not already occurred, use the default -/// configuration. -pub(super) fn global_registry() -> &'static Arc<Registry> { - set_global_registry(default_global_registry) - .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) - .expect("The global thread pool has not been initialized.") -} - -/// Starts the worker threads (if that has not already happened) with -/// the given builder. -pub(super) fn init_global_registry<S>( - builder: ThreadPoolBuilder<S>, -) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> -where - S: ThreadSpawn, -{ - set_global_registry(|| Registry::new(builder)) -} - -/// Starts the worker threads (if that has not already happened) -/// by creating a registry with the given callback. -fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> -where - F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>, -{ - let mut result = Err(ThreadPoolBuildError::new( - ErrorKind::GlobalPoolAlreadyInitialized, - )); - - THE_REGISTRY_SET.call_once(|| { - result = registry() - .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) }) - }); - - result -} - -fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> { - let result = Registry::new(ThreadPoolBuilder::new()); - - // If we're running in an environment that doesn't support threads at all, we can fall back to - // using the current thread alone. This is crude, and probably won't work for non-blocking - // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine. - // - // Notably, this allows current WebAssembly targets to work even though their threading support - // is stubbed out, and we won't have to change anything if they do add real threading. - let unsupported = matches!(&result, Err(e) if e.is_unsupported()); - if unsupported && WorkerThread::current().is_null() { - let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread(); - let fallback_result = Registry::new(builder); - if fallback_result.is_ok() { - return fallback_result; - } - } - - result -} - -struct Terminator<'a>(&'a Arc<Registry>); - -impl<'a> Drop for Terminator<'a> { - fn drop(&mut self) { - self.0.terminate() - } -} - -impl Registry { - pub(super) fn new<S>( - mut builder: ThreadPoolBuilder<S>, - ) -> Result<Arc<Self>, ThreadPoolBuildError> - where - S: ThreadSpawn, - { - // Soft-limit the number of threads that we can actually support. - let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads()); - - let breadth_first = builder.get_breadth_first(); - - let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) - .map(|_| { - let worker = if breadth_first { - Worker::new_fifo() - } else { - Worker::new_lifo() - }; - - let stealer = worker.stealer(); - (worker, stealer) - }) - .unzip(); - - let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads) - .map(|_| { - let worker = Worker::new_fifo(); - let stealer = worker.stealer(); - (worker, stealer) - }) - .unzip(); - - let registry = Arc::new(Registry { - thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), - sleep: Sleep::new(n_threads), - injected_jobs: Injector::new(), - broadcasts: Mutex::new(broadcasts), - terminate_count: AtomicUsize::new(1), - panic_handler: builder.take_panic_handler(), - start_handler: builder.take_start_handler(), - exit_handler: builder.take_exit_handler(), - }); - - // If we return early or panic, make sure to terminate existing threads. - let t1000 = Terminator(®istry); - - for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() { - let thread = ThreadBuilder { - name: builder.get_thread_name(index), - stack_size: builder.get_stack_size(), - registry: Arc::clone(®istry), - worker, - stealer, - index, - }; - - if index == 0 && builder.use_current_thread { - if !WorkerThread::current().is_null() { - return Err(ThreadPoolBuildError::new( - ErrorKind::CurrentThreadAlreadyInPool, - )); - } - // Rather than starting a new thread, we're just taking over the current thread - // *without* running the main loop, so we can still return from here. - // The WorkerThread is leaked, but we never shutdown the global pool anyway. - let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread))); - - unsafe { - WorkerThread::set_current(worker_thread); - Latch::set(®istry.thread_infos[index].primed); - } - continue; - } - - if let Err(e) = builder.get_spawn_handler().spawn(thread) { - return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); - } - } - - // Returning normally now, without termination. - mem::forget(t1000); - - Ok(registry) - } - - pub(super) fn current() -> Arc<Registry> { - unsafe { - let worker_thread = WorkerThread::current(); - let registry = if worker_thread.is_null() { - global_registry() - } else { - &(*worker_thread).registry - }; - Arc::clone(registry) - } - } - - /// Returns the number of threads in the current registry. This - /// is better than `Registry::current().num_threads()` because it - /// avoids incrementing the `Arc`. - pub(super) fn current_num_threads() -> usize { - unsafe { - let worker_thread = WorkerThread::current(); - if worker_thread.is_null() { - global_registry().num_threads() - } else { - (*worker_thread).registry.num_threads() - } - } - } - - /// Returns the current `WorkerThread` if it's part of this `Registry`. - pub(super) fn current_thread(&self) -> Option<&WorkerThread> { - unsafe { - let worker = WorkerThread::current().as_ref()?; - if worker.registry().id() == self.id() { - Some(worker) - } else { - None - } - } - } - - /// Returns an opaque identifier for this registry. - pub(super) fn id(&self) -> RegistryId { - // We can rely on `self` not to change since we only ever create - // registries that are boxed up in an `Arc` (see `new()` above). - RegistryId { - addr: self as *const Self as usize, - } - } - - pub(super) fn num_threads(&self) -> usize { - self.thread_infos.len() - } - - pub(super) fn catch_unwind(&self, f: impl FnOnce()) { - if let Err(err) = unwind::halt_unwinding(f) { - // If there is no handler, or if that handler itself panics, then we abort. - let abort_guard = unwind::AbortIfPanic; - if let Some(ref handler) = self.panic_handler { - handler(err); - mem::forget(abort_guard); - } - } - } - - /// Waits for the worker threads to get up and running. This is - /// meant to be used for benchmarking purposes, primarily, so that - /// you can get more consistent numbers by having everything - /// "ready to go". - pub(super) fn wait_until_primed(&self) { - for info in &self.thread_infos { - info.primed.wait(); - } - } - - /// Waits for the worker threads to stop. This is used for testing - /// -- so we can check that termination actually works. - #[cfg(test)] - pub(super) fn wait_until_stopped(&self) { - for info in &self.thread_infos { - info.stopped.wait(); - } - } - - /// //////////////////////////////////////////////////////////////////////// - /// MAIN LOOP - /// - /// So long as all of the worker threads are hanging out in their - /// top-level loop, there is no work to be done. - - /// Push a job into the given `registry`. If we are running on a - /// worker thread for the registry, this will push onto the - /// deque. Else, it will inject from the outside (which is slower). - pub(super) fn inject_or_push(&self, job_ref: JobRef) { - let worker_thread = WorkerThread::current(); - unsafe { - if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { - (*worker_thread).push(job_ref); - } else { - self.inject(job_ref); - } - } - } - - /// Push a job into the "external jobs" queue; it will be taken by - /// whatever worker has nothing to do. Use this if you know that - /// you are not on a worker of this registry. - pub(super) fn inject(&self, injected_job: JobRef) { - // It should not be possible for `state.terminate` to be true - // here. It is only set to true when the user creates (and - // drops) a `ThreadPool`; and, in that case, they cannot be - // calling `inject()` later, since they dropped their - // `ThreadPool`. - debug_assert_ne!( - self.terminate_count.load(Ordering::Acquire), - 0, - "inject() sees state.terminate as true" - ); - - let queue_was_empty = self.injected_jobs.is_empty(); - - self.injected_jobs.push(injected_job); - self.sleep.new_injected_jobs(1, queue_was_empty); - } - - fn has_injected_job(&self) -> bool { - !self.injected_jobs.is_empty() - } - - fn pop_injected_job(&self) -> Option<JobRef> { - loop { - match self.injected_jobs.steal() { - Steal::Success(job) => return Some(job), - Steal::Empty => return None, - Steal::Retry => {} - } - } - } - - /// Push a job into each thread's own "external jobs" queue; it will be - /// executed only on that thread, when it has nothing else to do locally, - /// before it tries to steal other work. - /// - /// **Panics** if not given exactly as many jobs as there are threads. - pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) { - assert_eq!(self.num_threads(), injected_jobs.len()); - { - let broadcasts = self.broadcasts.lock().unwrap(); - - // It should not be possible for `state.terminate` to be true - // here. It is only set to true when the user creates (and - // drops) a `ThreadPool`; and, in that case, they cannot be - // calling `inject_broadcast()` later, since they dropped their - // `ThreadPool`. - debug_assert_ne!( - self.terminate_count.load(Ordering::Acquire), - 0, - "inject_broadcast() sees state.terminate as true" - ); - - assert_eq!(broadcasts.len(), injected_jobs.len()); - for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) { - worker.push(job_ref); - } - } - for i in 0..self.num_threads() { - self.sleep.notify_worker_latch_is_set(i); - } - } - - /// If already in a worker-thread of this registry, just execute `op`. - /// Otherwise, inject `op` in this thread-pool. Either way, block until `op` - /// completes and return its return value. If `op` panics, that panic will - /// be propagated as well. The second argument indicates `true` if injection - /// was performed, `false` if executed directly. - pub(super) fn in_worker<OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&WorkerThread, bool) -> R + Send, - R: Send, - { - unsafe { - let worker_thread = WorkerThread::current(); - if worker_thread.is_null() { - self.in_worker_cold(op) - } else if (*worker_thread).registry().id() != self.id() { - self.in_worker_cross(&*worker_thread, op) - } else { - // Perfectly valid to give them a `&T`: this is the - // current thread, so we know the data structure won't be - // invalidated until we return. - op(&*worker_thread, false) - } - } - } - - #[cold] - unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&WorkerThread, bool) -> R + Send, - R: Send, - { - thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new()); - - LOCK_LATCH.with(|l| { - // This thread isn't a member of *any* thread pool, so just block. - debug_assert!(WorkerThread::current().is_null()); - let job = StackJob::new( - |injected| { - let worker_thread = WorkerThread::current(); - assert!(injected && !worker_thread.is_null()); - op(&*worker_thread, true) - }, - LatchRef::new(l), - ); - self.inject(job.as_job_ref()); - job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. - - job.into_result() - }) - } - - #[cold] - unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R - where - OP: FnOnce(&WorkerThread, bool) -> R + Send, - R: Send, - { - // This thread is a member of a different pool, so let it process - // other work while waiting for this `op` to complete. - debug_assert!(current_thread.registry().id() != self.id()); - let latch = SpinLatch::cross(current_thread); - let job = StackJob::new( - |injected| { - let worker_thread = WorkerThread::current(); - assert!(injected && !worker_thread.is_null()); - op(&*worker_thread, true) - }, - latch, - ); - self.inject(job.as_job_ref()); - current_thread.wait_until(&job.latch); - job.into_result() - } - - /// Increments the terminate counter. This increment should be - /// balanced by a call to `terminate`, which will decrement. This - /// is used when spawning asynchronous work, which needs to - /// prevent the registry from terminating so long as it is active. - /// - /// Note that blocking functions such as `join` and `scope` do not - /// need to concern themselves with this fn; their context is - /// responsible for ensuring the current thread-pool will not - /// terminate until they return. - /// - /// The global thread-pool always has an outstanding reference - /// (the initial one). Custom thread-pools have one outstanding - /// reference that is dropped when the `ThreadPool` is dropped: - /// since installing the thread-pool blocks until any joins/scopes - /// complete, this ensures that joins/scopes are covered. - /// - /// The exception is `::spawn()`, which can create a job outside - /// of any blocking scope. In that case, the job itself holds a - /// terminate count and is responsible for invoking `terminate()` - /// when finished. - pub(super) fn increment_terminate_count(&self) { - let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel); - debug_assert!(previous != 0, "registry ref count incremented from zero"); - assert!( - previous != std::usize::MAX, - "overflow in registry ref count" - ); - } - - /// Signals that the thread-pool which owns this registry has been - /// dropped. The worker threads will gradually terminate, once any - /// extant work is completed. - pub(super) fn terminate(&self) { - if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { - for (i, thread_info) in self.thread_infos.iter().enumerate() { - unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; - } - } - } - - /// Notify the worker that the latch they are sleeping on has been "set". - pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { - self.sleep.notify_worker_latch_is_set(target_worker_index); - } -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub(super) struct RegistryId { - addr: usize, -} - -struct ThreadInfo { - /// Latch set once thread has started and we are entering into the - /// main loop. Used to wait for worker threads to become primed, - /// primarily of interest for benchmarking. - primed: LockLatch, - - /// Latch is set once worker thread has completed. Used to wait - /// until workers have stopped; only used for tests. - stopped: LockLatch, - - /// The latch used to signal that terminated has been requested. - /// This latch is *set* by the `terminate` method on the - /// `Registry`, once the registry's main "terminate" counter - /// reaches zero. - terminate: OnceLatch, - - /// the "stealer" half of the worker's deque - stealer: Stealer<JobRef>, -} - -impl ThreadInfo { - fn new(stealer: Stealer<JobRef>) -> ThreadInfo { - ThreadInfo { - primed: LockLatch::new(), - stopped: LockLatch::new(), - terminate: OnceLatch::new(), - stealer, - } - } -} - -/// //////////////////////////////////////////////////////////////////////// -/// WorkerThread identifiers - -pub(super) struct WorkerThread { - /// the "worker" half of our local deque - worker: Worker<JobRef>, - - /// the "stealer" half of the worker's broadcast deque - stealer: Stealer<JobRef>, - - /// local queue used for `spawn_fifo` indirection - fifo: JobFifo, - - index: usize, - - /// A weak random number generator. - rng: XorShift64Star, - - registry: Arc<Registry>, -} - -// This is a bit sketchy, but basically: the WorkerThread is -// allocated on the stack of the worker on entry and stored into this -// thread local variable. So it will remain valid at least until the -// worker is fully unwound. Using an unsafe pointer avoids the need -// for a RefCell<T> etc. -thread_local! { - static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) }; -} - -impl From<ThreadBuilder> for WorkerThread { - fn from(thread: ThreadBuilder) -> Self { - Self { - worker: thread.worker, - stealer: thread.stealer, - fifo: JobFifo::new(), - index: thread.index, - rng: XorShift64Star::new(), - registry: thread.registry, - } - } -} - -impl Drop for WorkerThread { - fn drop(&mut self) { - // Undo `set_current` - WORKER_THREAD_STATE.with(|t| { - assert!(t.get().eq(&(self as *const _))); - t.set(ptr::null()); - }); - } -} - -impl WorkerThread { - /// Gets the `WorkerThread` index for the current thread; returns - /// NULL if this is not a worker thread. This pointer is valid - /// anywhere on the current thread. - #[inline] - pub(super) fn current() -> *const WorkerThread { - WORKER_THREAD_STATE.with(Cell::get) - } - - /// Sets `self` as the worker thread index for the current thread. - /// This is done during worker thread startup. - unsafe fn set_current(thread: *const WorkerThread) { - WORKER_THREAD_STATE.with(|t| { - assert!(t.get().is_null()); - t.set(thread); - }); - } - - /// Returns the registry that owns this worker thread. - #[inline] - pub(super) fn registry(&self) -> &Arc<Registry> { - &self.registry - } - - /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). - #[inline] - pub(super) fn index(&self) -> usize { - self.index - } - - #[inline] - pub(super) unsafe fn push(&self, job: JobRef) { - let queue_was_empty = self.worker.is_empty(); - self.worker.push(job); - self.registry.sleep.new_internal_jobs(1, queue_was_empty); - } - - #[inline] - pub(super) unsafe fn push_fifo(&self, job: JobRef) { - self.push(self.fifo.push(job)); - } - - #[inline] - pub(super) fn local_deque_is_empty(&self) -> bool { - self.worker.is_empty() - } - - /// Attempts to obtain a "local" job -- typically this means - /// popping from the top of the stack, though if we are configured - /// for breadth-first execution, it would mean dequeuing from the - /// bottom. - #[inline] - pub(super) fn take_local_job(&self) -> Option<JobRef> { - let popped_job = self.worker.pop(); - - if popped_job.is_some() { - return popped_job; - } - - loop { - match self.stealer.steal() { - Steal::Success(job) => return Some(job), - Steal::Empty => return None, - Steal::Retry => {} - } - } - } - - fn has_injected_job(&self) -> bool { - !self.stealer.is_empty() || self.registry.has_injected_job() - } - - /// Wait until the latch is set. Try to keep busy by popping and - /// stealing tasks as necessary. - #[inline] - pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) { - let latch = latch.as_core_latch(); - if !latch.probe() { - self.wait_until_cold(latch); - } - } - - #[cold] - unsafe fn wait_until_cold(&self, latch: &CoreLatch) { - // the code below should swallow all panics and hence never - // unwind; but if something does wrong, we want to abort, - // because otherwise other code in rayon may assume that the - // latch has been signaled, and that can lead to random memory - // accesses, which would be *very bad* - let abort_guard = unwind::AbortIfPanic; - - 'outer: while !latch.probe() { - // Check for local work *before* we start marking ourself idle, - // especially to avoid modifying shared sleep state. - if let Some(job) = self.take_local_job() { - self.execute(job); - continue; - } - - let mut idle_state = self.registry.sleep.start_looking(self.index); - while !latch.probe() { - if let Some(job) = self.find_work() { - self.registry.sleep.work_found(); - self.execute(job); - // The job might have injected local work, so go back to the outer loop. - continue 'outer; - } else { - self.registry - .sleep - .no_work_found(&mut idle_state, latch, || self.has_injected_job()) - } - } - - // If we were sleepy, we are not anymore. We "found work" -- - // whatever the surrounding thread was doing before it had to wait. - self.registry.sleep.work_found(); - break; - } - - mem::forget(abort_guard); // successful execution, do not abort - } - - unsafe fn wait_until_out_of_work(&self) { - debug_assert_eq!(self as *const _, WorkerThread::current()); - let registry = &*self.registry; - let index = self.index; - - self.wait_until(®istry.thread_infos[index].terminate); - - // Should not be any work left in our queue. - debug_assert!(self.take_local_job().is_none()); - - // Let registry know we are done - Latch::set(®istry.thread_infos[index].stopped); - } - - fn find_work(&self) -> Option<JobRef> { - // Try to find some work to do. We give preference first - // to things in our local deque, then in other workers - // deques, and finally to injected jobs from the - // outside. The idea is to finish what we started before - // we take on something new. - self.take_local_job() - .or_else(|| self.steal()) - .or_else(|| self.registry.pop_injected_job()) - } - - pub(super) fn yield_now(&self) -> Yield { - match self.find_work() { - Some(job) => unsafe { - self.execute(job); - Yield::Executed - }, - None => Yield::Idle, - } - } - - pub(super) fn yield_local(&self) -> Yield { - match self.take_local_job() { - Some(job) => unsafe { - self.execute(job); - Yield::Executed - }, - None => Yield::Idle, - } - } - - #[inline] - pub(super) unsafe fn execute(&self, job: JobRef) { - job.execute(); - } - - /// Try to steal a single job and return it. - /// - /// This should only be done as a last resort, when there is no - /// local work to do. - fn steal(&self) -> Option<JobRef> { - // we only steal when we don't have any work to do locally - debug_assert!(self.local_deque_is_empty()); - - // otherwise, try to steal - let thread_infos = &self.registry.thread_infos.as_slice(); - let num_threads = thread_infos.len(); - if num_threads <= 1 { - return None; - } - - loop { - let mut retry = false; - let start = self.rng.next_usize(num_threads); - let job = (start..num_threads) - .chain(0..start) - .filter(move |&i| i != self.index) - .find_map(|victim_index| { - let victim = &thread_infos[victim_index]; - match victim.stealer.steal() { - Steal::Success(job) => Some(job), - Steal::Empty => None, - Steal::Retry => { - retry = true; - None - } - } - }); - if job.is_some() || !retry { - return job; - } - } - } -} - -/// //////////////////////////////////////////////////////////////////////// - -unsafe fn main_loop(thread: ThreadBuilder) { - let worker_thread = &WorkerThread::from(thread); - WorkerThread::set_current(worker_thread); - let registry = &*worker_thread.registry; - let index = worker_thread.index; - - // let registry know we are ready to do work - Latch::set(®istry.thread_infos[index].primed); - - // Worker threads should not panic. If they do, just abort, as the - // internal state of the threadpool is corrupted. Note that if - // **user code** panics, we should catch that and redirect. - let abort_guard = unwind::AbortIfPanic; - - // Inform a user callback that we started a thread. - if let Some(ref handler) = registry.start_handler { - registry.catch_unwind(|| handler(index)); - } - - worker_thread.wait_until_out_of_work(); - - // Normal termination, do not abort. - mem::forget(abort_guard); - - // Inform a user callback that we exited a thread. - if let Some(ref handler) = registry.exit_handler { - registry.catch_unwind(|| handler(index)); - // We're already exiting the thread, there's nothing else to do. - } -} - -/// If already in a worker-thread, just execute `op`. Otherwise, -/// execute `op` in the default thread-pool. Either way, block until -/// `op` completes and return its return value. If `op` panics, that -/// panic will be propagated as well. The second argument indicates -/// `true` if injection was performed, `false` if executed directly. -pub(super) fn in_worker<OP, R>(op: OP) -> R -where - OP: FnOnce(&WorkerThread, bool) -> R + Send, - R: Send, -{ - unsafe { - let owner_thread = WorkerThread::current(); - if !owner_thread.is_null() { - // Perfectly valid to give them a `&T`: this is the - // current thread, so we know the data structure won't be - // invalidated until we return. - op(&*owner_thread, false) - } else { - global_registry().in_worker(op) - } - } -} - -/// [xorshift*] is a fast pseudorandom number generator which will -/// even tolerate weak seeding, as long as it's not zero. -/// -/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift* -struct XorShift64Star { - state: Cell<u64>, -} - -impl XorShift64Star { - fn new() -> Self { - // Any non-zero seed will do -- this uses the hash of a global counter. - let mut seed = 0; - while seed == 0 { - let mut hasher = DefaultHasher::new(); - static COUNTER: AtomicUsize = AtomicUsize::new(0); - hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed)); - seed = hasher.finish(); - } - - XorShift64Star { - state: Cell::new(seed), - } - } - - fn next(&self) -> u64 { - let mut x = self.state.get(); - debug_assert_ne!(x, 0); - x ^= x >> 12; - x ^= x << 25; - x ^= x >> 27; - self.state.set(x); - x.wrapping_mul(0x2545_f491_4f6c_dd1d) - } - - /// Return a value from `0..n`. - fn next_usize(&self, n: usize) -> usize { - (self.next() % n as u64) as usize - } -} diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs deleted file mode 100644 index b7163d1..0000000 --- a/vendor/rayon-core/src/scope/mod.rs +++ /dev/null @@ -1,769 +0,0 @@ -//! Methods for custom fork-join scopes, created by the [`scope()`] -//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`]. -//! -//! [`scope()`]: fn.scope.html -//! [`in_place_scope()`]: fn.in_place_scope.html -//! [`join()`]: ../join/join.fn.html - -use crate::broadcast::BroadcastContext; -use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; -use crate::latch::{CountLatch, Latch}; -use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; -use crate::unwind; -use std::any::Any; -use std::fmt; -use std::marker::PhantomData; -use std::mem::ManuallyDrop; -use std::ptr; -use std::sync::atomic::{AtomicPtr, Ordering}; -use std::sync::Arc; - -#[cfg(test)] -mod test; - -/// Represents a fork-join scope which can be used to spawn any number of tasks. -/// See [`scope()`] for more information. -/// -///[`scope()`]: fn.scope.html -pub struct Scope<'scope> { - base: ScopeBase<'scope>, -} - -/// Represents a fork-join scope which can be used to spawn any number of tasks. -/// Those spawned from the same thread are prioritized in relative FIFO order. -/// See [`scope_fifo()`] for more information. -/// -///[`scope_fifo()`]: fn.scope_fifo.html -pub struct ScopeFifo<'scope> { - base: ScopeBase<'scope>, - fifos: Vec<JobFifo>, -} - -struct ScopeBase<'scope> { - /// thread registry where `scope()` was executed or where `in_place_scope()` - /// should spawn jobs. - registry: Arc<Registry>, - - /// if some job panicked, the error is stored here; it will be - /// propagated to the one who created the scope - panic: AtomicPtr<Box<dyn Any + Send + 'static>>, - - /// latch to track job counts - job_completed_latch: CountLatch, - - /// You can think of a scope as containing a list of closures to execute, - /// all of which outlive `'scope`. They're not actually required to be - /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because - /// the closures are only *moved* across threads to be executed. - marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>, -} - -/// Creates a "fork-join" scope `s` and invokes the closure with a -/// reference to `s`. This closure can then spawn asynchronous tasks -/// into `s`. Those tasks may run asynchronously with respect to the -/// closure; they may themselves spawn additional tasks into `s`. When -/// the closure returns, it will block until all tasks that have been -/// spawned into `s` complete. -/// -/// `scope()` is a more flexible building block compared to `join()`, -/// since a loop can be used to spawn any number of tasks without -/// recursing. However, that flexibility comes at a performance price: -/// tasks spawned using `scope()` must be allocated onto the heap, -/// whereas `join()` can make exclusive use of the stack. **Prefer -/// `join()` (or, even better, parallel iterators) where possible.** -/// -/// # Example -/// -/// The Rayon `join()` function launches two closures and waits for them -/// to stop. One could implement `join()` using a scope like so, although -/// it would be less efficient than the real implementation: -/// -/// ```rust -/// # use rayon_core as rayon; -/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB) -/// where A: FnOnce() -> RA + Send, -/// B: FnOnce() -> RB + Send, -/// RA: Send, -/// RB: Send, -/// { -/// let mut result_a: Option<RA> = None; -/// let mut result_b: Option<RB> = None; -/// rayon::scope(|s| { -/// s.spawn(|_| result_a = Some(oper_a())); -/// s.spawn(|_| result_b = Some(oper_b())); -/// }); -/// (result_a.unwrap(), result_b.unwrap()) -/// } -/// ``` -/// -/// # A note on threading -/// -/// The closure given to `scope()` executes in the Rayon thread-pool, -/// as do those given to `spawn()`. This means that you can't access -/// thread-local variables (well, you can, but they may have -/// unexpected values). -/// -/// # Task execution -/// -/// Task execution potentially starts as soon as `spawn()` is called. -/// The task will end sometime before `scope()` returns. Note that the -/// *closure* given to scope may return much earlier. In general -/// the lifetime of a scope created like `scope(body)` goes something like this: -/// -/// - Scope begins when `scope(body)` is called -/// - Scope body `body()` is invoked -/// - Scope tasks may be spawned -/// - Scope body returns -/// - Scope tasks execute, possibly spawning more tasks -/// - Once all tasks are done, scope ends and `scope()` returns -/// -/// To see how and when tasks are joined, consider this example: -/// -/// ```rust -/// # use rayon_core as rayon; -/// // point start -/// rayon::scope(|s| { -/// s.spawn(|s| { // task s.1 -/// s.spawn(|s| { // task s.1.1 -/// rayon::scope(|t| { -/// t.spawn(|_| ()); // task t.1 -/// t.spawn(|_| ()); // task t.2 -/// }); -/// }); -/// }); -/// s.spawn(|s| { // task s.2 -/// }); -/// // point mid -/// }); -/// // point end -/// ``` -/// -/// The various tasks that are run will execute roughly like so: -/// -/// ```notrust -/// | (start) -/// | -/// | (scope `s` created) -/// +-----------------------------------------------+ (task s.2) -/// +-------+ (task s.1) | -/// | | | -/// | +---+ (task s.1.1) | -/// | | | | -/// | | | (scope `t` created) | -/// | | +----------------+ (task t.2) | -/// | | +---+ (task t.1) | | -/// | (mid) | | | | | -/// : | + <-+------------+ (scope `t` ends) | -/// : | | | -/// |<------+---+-----------------------------------+ (scope `s` ends) -/// | -/// | (end) -/// ``` -/// -/// The point here is that everything spawned into scope `s` will -/// terminate (at latest) at the same point -- right before the -/// original call to `rayon::scope` returns. This includes new -/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new -/// scope is created (such as `t`), the things spawned into that scope -/// will be joined before that scope returns, which in turn occurs -/// before the creating task (task `s.1.1` in this case) finishes. -/// -/// There is no guaranteed order of execution for spawns in a scope, -/// given that other threads may steal tasks at any time. However, they -/// are generally prioritized in a LIFO order on the thread from which -/// they were spawned. So in this example, absent any stealing, we can -/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other -/// threads always steal from the other end of the deque, like FIFO -/// order. The idea is that "recent" tasks are most likely to be fresh -/// in the local CPU's cache, while other threads can steal older -/// "stale" tasks. For an alternate approach, consider -/// [`scope_fifo()`] instead. -/// -/// [`scope_fifo()`]: fn.scope_fifo.html -/// -/// # Accessing stack data -/// -/// In general, spawned tasks may access stack data in place that -/// outlives the scope itself. Other data must be fully owned by the -/// spawned task. -/// -/// ```rust -/// # use rayon_core as rayon; -/// let ok: Vec<i32> = vec![1, 2, 3]; -/// rayon::scope(|s| { -/// let bad: Vec<i32> = vec![4, 5, 6]; -/// s.spawn(|_| { -/// // We can access `ok` because outlives the scope `s`. -/// println!("ok: {:?}", ok); -/// -/// // If we just try to use `bad` here, the closure will borrow `bad` -/// // (because we are just printing it out, and that only requires a -/// // borrow), which will result in a compilation error. Read on -/// // for options. -/// // println!("bad: {:?}", bad); -/// }); -/// }); -/// ``` -/// -/// As the comments example above suggest, to reference `bad` we must -/// take ownership of it. One way to do this is to detach the closure -/// from the surrounding stack frame, using the `move` keyword. This -/// will cause it to take ownership of *all* the variables it touches, -/// in this case including both `ok` *and* `bad`: -/// -/// ```rust -/// # use rayon_core as rayon; -/// let ok: Vec<i32> = vec![1, 2, 3]; -/// rayon::scope(|s| { -/// let bad: Vec<i32> = vec![4, 5, 6]; -/// s.spawn(move |_| { -/// println!("ok: {:?}", ok); -/// println!("bad: {:?}", bad); -/// }); -/// -/// // That closure is fine, but now we can't use `ok` anywhere else, -/// // since it is owned by the previous task: -/// // s.spawn(|_| println!("ok: {:?}", ok)); -/// }); -/// ``` -/// -/// While this works, it could be a problem if we want to use `ok` elsewhere. -/// There are two choices. We can keep the closure as a `move` closure, but -/// instead of referencing the variable `ok`, we create a shadowed variable that -/// is a borrow of `ok` and capture *that*: -/// -/// ```rust -/// # use rayon_core as rayon; -/// let ok: Vec<i32> = vec![1, 2, 3]; -/// rayon::scope(|s| { -/// let bad: Vec<i32> = vec![4, 5, 6]; -/// let ok: &Vec<i32> = &ok; // shadow the original `ok` -/// s.spawn(move |_| { -/// println!("ok: {:?}", ok); // captures the shadowed version -/// println!("bad: {:?}", bad); -/// }); -/// -/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references -/// // can be shared freely. Note that we need a `move` closure here though, -/// // because otherwise we'd be trying to borrow the shadowed `ok`, -/// // and that doesn't outlive `scope`. -/// s.spawn(move |_| println!("ok: {:?}", ok)); -/// }); -/// ``` -/// -/// Another option is not to use the `move` keyword but instead to take ownership -/// of individual variables: -/// -/// ```rust -/// # use rayon_core as rayon; -/// let ok: Vec<i32> = vec![1, 2, 3]; -/// rayon::scope(|s| { -/// let bad: Vec<i32> = vec![4, 5, 6]; -/// s.spawn(|_| { -/// // Transfer ownership of `bad` into a local variable (also named `bad`). -/// // This will force the closure to take ownership of `bad` from the environment. -/// let bad = bad; -/// println!("ok: {:?}", ok); // `ok` is only borrowed. -/// println!("bad: {:?}", bad); // refers to our local variable, above. -/// }); -/// -/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok` -/// }); -/// ``` -/// -/// # Panics -/// -/// If a panic occurs, either in the closure given to `scope()` or in -/// any of the spawned jobs, that panic will be propagated and the -/// call to `scope()` will panic. If multiple panics occurs, it is -/// non-deterministic which of their panic values will propagate. -/// Regardless, once a task is spawned using `scope.spawn()`, it will -/// execute, even if the spawning task should later panic. `scope()` -/// returns once all spawned jobs have completed, and any panics are -/// propagated at that point. -pub fn scope<'scope, OP, R>(op: OP) -> R -where - OP: FnOnce(&Scope<'scope>) -> R + Send, - R: Send, -{ - in_worker(|owner_thread, _| { - let scope = Scope::<'scope>::new(Some(owner_thread), None); - scope.base.complete(Some(owner_thread), || op(&scope)) - }) -} - -/// Creates a "fork-join" scope `s` with FIFO order, and invokes the -/// closure with a reference to `s`. This closure can then spawn -/// asynchronous tasks into `s`. Those tasks may run asynchronously with -/// respect to the closure; they may themselves spawn additional tasks -/// into `s`. When the closure returns, it will block until all tasks -/// that have been spawned into `s` complete. -/// -/// # Task execution -/// -/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a -/// difference in the order of execution. Consider a similar example: -/// -/// [`scope()`]: fn.scope.html -/// -/// ```rust -/// # use rayon_core as rayon; -/// // point start -/// rayon::scope_fifo(|s| { -/// s.spawn_fifo(|s| { // task s.1 -/// s.spawn_fifo(|s| { // task s.1.1 -/// rayon::scope_fifo(|t| { -/// t.spawn_fifo(|_| ()); // task t.1 -/// t.spawn_fifo(|_| ()); // task t.2 -/// }); -/// }); -/// }); -/// s.spawn_fifo(|s| { // task s.2 -/// }); -/// // point mid -/// }); -/// // point end -/// ``` -/// -/// The various tasks that are run will execute roughly like so: -/// -/// ```notrust -/// | (start) -/// | -/// | (FIFO scope `s` created) -/// +--------------------+ (task s.1) -/// +-------+ (task s.2) | -/// | | +---+ (task s.1.1) -/// | | | | -/// | | | | (FIFO scope `t` created) -/// | | | +----------------+ (task t.1) -/// | | | +---+ (task t.2) | -/// | (mid) | | | | | -/// : | | + <-+------------+ (scope `t` ends) -/// : | | | -/// |<------+------------+---+ (scope `s` ends) -/// | -/// | (end) -/// ``` -/// -/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on -/// the thread from which they were spawned, as opposed to `scope()`'s -/// LIFO. So in this example, we can expect `s.1` to execute before -/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in -/// FIFO order, as usual. Overall, this has roughly the same order as -/// the now-deprecated [`breadth_first`] option, except the effect is -/// isolated to a particular scope. If spawns are intermingled from any -/// combination of `scope()` and `scope_fifo()`, or from different -/// threads, their order is only specified with respect to spawns in the -/// same scope and thread. -/// -/// For more details on this design, see Rayon [RFC #1]. -/// -/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first -/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md -/// -/// # Panics -/// -/// If a panic occurs, either in the closure given to `scope_fifo()` or -/// in any of the spawned jobs, that panic will be propagated and the -/// call to `scope_fifo()` will panic. If multiple panics occurs, it is -/// non-deterministic which of their panic values will propagate. -/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it -/// will execute, even if the spawning task should later panic. -/// `scope_fifo()` returns once all spawned jobs have completed, and any -/// panics are propagated at that point. -pub fn scope_fifo<'scope, OP, R>(op: OP) -> R -where - OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, - R: Send, -{ - in_worker(|owner_thread, _| { - let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None); - scope.base.complete(Some(owner_thread), || op(&scope)) - }) -} - -/// Creates a "fork-join" scope `s` and invokes the closure with a -/// reference to `s`. This closure can then spawn asynchronous tasks -/// into `s`. Those tasks may run asynchronously with respect to the -/// closure; they may themselves spawn additional tasks into `s`. When -/// the closure returns, it will block until all tasks that have been -/// spawned into `s` complete. -/// -/// This is just like `scope()` except the closure runs on the same thread -/// that calls `in_place_scope()`. Only work that it spawns runs in the -/// thread pool. -/// -/// # Panics -/// -/// If a panic occurs, either in the closure given to `in_place_scope()` or in -/// any of the spawned jobs, that panic will be propagated and the -/// call to `in_place_scope()` will panic. If multiple panics occurs, it is -/// non-deterministic which of their panic values will propagate. -/// Regardless, once a task is spawned using `scope.spawn()`, it will -/// execute, even if the spawning task should later panic. `in_place_scope()` -/// returns once all spawned jobs have completed, and any panics are -/// propagated at that point. -pub fn in_place_scope<'scope, OP, R>(op: OP) -> R -where - OP: FnOnce(&Scope<'scope>) -> R, -{ - do_in_place_scope(None, op) -} - -pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R -where - OP: FnOnce(&Scope<'scope>) -> R, -{ - let thread = unsafe { WorkerThread::current().as_ref() }; - let scope = Scope::<'scope>::new(thread, registry); - scope.base.complete(thread, || op(&scope)) -} - -/// Creates a "fork-join" scope `s` with FIFO order, and invokes the -/// closure with a reference to `s`. This closure can then spawn -/// asynchronous tasks into `s`. Those tasks may run asynchronously with -/// respect to the closure; they may themselves spawn additional tasks -/// into `s`. When the closure returns, it will block until all tasks -/// that have been spawned into `s` complete. -/// -/// This is just like `scope_fifo()` except the closure runs on the same thread -/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the -/// thread pool. -/// -/// # Panics -/// -/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in -/// any of the spawned jobs, that panic will be propagated and the -/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is -/// non-deterministic which of their panic values will propagate. -/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will -/// execute, even if the spawning task should later panic. `in_place_scope_fifo()` -/// returns once all spawned jobs have completed, and any panics are -/// propagated at that point. -pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R -where - OP: FnOnce(&ScopeFifo<'scope>) -> R, -{ - do_in_place_scope_fifo(None, op) -} - -pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R -where - OP: FnOnce(&ScopeFifo<'scope>) -> R, -{ - let thread = unsafe { WorkerThread::current().as_ref() }; - let scope = ScopeFifo::<'scope>::new(thread, registry); - scope.base.complete(thread, || op(&scope)) -} - -impl<'scope> Scope<'scope> { - fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { - let base = ScopeBase::new(owner, registry); - Scope { base } - } - - /// Spawns a job into the fork-join scope `self`. This job will - /// execute sometime before the fork-join scope completes. The - /// job is specified as a closure, and this closure receives its - /// own reference to the scope `self` as argument. This can be - /// used to inject new jobs into `self`. - /// - /// # Returns - /// - /// Nothing. The spawned closures cannot pass back values to the - /// caller directly, though they can write to local variables on - /// the stack (if those variables outlive the scope) or - /// communicate through shared channels. - /// - /// (The intention is to eventually integrate with Rust futures to - /// support spawns of functions that compute a value.) - /// - /// # Examples - /// - /// ```rust - /// # use rayon_core as rayon; - /// let mut value_a = None; - /// let mut value_b = None; - /// let mut value_c = None; - /// rayon::scope(|s| { - /// s.spawn(|s1| { - /// // ^ this is the same scope as `s`; this handle `s1` - /// // is intended for use by the spawned task, - /// // since scope handles cannot cross thread boundaries. - /// - /// value_a = Some(22); - /// - /// // the scope `s` will not end until all these tasks are done - /// s1.spawn(|_| { - /// value_b = Some(44); - /// }); - /// }); - /// - /// s.spawn(|_| { - /// value_c = Some(66); - /// }); - /// }); - /// assert_eq!(value_a, Some(22)); - /// assert_eq!(value_b, Some(44)); - /// assert_eq!(value_c, Some(66)); - /// ``` - /// - /// # See also - /// - /// The [`scope` function] has more extensive documentation about - /// task spawning. - /// - /// [`scope` function]: fn.scope.html - pub fn spawn<BODY>(&self, body: BODY) - where - BODY: FnOnce(&Scope<'scope>) + Send + 'scope, - { - let scope_ptr = ScopePtr(self); - let job = HeapJob::new(move || unsafe { - // SAFETY: this job will execute before the scope ends. - let scope = scope_ptr.as_ref(); - ScopeBase::execute_job(&scope.base, move || body(scope)) - }); - let job_ref = self.base.heap_job_ref(job); - - // Since `Scope` implements `Sync`, we can't be sure that we're still in a - // thread of this pool, so we can't just push to the local worker thread. - // Also, this might be an in-place scope. - self.base.registry.inject_or_push(job_ref); - } - - /// Spawns a job into every thread of the fork-join scope `self`. This job will - /// execute on each thread sometime before the fork-join scope completes. The - /// job is specified as a closure, and this closure receives its own reference - /// to the scope `self` as argument, as well as a `BroadcastContext`. - pub fn spawn_broadcast<BODY>(&self, body: BODY) - where - BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, - { - let scope_ptr = ScopePtr(self); - let job = ArcJob::new(move || unsafe { - // SAFETY: this job will execute before the scope ends. - let scope = scope_ptr.as_ref(); - let body = &body; - let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); - ScopeBase::execute_job(&scope.base, func) - }); - self.base.inject_broadcast(job) - } -} - -impl<'scope> ScopeFifo<'scope> { - fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { - let base = ScopeBase::new(owner, registry); - let num_threads = base.registry.num_threads(); - let fifos = (0..num_threads).map(|_| JobFifo::new()).collect(); - ScopeFifo { base, fifos } - } - - /// Spawns a job into the fork-join scope `self`. This job will - /// execute sometime before the fork-join scope completes. The - /// job is specified as a closure, and this closure receives its - /// own reference to the scope `self` as argument. This can be - /// used to inject new jobs into `self`. - /// - /// # See also - /// - /// This method is akin to [`Scope::spawn()`], but with a FIFO - /// priority. The [`scope_fifo` function] has more details about - /// this distinction. - /// - /// [`Scope::spawn()`]: struct.Scope.html#method.spawn - /// [`scope_fifo` function]: fn.scope_fifo.html - pub fn spawn_fifo<BODY>(&self, body: BODY) - where - BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, - { - let scope_ptr = ScopePtr(self); - let job = HeapJob::new(move || unsafe { - // SAFETY: this job will execute before the scope ends. - let scope = scope_ptr.as_ref(); - ScopeBase::execute_job(&scope.base, move || body(scope)) - }); - let job_ref = self.base.heap_job_ref(job); - - // If we're in the pool, use our scope's private fifo for this thread to execute - // in a locally-FIFO order. Otherwise, just use the pool's global injector. - match self.base.registry.current_thread() { - Some(worker) => { - let fifo = &self.fifos[worker.index()]; - // SAFETY: this job will execute before the scope ends. - unsafe { worker.push(fifo.push(job_ref)) }; - } - None => self.base.registry.inject(job_ref), - } - } - - /// Spawns a job into every thread of the fork-join scope `self`. This job will - /// execute on each thread sometime before the fork-join scope completes. The - /// job is specified as a closure, and this closure receives its own reference - /// to the scope `self` as argument, as well as a `BroadcastContext`. - pub fn spawn_broadcast<BODY>(&self, body: BODY) - where - BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, - { - let scope_ptr = ScopePtr(self); - let job = ArcJob::new(move || unsafe { - // SAFETY: this job will execute before the scope ends. - let scope = scope_ptr.as_ref(); - let body = &body; - let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); - ScopeBase::execute_job(&scope.base, func) - }); - self.base.inject_broadcast(job) - } -} - -impl<'scope> ScopeBase<'scope> { - /// Creates the base of a new scope for the given registry - fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { - let registry = registry.unwrap_or_else(|| match owner { - Some(owner) => owner.registry(), - None => global_registry(), - }); - - ScopeBase { - registry: Arc::clone(registry), - panic: AtomicPtr::new(ptr::null_mut()), - job_completed_latch: CountLatch::new(owner), - marker: PhantomData, - } - } - - fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef - where - FUNC: FnOnce() + Send + 'scope, - { - unsafe { - self.job_completed_latch.increment(); - job.into_job_ref() - } - } - - fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>) - where - FUNC: Fn() + Send + Sync + 'scope, - { - let n_threads = self.registry.num_threads(); - let job_refs = (0..n_threads).map(|_| unsafe { - self.job_completed_latch.increment(); - ArcJob::as_job_ref(&job) - }); - - self.registry.inject_broadcast(job_refs); - } - - /// Executes `func` as a job, either aborting or executing as - /// appropriate. - fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R - where - FUNC: FnOnce() -> R, - { - let result = unsafe { Self::execute_job_closure(self, func) }; - self.job_completed_latch.wait(owner); - self.maybe_propagate_panic(); - result.unwrap() // only None if `op` panicked, and that would have been propagated - } - - /// Executes `func` as a job, either aborting or executing as - /// appropriate. - unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC) - where - FUNC: FnOnce(), - { - let _: Option<()> = Self::execute_job_closure(this, func); - } - - /// Executes `func` as a job in scope. Adjusts the "job completed" - /// counters and also catches any panic and stores it into - /// `scope`. - unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R> - where - FUNC: FnOnce() -> R, - { - let result = match unwind::halt_unwinding(func) { - Ok(r) => Some(r), - Err(err) => { - (*this).job_panicked(err); - None - } - }; - Latch::set(&(*this).job_completed_latch); - result - } - - fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { - // capture the first error we see, free the rest - if self.panic.load(Ordering::Relaxed).is_null() { - let nil = ptr::null_mut(); - let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr - let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err; - if self - .panic - .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed) - .is_ok() - { - // ownership now transferred into self.panic - } else { - // another panic raced in ahead of us, so drop ours - let _: Box<Box<_>> = ManuallyDrop::into_inner(err); - } - } - } - - fn maybe_propagate_panic(&self) { - // propagate panic, if any occurred; at this point, all - // outstanding jobs have completed, so we can use a relaxed - // ordering: - let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed); - if !panic.is_null() { - let value = unsafe { Box::from_raw(panic) }; - unwind::resume_unwinding(*value); - } - } -} - -impl<'scope> fmt::Debug for Scope<'scope> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Scope") - .field("pool_id", &self.base.registry.id()) - .field("panic", &self.base.panic) - .field("job_completed_latch", &self.base.job_completed_latch) - .finish() - } -} - -impl<'scope> fmt::Debug for ScopeFifo<'scope> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("ScopeFifo") - .field("num_fifos", &self.fifos.len()) - .field("pool_id", &self.base.registry.id()) - .field("panic", &self.base.panic) - .field("job_completed_latch", &self.base.job_completed_latch) - .finish() - } -} - -/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. -/// -/// Unsafe code is still required to dereference the pointer, but that's fine in -/// scope jobs that are guaranteed to execute before the scope ends. -struct ScopePtr<T>(*const T); - -// SAFETY: !Send for raw pointers is not for safety, just as a lint -unsafe impl<T: Sync> Send for ScopePtr<T> {} - -// SAFETY: !Sync for raw pointers is not for safety, just as a lint -unsafe impl<T: Sync> Sync for ScopePtr<T> {} - -impl<T> ScopePtr<T> { - // Helper to avoid disjoint captures of `scope_ptr.0` - unsafe fn as_ref(&self) -> &T { - &*self.0 - } -} diff --git a/vendor/rayon-core/src/scope/test.rs b/vendor/rayon-core/src/scope/test.rs deleted file mode 100644 index ad8c4af..0000000 --- a/vendor/rayon-core/src/scope/test.rs +++ /dev/null @@ -1,619 +0,0 @@ -use crate::unwind; -use crate::ThreadPoolBuilder; -use crate::{scope, scope_fifo, Scope, ScopeFifo}; -use rand::{Rng, SeedableRng}; -use rand_xorshift::XorShiftRng; -use std::cmp; -use std::iter::once; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Barrier, Mutex}; -use std::vec; - -#[test] -fn scope_empty() { - scope(|_| {}); -} - -#[test] -fn scope_result() { - let x = scope(|_| 22); - assert_eq!(x, 22); -} - -#[test] -fn scope_two() { - let counter = &AtomicUsize::new(0); - scope(|s| { - s.spawn(move |_| { - counter.fetch_add(1, Ordering::SeqCst); - }); - s.spawn(move |_| { - counter.fetch_add(10, Ordering::SeqCst); - }); - }); - - let v = counter.load(Ordering::SeqCst); - assert_eq!(v, 11); -} - -#[test] -fn scope_divide_and_conquer() { - let counter_p = &AtomicUsize::new(0); - scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024))); - - let counter_s = &AtomicUsize::new(0); - divide_and_conquer_seq(counter_s, 1024); - - let p = counter_p.load(Ordering::SeqCst); - let s = counter_s.load(Ordering::SeqCst); - assert_eq!(p, s); -} - -fn divide_and_conquer<'scope>(scope: &Scope<'scope>, counter: &'scope AtomicUsize, size: usize) { - if size > 1 { - scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2)); - scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2)); - } else { - // count the leaves - counter.fetch_add(1, Ordering::SeqCst); - } -} - -fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) { - if size > 1 { - divide_and_conquer_seq(counter, size / 2); - divide_and_conquer_seq(counter, size / 2); - } else { - // count the leaves - counter.fetch_add(1, Ordering::SeqCst); - } -} - -struct Tree<T: Send> { - value: T, - children: Vec<Tree<T>>, -} - -impl<T: Send> Tree<T> { - fn iter(&self) -> vec::IntoIter<&T> { - once(&self.value) - .chain(self.children.iter().flat_map(Tree::iter)) - .collect::<Vec<_>>() // seems like it shouldn't be needed... but prevents overflow - .into_iter() - } - - fn update<OP>(&mut self, op: OP) - where - OP: Fn(&mut T) + Sync, - T: Send, - { - scope(|s| self.update_in_scope(&op, s)); - } - - fn update_in_scope<'scope, OP>(&'scope mut self, op: &'scope OP, scope: &Scope<'scope>) - where - OP: Fn(&mut T) + Sync, - { - let Tree { - ref mut value, - ref mut children, - } = *self; - scope.spawn(move |scope| { - for child in children { - scope.spawn(move |scope| child.update_in_scope(op, scope)); - } - }); - - op(value); - } -} - -fn random_tree(depth: usize) -> Tree<u32> { - assert!(depth > 0); - let mut seed = <XorShiftRng as SeedableRng>::Seed::default(); - (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i); - let mut rng = XorShiftRng::from_seed(seed); - random_tree1(depth, &mut rng) -} - -fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> { - let children = if depth == 0 { - vec![] - } else { - (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level - .map(|_| random_tree1(depth - 1, rng)) - .collect() - }; - - Tree { - value: rng.gen_range(0..1_000_000), - children, - } -} - -#[test] -fn update_tree() { - let mut tree: Tree<u32> = random_tree(10); - let values: Vec<u32> = tree.iter().cloned().collect(); - tree.update(|v| *v += 1); - let new_values: Vec<u32> = tree.iter().cloned().collect(); - assert_eq!(values.len(), new_values.len()); - for (&i, &j) in values.iter().zip(&new_values) { - assert_eq!(i + 1, j); - } -} - -/// Check that if you have a chain of scoped tasks where T0 spawns T1 -/// spawns T2 and so forth down to Tn, the stack space should not grow -/// linearly with N. We test this by some unsafe hackery and -/// permitting an approx 10% change with a 10x input change. -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn linear_stack_growth() { - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = builder.build().unwrap(); - pool.install(|| { - let mut max_diff = Mutex::new(0); - let bottom_of_stack = 0; - scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 5)); - let diff_when_5 = *max_diff.get_mut().unwrap() as f64; - - scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 500)); - let diff_when_500 = *max_diff.get_mut().unwrap() as f64; - - let ratio = diff_when_5 / diff_when_500; - assert!( - ratio > 0.9 && ratio < 1.1, - "stack usage ratio out of bounds: {}", - ratio - ); - }); -} - -fn the_final_countdown<'scope>( - s: &Scope<'scope>, - bottom_of_stack: &'scope i32, - max: &'scope Mutex<usize>, - n: usize, -) { - let top_of_stack = 0; - let p = bottom_of_stack as *const i32 as usize; - let q = &top_of_stack as *const i32 as usize; - let diff = if p > q { p - q } else { q - p }; - - let mut data = max.lock().unwrap(); - *data = cmp::max(diff, *data); - - if n > 0 { - s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1)); - } -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_scope() { - scope(|_| panic!("Hello, world!")); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_spawn() { - scope(|s| s.spawn(|_| panic!("Hello, world!"))); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_nested_spawn() { - scope(|s| s.spawn(|s| s.spawn(|s| s.spawn(|_| panic!("Hello, world!"))))); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate_nested_scope_spawn() { - scope(|s| s.spawn(|_| scope(|s| s.spawn(|_| panic!("Hello, world!"))))); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_propagate_still_execute_1() { - let mut x = false; - match unwind::halt_unwinding(|| { - scope(|s| { - s.spawn(|_| panic!("Hello, world!")); // job A - s.spawn(|_| x = true); // job B, should still execute even though A panics - }); - }) { - Ok(_) => panic!("failed to propagate panic"), - Err(_) => assert!(x, "job b failed to execute"), - } -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_propagate_still_execute_2() { - let mut x = false; - match unwind::halt_unwinding(|| { - scope(|s| { - s.spawn(|_| x = true); // job B, should still execute even though A panics - s.spawn(|_| panic!("Hello, world!")); // job A - }); - }) { - Ok(_) => panic!("failed to propagate panic"), - Err(_) => assert!(x, "job b failed to execute"), - } -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_propagate_still_execute_3() { - let mut x = false; - match unwind::halt_unwinding(|| { - scope(|s| { - s.spawn(|_| x = true); // spawned job should still execute despite later panic - panic!("Hello, world!"); - }); - }) { - Ok(_) => panic!("failed to propagate panic"), - Err(_) => assert!(x, "panic after spawn, spawn failed to execute"), - } -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_propagate_still_execute_4() { - let mut x = false; - match unwind::halt_unwinding(|| { - scope(|s| { - s.spawn(|_| panic!("Hello, world!")); - x = true; - }); - }) { - Ok(_) => panic!("failed to propagate panic"), - Err(_) => assert!(x, "panic in spawn tainted scope"), - } -} - -macro_rules! test_order { - ($scope:ident => $spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = builder.build().unwrap(); - pool.install(|| { - let vec = Mutex::new(vec![]); - $scope(|scope| { - let vec = &vec; - for i in 0..10 { - scope.$spawn(move |scope| { - for j in 0..10 { - scope.$spawn(move |_| { - vec.lock().unwrap().push(i * 10 + j); - }); - } - }); - } - }); - vec.into_inner().unwrap() - }) - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn lifo_order() { - // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. - let vec = test_order!(scope => spawn); - let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn fifo_order() { - // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. - let vec = test_order!(scope_fifo => spawn_fifo); - let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order - assert_eq!(vec, expected); -} - -macro_rules! test_nested_order { - ($outer_scope:ident => $outer_spawn:ident, - $inner_scope:ident => $inner_spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = builder.build().unwrap(); - pool.install(|| { - let vec = Mutex::new(vec![]); - $outer_scope(|scope| { - let vec = &vec; - for i in 0..10 { - scope.$outer_spawn(move |_| { - $inner_scope(|scope| { - for j in 0..10 { - scope.$inner_spawn(move |_| { - vec.lock().unwrap().push(i * 10 + j); - }); - } - }); - }); - } - }); - vec.into_inner().unwrap() - }) - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn nested_lifo_order() { - // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. - let vec = test_nested_order!(scope => spawn, scope => spawn); - let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn nested_fifo_order() { - // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. - let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); - let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn nested_lifo_fifo_order() { - // LIFO on the outside, FIFO on the inside - let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo); - let expected: Vec<i32> = (0..10) - .rev() - .flat_map(|i| (0..10).map(move |j| i * 10 + j)) - .collect(); - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn nested_fifo_lifo_order() { - // FIFO on the outside, LIFO on the inside - let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn); - let expected: Vec<i32> = (0..10) - .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) - .collect(); - assert_eq!(vec, expected); -} - -macro_rules! spawn_push { - ($scope:ident . $spawn:ident, $vec:ident, $i:expr) => {{ - $scope.$spawn(move |_| $vec.lock().unwrap().push($i)); - }}; -} - -/// Test spawns pushing a series of numbers, interleaved -/// such that negative values are using an inner scope. -macro_rules! test_mixed_order { - ($outer_scope:ident => $outer_spawn:ident, - $inner_scope:ident => $inner_spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = builder.build().unwrap(); - pool.install(|| { - let vec = Mutex::new(vec![]); - $outer_scope(|outer_scope| { - let vec = &vec; - spawn_push!(outer_scope.$outer_spawn, vec, 0); - $inner_scope(|inner_scope| { - spawn_push!(inner_scope.$inner_spawn, vec, -1); - spawn_push!(outer_scope.$outer_spawn, vec, 1); - spawn_push!(inner_scope.$inner_spawn, vec, -2); - spawn_push!(outer_scope.$outer_spawn, vec, 2); - spawn_push!(inner_scope.$inner_spawn, vec, -3); - }); - spawn_push!(outer_scope.$outer_spawn, vec, 3); - }); - vec.into_inner().unwrap() - }) - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mixed_lifo_order() { - // NB: the end of the inner scope makes us execute some of the outer scope - // before they've all been spawned, so they're not perfectly LIFO. - let vec = test_mixed_order!(scope => spawn, scope => spawn); - let expected = vec![-3, 2, -2, 1, -1, 3, 0]; - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mixed_fifo_order() { - let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); - let expected = vec![-1, 0, -2, 1, -3, 2, 3]; - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mixed_lifo_fifo_order() { - // NB: the end of the inner scope makes us execute some of the outer scope - // before they've all been spawned, so they're not perfectly LIFO. - let vec = test_mixed_order!(scope => spawn, scope_fifo => spawn_fifo); - let expected = vec![-1, 2, -2, 1, -3, 3, 0]; - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mixed_fifo_lifo_order() { - let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn); - let expected = vec![-3, 0, -2, 1, -1, 2, 3]; - assert_eq!(vec, expected); -} - -#[test] -fn static_scope() { - static COUNTER: AtomicUsize = AtomicUsize::new(0); - - let mut range = 0..100; - let sum = range.clone().sum(); - let iter = &mut range; - - COUNTER.store(0, Ordering::Relaxed); - scope(|s: &Scope<'static>| { - // While we're allowed the locally borrowed iterator, - // the spawns must be static. - for i in iter { - s.spawn(move |_| { - COUNTER.fetch_add(i, Ordering::Relaxed); - }); - } - }); - - assert_eq!(COUNTER.load(Ordering::Relaxed), sum); -} - -#[test] -fn static_scope_fifo() { - static COUNTER: AtomicUsize = AtomicUsize::new(0); - - let mut range = 0..100; - let sum = range.clone().sum(); - let iter = &mut range; - - COUNTER.store(0, Ordering::Relaxed); - scope_fifo(|s: &ScopeFifo<'static>| { - // While we're allowed the locally borrowed iterator, - // the spawns must be static. - for i in iter { - s.spawn_fifo(move |_| { - COUNTER.fetch_add(i, Ordering::Relaxed); - }); - } - }); - - assert_eq!(COUNTER.load(Ordering::Relaxed), sum); -} - -#[test] -fn mixed_lifetime_scope() { - fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { - scope(move |s: &Scope<'counter>| { - // We can borrow 'slice here, but the spawns can only borrow 'counter. - for &c in counters { - s.spawn(move |_| { - c.fetch_add(1, Ordering::Relaxed); - }); - } - }); - } - - let counter = AtomicUsize::new(0); - increment(&[&counter; 100]); - assert_eq!(counter.into_inner(), 100); -} - -#[test] -fn mixed_lifetime_scope_fifo() { - fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { - scope_fifo(move |s: &ScopeFifo<'counter>| { - // We can borrow 'slice here, but the spawns can only borrow 'counter. - for &c in counters { - s.spawn_fifo(move |_| { - c.fetch_add(1, Ordering::Relaxed); - }); - } - }); - } - - let counter = AtomicUsize::new(0); - increment(&[&counter; 100]); - assert_eq!(counter.into_inner(), 100); -} - -#[test] -fn scope_spawn_broadcast() { - let sum = AtomicUsize::new(0); - let n = scope(|s| { - s.spawn_broadcast(|_, ctx| { - sum.fetch_add(ctx.index(), Ordering::Relaxed); - }); - crate::current_num_threads() - }); - assert_eq!(sum.into_inner(), n * (n - 1) / 2); -} - -#[test] -fn scope_fifo_spawn_broadcast() { - let sum = AtomicUsize::new(0); - let n = scope_fifo(|s| { - s.spawn_broadcast(|_, ctx| { - sum.fetch_add(ctx.index(), Ordering::Relaxed); - }); - crate::current_num_threads() - }); - assert_eq!(sum.into_inner(), n * (n - 1) / 2); -} - -#[test] -fn scope_spawn_broadcast_nested() { - let sum = AtomicUsize::new(0); - let n = scope(|s| { - s.spawn_broadcast(|s, _| { - s.spawn_broadcast(|_, ctx| { - sum.fetch_add(ctx.index(), Ordering::Relaxed); - }); - }); - crate::current_num_threads() - }); - assert_eq!(sum.into_inner(), n * n * (n - 1) / 2); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn scope_spawn_broadcast_barrier() { - let barrier = Barrier::new(8); - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - pool.in_place_scope(|s| { - s.spawn_broadcast(|_, _| { - barrier.wait(); - }); - barrier.wait(); - }); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn scope_spawn_broadcast_panic_one() { - let count = AtomicUsize::new(0); - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - let result = crate::unwind::halt_unwinding(|| { - pool.scope(|s| { - s.spawn_broadcast(|_, ctx| { - count.fetch_add(1, Ordering::Relaxed); - if ctx.index() == 3 { - panic!("Hello, world!"); - } - }); - }); - }); - assert_eq!(count.into_inner(), 7); - assert!(result.is_err(), "broadcast panic should propagate!"); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn scope_spawn_broadcast_panic_many() { - let count = AtomicUsize::new(0); - let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); - let result = crate::unwind::halt_unwinding(|| { - pool.scope(|s| { - s.spawn_broadcast(|_, ctx| { - count.fetch_add(1, Ordering::Relaxed); - if ctx.index() % 2 == 0 { - panic!("Hello, world!"); - } - }); - }); - }); - assert_eq!(count.into_inner(), 7); - assert!(result.is_err(), "broadcast panic should propagate!"); -} diff --git a/vendor/rayon-core/src/sleep/README.md b/vendor/rayon-core/src/sleep/README.md deleted file mode 100644 index 55426c8..0000000 --- a/vendor/rayon-core/src/sleep/README.md +++ /dev/null @@ -1,219 +0,0 @@ -# Introduction: the sleep module - -The code in this module governs when worker threads should go to -sleep. The system used in this code was introduced in [Rayon RFC #5]. -There is also a [video walkthrough] available. Both of those may be -valuable resources to understanding the code, though naturally they -will also grow stale over time. The comments in this file are -extracted from the RFC and meant to be kept up to date. - -[Rayon RFC #5]: https://github.com/rayon-rs/rfcs/pull/5 -[video walkthrough]: https://youtu.be/HvmQsE5M4cY - -# The `Sleep` struct - -The `Sleep` struct is embedded into each registry. It performs several functions: - -* It tracks when workers are awake or asleep. -* It decides how long a worker should look for work before it goes to sleep, - via a callback that is invoked periodically from the worker's search loop. -* It is notified when latches are set, jobs are published, or other - events occur, and it will go and wake the appropriate threads if - they are sleeping. - -# Thread states - -There are three main thread states: - -* An **active** thread is one that is actively executing a job. -* An **idle** thread is one that is searching for work to do. It will be - trying to steal work or pop work from the global injector queue. -* A **sleeping** thread is one that is blocked on a condition variable, - waiting to be awoken. - -We sometimes refer to the final two states collectively as **inactive**. -Threads begin as idle but transition to idle and finally sleeping when -they're unable to find work to do. - -## Sleepy threads - -There is one other special state worth mentioning. During the idle state, -threads can get **sleepy**. A sleepy thread is still idle, in that it is still -searching for work, but it is *about* to go to sleep after it does one more -search (or some other number, potentially). When a thread enters the sleepy -state, it signals (via the **jobs event counter**, described below) that it is -about to go to sleep. If new work is published, this will lead to the counter -being adjusted. When the thread actually goes to sleep, it will (hopefully, but -not guaranteed) see that the counter has changed and elect not to sleep, but -instead to search again. See the section on the **jobs event counter** for more -details. - -# The counters - -One of the key structs in the sleep module is `AtomicCounters`, found in -`counters.rs`. It packs three counters into one atomically managed value: - -* Two **thread counters**, which track the number of threads in a particular state. -* The **jobs event counter**, which is used to signal when new work is available. - It (sort of) tracks the number of jobs posted, but not quite, and it can rollover. - -## Thread counters - -There are two thread counters, one that tracks **inactive** threads and one that -tracks **sleeping** threads. From this, one can deduce the number of threads -that are idle by subtracting sleeping threads from inactive threads. We track -the counters in this way because it permits simpler atomic operations. One can -increment the number of sleeping threads (and thus decrease the number of idle -threads) simply by doing one atomic increment, for example. Similarly, one can -decrease the number of sleeping threads (and increase the number of idle -threads) through one atomic decrement. - -These counters are adjusted as follows: - -* When a thread enters the idle state: increment the inactive thread counter. -* When a thread enters the sleeping state: increment the sleeping thread counter. -* When a thread awakens a sleeping thread: decrement the sleeping thread counter. - * Subtle point: the thread that *awakens* the sleeping thread decrements the - counter, not the thread that is *sleeping*. This is because there is a delay - between signaling a thread to wake and the thread actually waking: - decrementing the counter when awakening the thread means that other threads - that may be posting work will see the up-to-date value that much faster. -* When a thread finds work, exiting the idle state: decrement the inactive - thread counter. - -## Jobs event counter - -The final counter is the **jobs event counter**. The role of this counter is to -help sleepy threads detect when new work is posted in a lightweight fashion. In -its simplest form, we would simply have a counter that gets incremented each -time a new job is posted. This way, when a thread gets sleepy, it could read the -counter, and then compare to see if the value has changed before it actually -goes to sleep. But this [turns out to be too expensive] in practice, so we use a -somewhat more complex scheme. - -[turns out to be too expensive]: https://github.com/rayon-rs/rayon/pull/746#issuecomment-624802747 - -The idea is that the counter toggles between two states, depending on whether -its value is even or odd (or, equivalently, on the value of its low bit): - -* Even -- If the low bit is zero, then it means that there has been no new work - since the last thread got sleepy. -* Odd -- If the low bit is one, then it means that new work was posted since - the last thread got sleepy. - -### New work is posted - -When new work is posted, we check the value of the counter: if it is even, -then we increment it by one, so that it becomes odd. - -### Worker thread gets sleepy - -When a worker thread gets sleepy, it will read the value of the counter. If the -counter is odd, it will increment the counter so that it is even. Either way, it -remembers the final value of the counter. The final value will be used later, -when the thread is going to sleep. If at that time the counter has not changed, -then we can assume no new jobs have been posted (though note the remote -possibility of rollover, discussed in detail below). - -# Protocol for a worker thread to post work - -The full protocol for a thread to post work is as follows - -* If the work is posted into the injection queue, then execute a seq-cst fence (see below). -* Load the counters, incrementing the JEC if it is even so that it is odd. -* Check if there are idle threads available to handle this new job. If not, - and there are sleeping threads, then wake one or more threads. - -# Protocol for a worker thread to fall asleep - -The full protocol for a thread to fall asleep is as follows: - -* After completing all its jobs, the worker goes idle and begins to - search for work. As it searches, it counts "rounds". In each round, - it searches all other work threads' queues, plus the 'injector queue' for - work injected from the outside. If work is found in this search, the thread - becomes active again and hence restarts this protocol from the top. -* After a certain number of rounds, the thread "gets sleepy" and executes `get_sleepy` - above, remembering the `final_value` of the JEC. It does one more search for work. -* If no work is found, the thread atomically: - * Checks the JEC to see that it has not changed from `final_value`. - * If it has, then the thread goes back to searching for work. We reset to - just before we got sleepy, so that we will do one more search - before attending to sleep again (rather than searching for many rounds). - * Increments the number of sleeping threads by 1. -* The thread then executes a seq-cst fence operation (see below). -* The thread then does one final check for injected jobs (see below). If any - are available, it returns to the 'pre-sleepy' state as if the JEC had changed. -* The thread waits to be signaled. Once signaled, it returns to the idle state. - -# The jobs event counter and deadlock - -As described in the section on the JEC, the main concern around going to sleep -is avoiding a race condition wherein: - -* Thread A looks for work, finds none. -* Thread B posts work but sees no sleeping threads. -* Thread A goes to sleep. - -The JEC protocol largely prevents this, but due to rollover, this prevention is -not complete. It is possible -- if unlikely -- that enough activity occurs for -Thread A to observe the same JEC value that it saw when getting sleepy. If the -new work being published came from *inside* the thread-pool, then this race -condition isn't too harmful. It means that we have fewer workers processing the -work then we should, but we won't deadlock. This seems like an acceptable risk -given that this is unlikely in practice. - -However, if the work was posted as an *external* job, that is a problem. In that -case, it's possible that all of our workers could go to sleep, and the external -job would never get processed. To prevent that, the sleeping protocol includes -one final check to see if the injector queue is empty before fully falling -asleep. Note that this final check occurs **after** the number of sleeping -threads has been incremented. We are not concerned therefore with races against -injections that occur after that increment, only before. - -Unfortunately, there is one rather subtle point concerning this final check: -we wish to avoid the possibility that: - -* work is pushed into the injection queue by an outside thread X, -* the sleepy thread S sees the JEC but it has rolled over and is equal -* the sleepy thread S reads the injection queue but does not see the work posted by X. - -This is possible because the C++ memory model typically offers guarantees of the -form "if you see the access A, then you must see those other accesses" -- but it -doesn't guarantee that you will see the access A (i.e., if you think of -processors with independent caches, you may be operating on very out of date -cache state). - -## Using seq-cst fences to prevent deadlock - -To overcome this problem, we have inserted two sequentially consistent fence -operations into the protocols above: - -* One fence occurs after work is posted into the injection queue, but before the - counters are read (including the number of sleeping threads). - * Note that no fence is needed for work posted to internal queues, since it is ok - to overlook work in that case. -* One fence occurs after the number of sleeping threads is incremented, but - before the injection queue is read. - -### Proof sketch - -What follows is a "proof sketch" that the protocol is deadlock free. We model -two relevant bits of memory, the job injector queue J and the atomic counters C. - -Consider the actions of the injecting thread: - -* PushJob: Job is injected, which can be modeled as an atomic write to J with release semantics. -* PushFence: A sequentially consistent fence is executed. -* ReadSleepers: The counters C are read (they may also be incremented, but we just consider the read that comes first). - -Meanwhile, the sleepy thread does the following: - -* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C. -* SleepFence: A sequentially consistent fence is executed. -* ReadJob: We look to see if the queue is empty, which is a read of J with acquire semantics. - -Either PushFence or SleepFence must come first: - -* If PushFence comes first, then PushJob must be visible to ReadJob. -* If SleepFence comes first, then IncSleepers is visible to ReadSleepers.
\ No newline at end of file diff --git a/vendor/rayon-core/src/sleep/counters.rs b/vendor/rayon-core/src/sleep/counters.rs deleted file mode 100644 index 53d2c55..0000000 --- a/vendor/rayon-core/src/sleep/counters.rs +++ /dev/null @@ -1,277 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; - -pub(super) struct AtomicCounters { - /// Packs together a number of counters. The counters are ordered as - /// follows, from least to most significant bits (here, we assuming - /// that [`THREADS_BITS`] is equal to 10): - /// - /// * Bits 0..10: Stores the number of **sleeping threads** - /// * Bits 10..20: Stores the number of **inactive threads** - /// * Bits 20..: Stores the **job event counter** (JEC) - /// - /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note - /// that the total number of bits (and hence the number of bits used for the - /// JEC) will depend on whether we are using a 32- or 64-bit architecture. - value: AtomicUsize, -} - -#[derive(Copy, Clone)] -pub(super) struct Counters { - word: usize, -} - -/// A value read from the **Jobs Event Counter**. -/// See the [`README.md`](README.md) for more -/// coverage of how the jobs event counter works. -#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)] -pub(super) struct JobsEventCounter(usize); - -impl JobsEventCounter { - pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX); - - #[inline] - pub(super) fn as_usize(self) -> usize { - self.0 - } - - /// The JEC "is sleepy" if the last thread to increment it was in the - /// process of becoming sleepy. This is indicated by its value being *even*. - /// When new jobs are posted, they check if the JEC is sleepy, and if so - /// they incremented it. - #[inline] - pub(super) fn is_sleepy(self) -> bool { - (self.as_usize() & 1) == 0 - } - - /// The JEC "is active" if the last thread to increment it was posting new - /// work. This is indicated by its value being *odd*. When threads get - /// sleepy, they will check if the JEC is active, and increment it. - #[inline] - pub(super) fn is_active(self) -> bool { - !self.is_sleepy() - } -} - -/// Number of bits used for the thread counters. -#[cfg(target_pointer_width = "64")] -const THREADS_BITS: usize = 16; - -#[cfg(target_pointer_width = "32")] -const THREADS_BITS: usize = 8; - -/// Bits to shift to select the sleeping threads -/// (used with `select_bits`). -#[allow(clippy::erasing_op)] -const SLEEPING_SHIFT: usize = 0 * THREADS_BITS; - -/// Bits to shift to select the inactive threads -/// (used with `select_bits`). -#[allow(clippy::identity_op)] -const INACTIVE_SHIFT: usize = 1 * THREADS_BITS; - -/// Bits to shift to select the JEC -/// (use JOBS_BITS). -const JEC_SHIFT: usize = 2 * THREADS_BITS; - -/// Max value for the thread counters. -pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1; - -/// Constant that can be added to add one sleeping thread. -const ONE_SLEEPING: usize = 1; - -/// Constant that can be added to add one inactive thread. -/// An inactive thread is either idle, sleepy, or sleeping. -const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT; - -/// Constant that can be added to add one to the JEC. -const ONE_JEC: usize = 1 << JEC_SHIFT; - -impl AtomicCounters { - #[inline] - pub(super) fn new() -> AtomicCounters { - AtomicCounters { - value: AtomicUsize::new(0), - } - } - - /// Load and return the current value of the various counters. - /// This value can then be given to other method which will - /// attempt to update the counters via compare-and-swap. - #[inline] - pub(super) fn load(&self, ordering: Ordering) -> Counters { - Counters::new(self.value.load(ordering)) - } - - #[inline] - fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool { - self.value - .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed) - .is_ok() - } - - /// Adds an inactive thread. This cannot fail. - /// - /// This should be invoked when a thread enters its idle loop looking - /// for work. It is decremented when work is found. Note that it is - /// not decremented if the thread transitions from idle to sleepy or sleeping; - /// so the number of inactive threads is always greater-than-or-equal - /// to the number of sleeping threads. - #[inline] - pub(super) fn add_inactive_thread(&self) { - self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst); - } - - /// Increments the jobs event counter if `increment_when`, when applied to - /// the current value, is true. Used to toggle the JEC from even (sleepy) to - /// odd (active) or vice versa. Returns the final value of the counters, for - /// which `increment_when` is guaranteed to return false. - pub(super) fn increment_jobs_event_counter_if( - &self, - increment_when: impl Fn(JobsEventCounter) -> bool, - ) -> Counters { - loop { - let old_value = self.load(Ordering::SeqCst); - if increment_when(old_value.jobs_counter()) { - let new_value = old_value.increment_jobs_counter(); - if self.try_exchange(old_value, new_value, Ordering::SeqCst) { - return new_value; - } - } else { - return old_value; - } - } - } - - /// Subtracts an inactive thread. This cannot fail. It is invoked - /// when a thread finds work and hence becomes active. It returns the - /// number of sleeping threads to wake up (if any). - /// - /// See `add_inactive_thread`. - #[inline] - pub(super) fn sub_inactive_thread(&self) -> usize { - let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst)); - debug_assert!( - old_value.inactive_threads() > 0, - "sub_inactive_thread: old_value {:?} has no inactive threads", - old_value, - ); - debug_assert!( - old_value.sleeping_threads() <= old_value.inactive_threads(), - "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads", - old_value, - old_value.sleeping_threads(), - old_value.inactive_threads(), - ); - - // Current heuristic: whenever an inactive thread goes away, if - // there are any sleeping threads, wake 'em up. - let sleeping_threads = old_value.sleeping_threads(); - std::cmp::min(sleeping_threads, 2) - } - - /// Subtracts a sleeping thread. This cannot fail, but it is only - /// safe to do if you you know the number of sleeping threads is - /// non-zero (i.e., because you have just awoken a sleeping - /// thread). - #[inline] - pub(super) fn sub_sleeping_thread(&self) { - let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst)); - debug_assert!( - old_value.sleeping_threads() > 0, - "sub_sleeping_thread: old_value {:?} had no sleeping threads", - old_value, - ); - debug_assert!( - old_value.sleeping_threads() <= old_value.inactive_threads(), - "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads", - old_value, - old_value.sleeping_threads(), - old_value.inactive_threads(), - ); - } - - #[inline] - pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool { - debug_assert!( - old_value.inactive_threads() > 0, - "try_add_sleeping_thread: old_value {:?} has no inactive threads", - old_value, - ); - debug_assert!( - old_value.sleeping_threads() < THREADS_MAX, - "try_add_sleeping_thread: old_value {:?} has too many sleeping threads", - old_value, - ); - - let mut new_value = old_value; - new_value.word += ONE_SLEEPING; - - self.try_exchange(old_value, new_value, Ordering::SeqCst) - } -} - -#[inline] -fn select_thread(word: usize, shift: usize) -> usize { - (word >> shift) & THREADS_MAX -} - -#[inline] -fn select_jec(word: usize) -> usize { - word >> JEC_SHIFT -} - -impl Counters { - #[inline] - fn new(word: usize) -> Counters { - Counters { word } - } - - #[inline] - fn increment_jobs_counter(self) -> Counters { - // We can freely add to JEC because it occupies the most significant bits. - // Thus it doesn't overflow into the other counters, just wraps itself. - Counters { - word: self.word.wrapping_add(ONE_JEC), - } - } - - #[inline] - pub(super) fn jobs_counter(self) -> JobsEventCounter { - JobsEventCounter(select_jec(self.word)) - } - - /// The number of threads that are not actively - /// executing work. They may be idle, sleepy, or asleep. - #[inline] - pub(super) fn inactive_threads(self) -> usize { - select_thread(self.word, INACTIVE_SHIFT) - } - - #[inline] - pub(super) fn awake_but_idle_threads(self) -> usize { - debug_assert!( - self.sleeping_threads() <= self.inactive_threads(), - "sleeping threads: {} > raw idle threads {}", - self.sleeping_threads(), - self.inactive_threads() - ); - self.inactive_threads() - self.sleeping_threads() - } - - #[inline] - pub(super) fn sleeping_threads(self) -> usize { - select_thread(self.word, SLEEPING_SHIFT) - } -} - -impl std::fmt::Debug for Counters { - fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let word = format!("{:016x}", self.word); - fmt.debug_struct("Counters") - .field("word", &word) - .field("jobs", &self.jobs_counter().0) - .field("inactive", &self.inactive_threads()) - .field("sleeping", &self.sleeping_threads()) - .finish() - } -} diff --git a/vendor/rayon-core/src/sleep/mod.rs b/vendor/rayon-core/src/sleep/mod.rs deleted file mode 100644 index 03d1077..0000000 --- a/vendor/rayon-core/src/sleep/mod.rs +++ /dev/null @@ -1,325 +0,0 @@ -//! Code that decides when workers should go to sleep. See README.md -//! for an overview. - -use crate::latch::CoreLatch; -use crossbeam_utils::CachePadded; -use std::sync::atomic::Ordering; -use std::sync::{Condvar, Mutex}; -use std::thread; -use std::usize; - -mod counters; -pub(crate) use self::counters::THREADS_MAX; -use self::counters::{AtomicCounters, JobsEventCounter}; - -/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping -/// of workers. It has callbacks that are invoked periodically at significant events, -/// such as when workers are looping and looking for work, when latches are set, or when -/// jobs are published, and it either blocks threads or wakes them in response to these -/// events. See the [`README.md`] in this module for more details. -/// -/// [`README.md`] README.md -pub(super) struct Sleep { - /// One "sleep state" per worker. Used to track if a worker is sleeping and to have - /// them block. - worker_sleep_states: Vec<CachePadded<WorkerSleepState>>, - - counters: AtomicCounters, -} - -/// An instance of this struct is created when a thread becomes idle. -/// It is consumed when the thread finds work, and passed by `&mut` -/// reference for operations that preserve the idle state. (In other -/// words, producing one of these structs is evidence the thread is -/// idle.) It tracks state such as how long the thread has been idle. -pub(super) struct IdleState { - /// What is worker index of the idle thread? - worker_index: usize, - - /// How many rounds have we been circling without sleeping? - rounds: u32, - - /// Once we become sleepy, what was the sleepy counter value? - /// Set to `INVALID_SLEEPY_COUNTER` otherwise. - jobs_counter: JobsEventCounter, -} - -/// The "sleep state" for an individual worker. -#[derive(Default)] -struct WorkerSleepState { - /// Set to true when the worker goes to sleep; set to false when - /// the worker is notified or when it wakes. - is_blocked: Mutex<bool>, - - condvar: Condvar, -} - -const ROUNDS_UNTIL_SLEEPY: u32 = 32; -const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1; - -impl Sleep { - pub(super) fn new(n_threads: usize) -> Sleep { - assert!(n_threads <= THREADS_MAX); - Sleep { - worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(), - counters: AtomicCounters::new(), - } - } - - #[inline] - pub(super) fn start_looking(&self, worker_index: usize) -> IdleState { - self.counters.add_inactive_thread(); - - IdleState { - worker_index, - rounds: 0, - jobs_counter: JobsEventCounter::DUMMY, - } - } - - #[inline] - pub(super) fn work_found(&self) { - // If we were the last idle thread and other threads are still sleeping, - // then we should wake up another thread. - let threads_to_wake = self.counters.sub_inactive_thread(); - self.wake_any_threads(threads_to_wake as u32); - } - - #[inline] - pub(super) fn no_work_found( - &self, - idle_state: &mut IdleState, - latch: &CoreLatch, - has_injected_jobs: impl FnOnce() -> bool, - ) { - if idle_state.rounds < ROUNDS_UNTIL_SLEEPY { - thread::yield_now(); - idle_state.rounds += 1; - } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY { - idle_state.jobs_counter = self.announce_sleepy(); - idle_state.rounds += 1; - thread::yield_now(); - } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING { - idle_state.rounds += 1; - thread::yield_now(); - } else { - debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING); - self.sleep(idle_state, latch, has_injected_jobs); - } - } - - #[cold] - fn announce_sleepy(&self) -> JobsEventCounter { - self.counters - .increment_jobs_event_counter_if(JobsEventCounter::is_active) - .jobs_counter() - } - - #[cold] - fn sleep( - &self, - idle_state: &mut IdleState, - latch: &CoreLatch, - has_injected_jobs: impl FnOnce() -> bool, - ) { - let worker_index = idle_state.worker_index; - - if !latch.get_sleepy() { - return; - } - - let sleep_state = &self.worker_sleep_states[worker_index]; - let mut is_blocked = sleep_state.is_blocked.lock().unwrap(); - debug_assert!(!*is_blocked); - - // Our latch was signalled. We should wake back up fully as we - // will have some stuff to do. - if !latch.fall_asleep() { - idle_state.wake_fully(); - return; - } - - loop { - let counters = self.counters.load(Ordering::SeqCst); - - // Check if the JEC has changed since we got sleepy. - debug_assert!(idle_state.jobs_counter.is_sleepy()); - if counters.jobs_counter() != idle_state.jobs_counter { - // JEC has changed, so a new job was posted, but for some reason - // we didn't see it. We should return to just before the SLEEPY - // state so we can do another search and (if we fail to find - // work) go back to sleep. - idle_state.wake_partly(); - latch.wake_up(); - return; - } - - // Otherwise, let's move from IDLE to SLEEPING. - if self.counters.try_add_sleeping_thread(counters) { - break; - } - } - - // Successfully registered as asleep. - - // We have one last check for injected jobs to do. This protects against - // deadlock in the very unlikely event that - // - // - an external job is being injected while we are sleepy - // - that job triggers the rollover over the JEC such that we don't see it - // - we are the last active worker thread - std::sync::atomic::fence(Ordering::SeqCst); - if has_injected_jobs() { - // If we see an externally injected job, then we have to 'wake - // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by - // the one that wakes us.) - self.counters.sub_sleeping_thread(); - } else { - // If we don't see an injected job (the normal case), then flag - // ourselves as asleep and wait till we are notified. - // - // (Note that `is_blocked` is held under a mutex and the mutex was - // acquired *before* we incremented the "sleepy counter". This means - // that whomever is coming to wake us will have to wait until we - // release the mutex in the call to `wait`, so they will see this - // boolean as true.) - *is_blocked = true; - while *is_blocked { - is_blocked = sleep_state.condvar.wait(is_blocked).unwrap(); - } - } - - // Update other state: - idle_state.wake_fully(); - latch.wake_up(); - } - - /// Notify the given thread that it should wake up (if it is - /// sleeping). When this method is invoked, we typically know the - /// thread is asleep, though in rare cases it could have been - /// awoken by (e.g.) new work having been posted. - pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { - self.wake_specific_thread(target_worker_index); - } - - /// Signals that `num_jobs` new jobs were injected into the thread - /// pool from outside. This function will ensure that there are - /// threads available to process them, waking threads from sleep - /// if necessary. - /// - /// # Parameters - /// - /// - `num_jobs` -- lower bound on number of jobs available for stealing. - /// We'll try to get at least one thread per job. - #[inline] - pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) { - // This fence is needed to guarantee that threads - // as they are about to fall asleep, observe any - // new jobs that may have been injected. - std::sync::atomic::fence(Ordering::SeqCst); - - self.new_jobs(num_jobs, queue_was_empty) - } - - /// Signals that `num_jobs` new jobs were pushed onto a thread's - /// local deque. This function will try to ensure that there are - /// threads available to process them, waking threads from sleep - /// if necessary. However, this is not guaranteed: under certain - /// race conditions, the function may fail to wake any new - /// threads; in that case the existing thread should eventually - /// pop the job. - /// - /// # Parameters - /// - /// - `num_jobs` -- lower bound on number of jobs available for stealing. - /// We'll try to get at least one thread per job. - #[inline] - pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) { - self.new_jobs(num_jobs, queue_was_empty) - } - - /// Common helper for `new_injected_jobs` and `new_internal_jobs`. - #[inline] - fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) { - // Read the counters and -- if sleepy workers have announced themselves - // -- announce that there is now work available. The final value of `counters` - // with which we exit the loop thus corresponds to a state when - let counters = self - .counters - .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy); - let num_awake_but_idle = counters.awake_but_idle_threads(); - let num_sleepers = counters.sleeping_threads(); - - if num_sleepers == 0 { - // nobody to wake - return; - } - - // Promote from u16 to u32 so we can interoperate with - // num_jobs more easily. - let num_awake_but_idle = num_awake_but_idle as u32; - let num_sleepers = num_sleepers as u32; - - // If the queue is non-empty, then we always wake up a worker - // -- clearly the existing idle jobs aren't enough. Otherwise, - // check to see if we have enough idle workers. - if !queue_was_empty { - let num_to_wake = std::cmp::min(num_jobs, num_sleepers); - self.wake_any_threads(num_to_wake); - } else if num_awake_but_idle < num_jobs { - let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers); - self.wake_any_threads(num_to_wake); - } - } - - #[cold] - fn wake_any_threads(&self, mut num_to_wake: u32) { - if num_to_wake > 0 { - for i in 0..self.worker_sleep_states.len() { - if self.wake_specific_thread(i) { - num_to_wake -= 1; - if num_to_wake == 0 { - return; - } - } - } - } - } - - fn wake_specific_thread(&self, index: usize) -> bool { - let sleep_state = &self.worker_sleep_states[index]; - - let mut is_blocked = sleep_state.is_blocked.lock().unwrap(); - if *is_blocked { - *is_blocked = false; - sleep_state.condvar.notify_one(); - - // When the thread went to sleep, it will have incremented - // this value. When we wake it, its our job to decrement - // it. We could have the thread do it, but that would - // introduce a delay between when the thread was - // *notified* and when this counter was decremented. That - // might mislead people with new work into thinking that - // there are sleeping threads that they should try to - // wake, when in fact there is nothing left for them to - // do. - self.counters.sub_sleeping_thread(); - - true - } else { - false - } - } -} - -impl IdleState { - fn wake_fully(&mut self) { - self.rounds = 0; - self.jobs_counter = JobsEventCounter::DUMMY; - } - - fn wake_partly(&mut self) { - self.rounds = ROUNDS_UNTIL_SLEEPY; - self.jobs_counter = JobsEventCounter::DUMMY; - } -} diff --git a/vendor/rayon-core/src/spawn/mod.rs b/vendor/rayon-core/src/spawn/mod.rs deleted file mode 100644 index 1aa9edb..0000000 --- a/vendor/rayon-core/src/spawn/mod.rs +++ /dev/null @@ -1,163 +0,0 @@ -use crate::job::*; -use crate::registry::Registry; -use crate::unwind; -use std::mem; -use std::sync::Arc; - -/// Fires off a task into the Rayon threadpool in the "static" or -/// "global" scope. Just like a standard thread, this task is not -/// tied to the current stack frame, and hence it cannot hold any -/// references other than those with `'static` lifetime. If you want -/// to spawn a task that references stack data, use [the `scope()` -/// function][scope] to create a scope. -/// -/// [scope]: fn.scope.html -/// -/// Since tasks spawned with this function cannot hold references into -/// the enclosing stack frame, you almost certainly want to use a -/// `move` closure as their argument (otherwise, the closure will -/// typically hold references to any variables from the enclosing -/// function that you happen to use). -/// -/// This API assumes that the closure is executed purely for its -/// side-effects (i.e., it might send messages, modify data protected -/// by a mutex, or some such thing). -/// -/// There is no guaranteed order of execution for spawns, given that -/// other threads may steal tasks at any time. However, they are -/// generally prioritized in a LIFO order on the thread from which -/// they were spawned. Other threads always steal from the other end of -/// the deque, like FIFO order. The idea is that "recent" tasks are -/// most likely to be fresh in the local CPU's cache, while other -/// threads can steal older "stale" tasks. For an alternate approach, -/// consider [`spawn_fifo()`] instead. -/// -/// [`spawn_fifo()`]: fn.spawn_fifo.html -/// -/// # Panic handling -/// -/// If this closure should panic, the resulting panic will be -/// propagated to the panic handler registered in the `ThreadPoolBuilder`, -/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more -/// details. -/// -/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler -/// -/// # Examples -/// -/// This code creates a Rayon task that increments a global counter. -/// -/// ```rust -/// # use rayon_core as rayon; -/// use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; -/// -/// static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT; -/// -/// rayon::spawn(move || { -/// GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst); -/// }); -/// ``` -pub fn spawn<F>(func: F) -where - F: FnOnce() + Send + 'static, -{ - // We assert that current registry has not terminated. - unsafe { spawn_in(func, &Registry::current()) } -} - -/// Spawns an asynchronous job in `registry.` -/// -/// Unsafe because `registry` must not yet have terminated. -pub(super) unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>) -where - F: FnOnce() + Send + 'static, -{ - // We assert that this does not hold any references (we know - // this because of the `'static` bound in the interface); - // moreover, we assert that the code below is not supposed to - // be able to panic, and hence the data won't leak but will be - // enqueued into some deque for later execution. - let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic - let job_ref = spawn_job(func, registry); - registry.inject_or_push(job_ref); - mem::forget(abort_guard); -} - -unsafe fn spawn_job<F>(func: F, registry: &Arc<Registry>) -> JobRef -where - F: FnOnce() + Send + 'static, -{ - // Ensure that registry cannot terminate until this job has - // executed. This ref is decremented at the (*) below. - registry.increment_terminate_count(); - - HeapJob::new({ - let registry = Arc::clone(registry); - move || { - registry.catch_unwind(func); - registry.terminate(); // (*) permit registry to terminate now - } - }) - .into_static_job_ref() -} - -/// Fires off a task into the Rayon threadpool in the "static" or -/// "global" scope. Just like a standard thread, this task is not -/// tied to the current stack frame, and hence it cannot hold any -/// references other than those with `'static` lifetime. If you want -/// to spawn a task that references stack data, use [the `scope_fifo()` -/// function](fn.scope_fifo.html) to create a scope. -/// -/// The behavior is essentially the same as [the `spawn` -/// function](fn.spawn.html), except that calls from the same thread -/// will be prioritized in FIFO order. This is similar to the now- -/// deprecated [`breadth_first`] option, except the effect is isolated -/// to relative `spawn_fifo` calls, not all threadpool tasks. -/// -/// For more details on this design, see Rayon [RFC #1]. -/// -/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first -/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md -/// -/// # Panic handling -/// -/// If this closure should panic, the resulting panic will be -/// propagated to the panic handler registered in the `ThreadPoolBuilder`, -/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more -/// details. -/// -/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler -pub fn spawn_fifo<F>(func: F) -where - F: FnOnce() + Send + 'static, -{ - // We assert that current registry has not terminated. - unsafe { spawn_fifo_in(func, &Registry::current()) } -} - -/// Spawns an asynchronous FIFO job in `registry.` -/// -/// Unsafe because `registry` must not yet have terminated. -pub(super) unsafe fn spawn_fifo_in<F>(func: F, registry: &Arc<Registry>) -where - F: FnOnce() + Send + 'static, -{ - // We assert that this does not hold any references (we know - // this because of the `'static` bound in the interface); - // moreover, we assert that the code below is not supposed to - // be able to panic, and hence the data won't leak but will be - // enqueued into some deque for later execution. - let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic - let job_ref = spawn_job(func, registry); - - // If we're in the pool, use our thread's private fifo for this thread to execute - // in a locally-FIFO order. Otherwise, just use the pool's global injector. - match registry.current_thread() { - Some(worker) => worker.push_fifo(job_ref), - None => registry.inject(job_ref), - } - mem::forget(abort_guard); -} - -#[cfg(test)] -mod test; diff --git a/vendor/rayon-core/src/spawn/test.rs b/vendor/rayon-core/src/spawn/test.rs deleted file mode 100644 index b7a0535..0000000 --- a/vendor/rayon-core/src/spawn/test.rs +++ /dev/null @@ -1,255 +0,0 @@ -use crate::scope; -use std::any::Any; -use std::sync::mpsc::channel; -use std::sync::Mutex; - -use super::{spawn, spawn_fifo}; -use crate::ThreadPoolBuilder; - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_then_join_in_worker() { - let (tx, rx) = channel(); - scope(move |_| { - spawn(move || tx.send(22).unwrap()); - }); - assert_eq!(22, rx.recv().unwrap()); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_then_join_outside_worker() { - let (tx, rx) = channel(); - spawn(move || tx.send(22).unwrap()); - assert_eq!(22, rx.recv().unwrap()); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_fwd() { - let (tx, rx) = channel(); - - let tx = Mutex::new(tx); - let panic_handler = move |err: Box<dyn Any + Send>| { - let tx = tx.lock().unwrap(); - if let Some(&msg) = err.downcast_ref::<&str>() { - if msg == "Hello, world!" { - tx.send(1).unwrap(); - } else { - tx.send(2).unwrap(); - } - } else { - tx.send(3).unwrap(); - } - }; - - let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); - - builder - .build() - .unwrap() - .spawn(move || panic!("Hello, world!")); - - assert_eq!(1, rx.recv().unwrap()); -} - -/// Test what happens when the thread-pool is dropped but there are -/// still active asynchronous tasks. We expect the thread-pool to stay -/// alive and executing until those threads are complete. -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn termination_while_things_are_executing() { - let (tx0, rx0) = channel(); - let (tx1, rx1) = channel(); - - // Create a thread-pool and spawn some code in it, but then drop - // our reference to it. - { - let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - thread_pool.spawn(move || { - let data = rx0.recv().unwrap(); - - // At this point, we know the "main" reference to the - // `ThreadPool` has been dropped, but there are still - // active threads. Launch one more. - spawn(move || { - tx1.send(data).unwrap(); - }); - }); - } - - tx0.send(22).unwrap(); - let v = rx1.recv().unwrap(); - assert_eq!(v, 22); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn custom_panic_handler_and_spawn() { - let (tx, rx) = channel(); - - // Create a parallel closure that will send panics on the - // channel; since the closure is potentially executed in parallel - // with itself, we have to wrap `tx` in a mutex. - let tx = Mutex::new(tx); - let panic_handler = move |e: Box<dyn Any + Send>| { - tx.lock().unwrap().send(e).unwrap(); - }; - - // Execute an async that will panic. - let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); - builder.build().unwrap().spawn(move || { - panic!("Hello, world!"); - }); - - // Check that we got back the panic we expected. - let error = rx.recv().unwrap(); - if let Some(&msg) = error.downcast_ref::<&str>() { - assert_eq!(msg, "Hello, world!"); - } else { - panic!("did not receive a string from panic handler"); - } -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn custom_panic_handler_and_nested_spawn() { - let (tx, rx) = channel(); - - // Create a parallel closure that will send panics on the - // channel; since the closure is potentially executed in parallel - // with itself, we have to wrap `tx` in a mutex. - let tx = Mutex::new(tx); - let panic_handler = move |e| { - tx.lock().unwrap().send(e).unwrap(); - }; - - // Execute an async that will (eventually) panic. - const PANICS: usize = 3; - let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); - builder.build().unwrap().spawn(move || { - // launch 3 nested spawn-asyncs; these should be in the same - // thread-pool and hence inherit the same panic handler - for _ in 0..PANICS { - spawn(move || { - panic!("Hello, world!"); - }); - } - }); - - // Check that we get back the panics we expected. - for _ in 0..PANICS { - let error = rx.recv().unwrap(); - if let Some(&msg) = error.downcast_ref::<&str>() { - assert_eq!(msg, "Hello, world!"); - } else { - panic!("did not receive a string from panic handler"); - } - } -} - -macro_rules! test_order { - ($outer_spawn:ident, $inner_spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = builder.build().unwrap(); - let (tx, rx) = channel(); - pool.install(move || { - for i in 0..10 { - let tx = tx.clone(); - $outer_spawn(move || { - for j in 0..10 { - let tx = tx.clone(); - $inner_spawn(move || { - tx.send(i * 10 + j).unwrap(); - }); - } - }); - } - }); - rx.iter().collect::<Vec<i32>>() - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn lifo_order() { - // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order. - let vec = test_order!(spawn, spawn); - let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn fifo_order() { - // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order. - let vec = test_order!(spawn_fifo, spawn_fifo); - let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn lifo_fifo_order() { - // LIFO on the outside, FIFO on the inside - let vec = test_order!(spawn, spawn_fifo); - let expected: Vec<i32> = (0..10) - .rev() - .flat_map(|i| (0..10).map(move |j| i * 10 + j)) - .collect(); - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn fifo_lifo_order() { - // FIFO on the outside, LIFO on the inside - let vec = test_order!(spawn_fifo, spawn); - let expected: Vec<i32> = (0..10) - .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) - .collect(); - assert_eq!(vec, expected); -} - -macro_rules! spawn_send { - ($spawn:ident, $tx:ident, $i:expr) => {{ - let tx = $tx.clone(); - $spawn(move || tx.send($i).unwrap()); - }}; -} - -/// Test mixed spawns pushing a series of numbers, interleaved such -/// such that negative values are using the second kind of spawn. -macro_rules! test_mixed_order { - ($pos_spawn:ident, $neg_spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = builder.build().unwrap(); - let (tx, rx) = channel(); - pool.install(move || { - spawn_send!($pos_spawn, tx, 0); - spawn_send!($neg_spawn, tx, -1); - spawn_send!($pos_spawn, tx, 1); - spawn_send!($neg_spawn, tx, -2); - spawn_send!($pos_spawn, tx, 2); - spawn_send!($neg_spawn, tx, -3); - spawn_send!($pos_spawn, tx, 3); - }); - rx.iter().collect::<Vec<i32>>() - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mixed_lifo_fifo_order() { - let vec = test_mixed_order!(spawn, spawn_fifo); - let expected = vec![3, -1, 2, -2, 1, -3, 0]; - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mixed_fifo_lifo_order() { - let vec = test_mixed_order!(spawn_fifo, spawn); - let expected = vec![0, -3, 1, -2, 2, -1, 3]; - assert_eq!(vec, expected); -} diff --git a/vendor/rayon-core/src/test.rs b/vendor/rayon-core/src/test.rs deleted file mode 100644 index 25b8487..0000000 --- a/vendor/rayon-core/src/test.rs +++ /dev/null @@ -1,200 +0,0 @@ -#![cfg(test)] - -use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Barrier}; - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn worker_thread_index() { - let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); - assert_eq!(pool.current_num_threads(), 22); - assert_eq!(pool.current_thread_index(), None); - let index = pool.install(|| pool.current_thread_index().unwrap()); - assert!(index < 22); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn start_callback_called() { - let n_threads = 16; - let n_called = Arc::new(AtomicUsize::new(0)); - // Wait for all the threads in the pool plus the one running tests. - let barrier = Arc::new(Barrier::new(n_threads + 1)); - - let b = Arc::clone(&barrier); - let nc = Arc::clone(&n_called); - let start_handler = move |_| { - nc.fetch_add(1, Ordering::SeqCst); - b.wait(); - }; - - let conf = ThreadPoolBuilder::new() - .num_threads(n_threads) - .start_handler(start_handler); - let _ = conf.build().unwrap(); - - // Wait for all the threads to have been scheduled to run. - barrier.wait(); - - // The handler must have been called on every started thread. - assert_eq!(n_called.load(Ordering::SeqCst), n_threads); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn exit_callback_called() { - let n_threads = 16; - let n_called = Arc::new(AtomicUsize::new(0)); - // Wait for all the threads in the pool plus the one running tests. - let barrier = Arc::new(Barrier::new(n_threads + 1)); - - let b = Arc::clone(&barrier); - let nc = Arc::clone(&n_called); - let exit_handler = move |_| { - nc.fetch_add(1, Ordering::SeqCst); - b.wait(); - }; - - let conf = ThreadPoolBuilder::new() - .num_threads(n_threads) - .exit_handler(exit_handler); - { - let _ = conf.build().unwrap(); - // Drop the pool so it stops the running threads. - } - - // Wait for all the threads to have been scheduled to run. - barrier.wait(); - - // The handler must have been called on every exiting thread. - assert_eq!(n_called.load(Ordering::SeqCst), n_threads); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn handler_panics_handled_correctly() { - let n_threads = 16; - let n_called = Arc::new(AtomicUsize::new(0)); - // Wait for all the threads in the pool plus the one running tests. - let start_barrier = Arc::new(Barrier::new(n_threads + 1)); - let exit_barrier = Arc::new(Barrier::new(n_threads + 1)); - - let start_handler = move |_| { - panic!("ensure panic handler is called when starting"); - }; - let exit_handler = move |_| { - panic!("ensure panic handler is called when exiting"); - }; - - let sb = Arc::clone(&start_barrier); - let eb = Arc::clone(&exit_barrier); - let nc = Arc::clone(&n_called); - let panic_handler = move |_| { - let val = nc.fetch_add(1, Ordering::SeqCst); - if val < n_threads { - sb.wait(); - } else { - eb.wait(); - } - }; - - let conf = ThreadPoolBuilder::new() - .num_threads(n_threads) - .start_handler(start_handler) - .exit_handler(exit_handler) - .panic_handler(panic_handler); - { - let _ = conf.build().unwrap(); - - // Wait for all the threads to start, panic in the start handler, - // and been taken care of by the panic handler. - start_barrier.wait(); - - // Drop the pool so it stops the running threads. - } - - // Wait for all the threads to exit, panic in the exit handler, - // and been taken care of by the panic handler. - exit_barrier.wait(); - - // The panic handler must have been called twice on every thread. - assert_eq!(n_called.load(Ordering::SeqCst), 2 * n_threads); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn check_config_build() { - let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); - assert_eq!(pool.current_num_threads(), 22); -} - -/// Helper used by check_error_send_sync to ensure ThreadPoolBuildError is Send + Sync -fn _send_sync<T: Send + Sync>() {} - -#[test] -fn check_error_send_sync() { - _send_sync::<ThreadPoolBuildError>(); -} - -#[allow(deprecated)] -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn configuration() { - let start_handler = move |_| {}; - let exit_handler = move |_| {}; - let panic_handler = move |_| {}; - let thread_name = move |i| format!("thread_name_{}", i); - - // Ensure we can call all public methods on Configuration - crate::Configuration::new() - .thread_name(thread_name) - .num_threads(5) - .panic_handler(panic_handler) - .stack_size(4e6 as usize) - .breadth_first() - .start_handler(start_handler) - .exit_handler(exit_handler) - .build() - .unwrap(); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn default_pool() { - ThreadPoolBuilder::default().build().unwrap(); -} - -/// Test that custom spawned threads get their `WorkerThread` cleared once -/// the pool is done with them, allowing them to be used with rayon again -/// later. e.g. WebAssembly want to have their own pool of available threads. -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> { - let n_threads = 5; - let mut handles = vec![]; - let pool = ThreadPoolBuilder::new() - .num_threads(n_threads) - .spawn_handler(|thread| { - let handle = std::thread::spawn(move || { - thread.run(); - - // Afterward, the current thread shouldn't be set anymore. - assert_eq!(crate::current_thread_index(), None); - }); - handles.push(handle); - Ok(()) - }) - .build()?; - assert_eq!(handles.len(), n_threads); - - pool.install(|| assert!(crate::current_thread_index().is_some())); - drop(pool); - - // Wait for all threads to make their assertions and exit - for handle in handles { - handle.join().unwrap(); - } - - Ok(()) -} diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs deleted file mode 100644 index c37826e..0000000 --- a/vendor/rayon-core/src/thread_pool/mod.rs +++ /dev/null @@ -1,471 +0,0 @@ -//! Contains support for user-managed thread pools, represented by the -//! the [`ThreadPool`] type (see that struct for details). -//! -//! [`ThreadPool`]: struct.ThreadPool.html - -use crate::broadcast::{self, BroadcastContext}; -use crate::join; -use crate::registry::{Registry, ThreadSpawn, WorkerThread}; -use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; -use crate::spawn; -use crate::{scope, Scope}; -use crate::{scope_fifo, ScopeFifo}; -use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; -use std::error::Error; -use std::fmt; -use std::sync::Arc; - -mod test; - -/// Represents a user created [thread-pool]. -/// -/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads -/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then -/// execute functions explicitly within this [`ThreadPool`] using -/// [`ThreadPool::install()`]. By contrast, top level rayon functions -/// (like `join()`) will execute implicitly within the current thread-pool. -/// -/// -/// ## Creating a ThreadPool -/// -/// ```rust -/// # use rayon_core as rayon; -/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); -/// ``` -/// -/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s -/// threads. In addition, any other rayon operations called inside of `install()` will also -/// execute in the context of the `ThreadPool`. -/// -/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate, -/// they will complete executing any remaining work that you have spawned, and automatically -/// terminate. -/// -/// -/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool -/// [`ThreadPool`]: struct.ThreadPool.html -/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new -/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html -/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build -/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install -pub struct ThreadPool { - registry: Arc<Registry>, -} - -impl ThreadPool { - #[deprecated(note = "Use `ThreadPoolBuilder::build`")] - #[allow(deprecated)] - /// Deprecated in favor of `ThreadPoolBuilder::build`. - pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> { - Self::build(configuration.into_builder()).map_err(Box::from) - } - - pub(super) fn build<S>( - builder: ThreadPoolBuilder<S>, - ) -> Result<ThreadPool, ThreadPoolBuildError> - where - S: ThreadSpawn, - { - let registry = Registry::new(builder)?; - Ok(ThreadPool { registry }) - } - - /// Executes `op` within the threadpool. Any attempts to use - /// `join`, `scope`, or parallel iterators will then operate - /// within that threadpool. - /// - /// # Warning: thread-local data - /// - /// Because `op` is executing within the Rayon thread-pool, - /// thread-local data from the current thread will not be - /// accessible. - /// - /// # Panics - /// - /// If `op` should panic, that panic will be propagated. - /// - /// ## Using `install()` - /// - /// ```rust - /// # use rayon_core as rayon; - /// fn main() { - /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); - /// let n = pool.install(|| fib(20)); - /// println!("{}", n); - /// } - /// - /// fn fib(n: usize) -> usize { - /// if n == 0 || n == 1 { - /// return n; - /// } - /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool` - /// return a + b; - /// } - /// ``` - pub fn install<OP, R>(&self, op: OP) -> R - where - OP: FnOnce() -> R + Send, - R: Send, - { - self.registry.in_worker(|_, _| op()) - } - - /// Executes `op` within every thread in the threadpool. Any attempts to use - /// `join`, `scope`, or parallel iterators will then operate within that - /// threadpool. - /// - /// Broadcasts are executed on each thread after they have exhausted their - /// local work queue, before they attempt work-stealing from other threads. - /// The goal of that strategy is to run everywhere in a timely manner - /// *without* being too disruptive to current work. There may be alternative - /// broadcast styles added in the future for more or less aggressive - /// injection, if the need arises. - /// - /// # Warning: thread-local data - /// - /// Because `op` is executing within the Rayon thread-pool, - /// thread-local data from the current thread will not be - /// accessible. - /// - /// # Panics - /// - /// If `op` should panic on one or more threads, exactly one panic - /// will be propagated, only after all threads have completed - /// (or panicked) their own `op`. - /// - /// # Examples - /// - /// ``` - /// # use rayon_core as rayon; - /// use std::sync::atomic::{AtomicUsize, Ordering}; - /// - /// fn main() { - /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap(); - /// - /// // The argument gives context, including the index of each thread. - /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index()); - /// assert_eq!(v, &[0, 1, 4, 9, 16]); - /// - /// // The closure can reference the local stack - /// let count = AtomicUsize::new(0); - /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed)); - /// assert_eq!(count.into_inner(), 5); - /// } - /// ``` - pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R> - where - OP: Fn(BroadcastContext<'_>) -> R + Sync, - R: Send, - { - // We assert that `self.registry` has not terminated. - unsafe { broadcast::broadcast_in(op, &self.registry) } - } - - /// Returns the (current) number of threads in the thread pool. - /// - /// # Future compatibility note - /// - /// Note that unless this thread-pool was created with a - /// [`ThreadPoolBuilder`] that specifies the number of threads, - /// then this number may vary over time in future versions (see [the - /// `num_threads()` method for details][snt]). - /// - /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads - /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html - #[inline] - pub fn current_num_threads(&self) -> usize { - self.registry.num_threads() - } - - /// If called from a Rayon worker thread in this thread-pool, - /// returns the index of that thread; if not called from a Rayon - /// thread, or called from a Rayon thread that belongs to a - /// different thread-pool, returns `None`. - /// - /// The index for a given thread will not change over the thread's - /// lifetime. However, multiple threads may share the same index if - /// they are in distinct thread-pools. - /// - /// # Future compatibility note - /// - /// Currently, every thread-pool (including the global - /// thread-pool) has a fixed number of threads, but this may - /// change in future Rayon versions (see [the `num_threads()` method - /// for details][snt]). In that case, the index for a - /// thread would not change during its lifetime, but thread - /// indices may wind up being reused if threads are terminated and - /// restarted. - /// - /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads - #[inline] - pub fn current_thread_index(&self) -> Option<usize> { - let curr = self.registry.current_thread()?; - Some(curr.index()) - } - - /// Returns true if the current worker thread currently has "local - /// tasks" pending. This can be useful as part of a heuristic for - /// deciding whether to spawn a new task or execute code on the - /// current thread, particularly in breadth-first - /// schedulers. However, keep in mind that this is an inherently - /// racy check, as other worker threads may be actively "stealing" - /// tasks from our local deque. - /// - /// **Background:** Rayon's uses a [work-stealing] scheduler. The - /// key idea is that each thread has its own [deque] of - /// tasks. Whenever a new task is spawned -- whether through - /// `join()`, `Scope::spawn()`, or some other means -- that new - /// task is pushed onto the thread's *local* deque. Worker threads - /// have a preference for executing their own tasks; if however - /// they run out of tasks, they will go try to "steal" tasks from - /// other threads. This function therefore has an inherent race - /// with other active worker threads, which may be removing items - /// from the local deque. - /// - /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing - /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue - #[inline] - pub fn current_thread_has_pending_tasks(&self) -> Option<bool> { - let curr = self.registry.current_thread()?; - Some(!curr.local_deque_is_empty()) - } - - /// Execute `oper_a` and `oper_b` in the thread-pool and return - /// the results. Equivalent to `self.install(|| join(oper_a, - /// oper_b))`. - pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB) - where - A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, - RA: Send, - RB: Send, - { - self.install(|| join(oper_a, oper_b)) - } - - /// Creates a scope that executes within this thread-pool. - /// Equivalent to `self.install(|| scope(...))`. - /// - /// See also: [the `scope()` function][scope]. - /// - /// [scope]: fn.scope.html - pub fn scope<'scope, OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&Scope<'scope>) -> R + Send, - R: Send, - { - self.install(|| scope(op)) - } - - /// Creates a scope that executes within this thread-pool. - /// Spawns from the same thread are prioritized in relative FIFO order. - /// Equivalent to `self.install(|| scope_fifo(...))`. - /// - /// See also: [the `scope_fifo()` function][scope_fifo]. - /// - /// [scope_fifo]: fn.scope_fifo.html - pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, - R: Send, - { - self.install(|| scope_fifo(op)) - } - - /// Creates a scope that spawns work into this thread-pool. - /// - /// See also: [the `in_place_scope()` function][in_place_scope]. - /// - /// [in_place_scope]: fn.in_place_scope.html - pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&Scope<'scope>) -> R, - { - do_in_place_scope(Some(&self.registry), op) - } - - /// Creates a scope that spawns work into this thread-pool in FIFO order. - /// - /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo]. - /// - /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html - pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&ScopeFifo<'scope>) -> R, - { - do_in_place_scope_fifo(Some(&self.registry), op) - } - - /// Spawns an asynchronous task in this thread-pool. This task will - /// run in the implicit, global scope, which means that it may outlast - /// the current stack frame -- therefore, it cannot capture any references - /// onto the stack (you will likely need a `move` closure). - /// - /// See also: [the `spawn()` function defined on scopes][spawn]. - /// - /// [spawn]: struct.Scope.html#method.spawn - pub fn spawn<OP>(&self, op: OP) - where - OP: FnOnce() + Send + 'static, - { - // We assert that `self.registry` has not terminated. - unsafe { spawn::spawn_in(op, &self.registry) } - } - - /// Spawns an asynchronous task in this thread-pool. This task will - /// run in the implicit, global scope, which means that it may outlast - /// the current stack frame -- therefore, it cannot capture any references - /// onto the stack (you will likely need a `move` closure). - /// - /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo]. - /// - /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo - pub fn spawn_fifo<OP>(&self, op: OP) - where - OP: FnOnce() + Send + 'static, - { - // We assert that `self.registry` has not terminated. - unsafe { spawn::spawn_fifo_in(op, &self.registry) } - } - - /// Spawns an asynchronous task on every thread in this thread-pool. This task - /// will run in the implicit, global scope, which means that it may outlast the - /// current stack frame -- therefore, it cannot capture any references onto the - /// stack (you will likely need a `move` closure). - pub fn spawn_broadcast<OP>(&self, op: OP) - where - OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, - { - // We assert that `self.registry` has not terminated. - unsafe { broadcast::spawn_broadcast_in(op, &self.registry) } - } - - /// Cooperatively yields execution to Rayon. - /// - /// This is similar to the general [`yield_now()`], but only if the current - /// thread is part of *this* thread pool. - /// - /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if - /// nothing was available, or `None` if the current thread is not part this pool. - pub fn yield_now(&self) -> Option<Yield> { - let curr = self.registry.current_thread()?; - Some(curr.yield_now()) - } - - /// Cooperatively yields execution to local Rayon work. - /// - /// This is similar to the general [`yield_local()`], but only if the current - /// thread is part of *this* thread pool. - /// - /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if - /// nothing was available, or `None` if the current thread is not part this pool. - pub fn yield_local(&self) -> Option<Yield> { - let curr = self.registry.current_thread()?; - Some(curr.yield_local()) - } -} - -impl Drop for ThreadPool { - fn drop(&mut self) { - self.registry.terminate(); - } -} - -impl fmt::Debug for ThreadPool { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("ThreadPool") - .field("num_threads", &self.current_num_threads()) - .field("id", &self.registry.id()) - .finish() - } -} - -/// If called from a Rayon worker thread, returns the index of that -/// thread within its current pool; if not called from a Rayon thread, -/// returns `None`. -/// -/// The index for a given thread will not change over the thread's -/// lifetime. However, multiple threads may share the same index if -/// they are in distinct thread-pools. -/// -/// See also: [the `ThreadPool::current_thread_index()` method]. -/// -/// [m]: struct.ThreadPool.html#method.current_thread_index -/// -/// # Future compatibility note -/// -/// Currently, every thread-pool (including the global -/// thread-pool) has a fixed number of threads, but this may -/// change in future Rayon versions (see [the `num_threads()` method -/// for details][snt]). In that case, the index for a -/// thread would not change during its lifetime, but thread -/// indices may wind up being reused if threads are terminated and -/// restarted. -/// -/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads -#[inline] -pub fn current_thread_index() -> Option<usize> { - unsafe { - let curr = WorkerThread::current().as_ref()?; - Some(curr.index()) - } -} - -/// If called from a Rayon worker thread, indicates whether that -/// thread's local deque still has pending tasks. Otherwise, returns -/// `None`. For more information, see [the -/// `ThreadPool::current_thread_has_pending_tasks()` method][m]. -/// -/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks -#[inline] -pub fn current_thread_has_pending_tasks() -> Option<bool> { - unsafe { - let curr = WorkerThread::current().as_ref()?; - Some(!curr.local_deque_is_empty()) - } -} - -/// Cooperatively yields execution to Rayon. -/// -/// If the current thread is part of a rayon thread pool, this looks for a -/// single unit of pending work in the pool, then executes it. Completion of -/// that work might include nested work or further work stealing. -/// -/// This is similar to [`std::thread::yield_now()`], but does not literally make -/// that call. If you are implementing a polling loop, you may want to also -/// yield to the OS scheduler yourself if no Rayon work was found. -/// -/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if -/// nothing was available, or `None` if this thread is not part of any pool at all. -pub fn yield_now() -> Option<Yield> { - unsafe { - let thread = WorkerThread::current().as_ref()?; - Some(thread.yield_now()) - } -} - -/// Cooperatively yields execution to local Rayon work. -/// -/// If the current thread is part of a rayon thread pool, this looks for a -/// single unit of pending work in this thread's queue, then executes it. -/// Completion of that work might include nested work or further work stealing. -/// -/// This is similar to [`yield_now()`], but does not steal from other threads. -/// -/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if -/// nothing was available, or `None` if this thread is not part of any pool at all. -pub fn yield_local() -> Option<Yield> { - unsafe { - let thread = WorkerThread::current().as_ref()?; - Some(thread.yield_local()) - } -} - -/// Result of [`yield_now()`] or [`yield_local()`]. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Yield { - /// Work was found and executed. - Executed, - /// No available work was found. - Idle, -} diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs deleted file mode 100644 index 88b3628..0000000 --- a/vendor/rayon-core/src/thread_pool/test.rs +++ /dev/null @@ -1,418 +0,0 @@ -#![cfg(test)] - -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex}; - -use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate() { - let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - thread_pool.install(|| { - panic!("Hello, world!"); - }); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn workers_stop() { - let registry; - - { - // once we exit this block, thread-pool will be dropped - let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); - registry = thread_pool.install(|| { - // do some work on these threads - join_a_lot(22); - - Arc::clone(&thread_pool.registry) - }); - assert_eq!(registry.num_threads(), 22); - } - - // once thread-pool is dropped, registry should terminate, which - // should lead to worker threads stopping - registry.wait_until_stopped(); -} - -fn join_a_lot(n: usize) { - if n > 0 { - join(|| join_a_lot(n - 1), || join_a_lot(n - 1)); - } -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn sleeper_stop() { - use std::{thread, time}; - - let registry; - - { - // once we exit this block, thread-pool will be dropped - let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); - registry = Arc::clone(&thread_pool.registry); - - // Give time for at least some of the thread pool to fall asleep. - thread::sleep(time::Duration::from_secs(1)); - } - - // once thread-pool is dropped, registry should terminate, which - // should lead to worker threads stopping - registry.wait_until_stopped(); -} - -/// Creates a start/exit handler that increments an atomic counter. -fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) { - let count = Arc::new(AtomicUsize::new(0)); - (Arc::clone(&count), move |_| { - count.fetch_add(1, Ordering::SeqCst); - }) -} - -/// Wait until a counter is no longer shared, then return its value. -fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize { - use std::{thread, time}; - - for _ in 0..60 { - counter = match Arc::try_unwrap(counter) { - Ok(counter) => return counter.into_inner(), - Err(counter) => { - thread::sleep(time::Duration::from_secs(1)); - counter - } - }; - } - - // That's too long! - panic!("Counter is still shared!"); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn failed_thread_stack() { - // Note: we first tried to force failure with a `usize::MAX` stack, but - // macOS and Windows weren't fazed, or at least didn't fail the way we want. - // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a - // 2GB stack, so it might not fail until the second thread. - let stack_size = ::std::isize::MAX as usize; - - let (start_count, start_handler) = count_handler(); - let (exit_count, exit_handler) = count_handler(); - let builder = ThreadPoolBuilder::new() - .num_threads(10) - .stack_size(stack_size) - .start_handler(start_handler) - .exit_handler(exit_handler); - - let pool = builder.build(); - assert!(pool.is_err(), "thread stack should have failed!"); - - // With such a huge stack, 64-bit will probably fail on the first thread; - // 32-bit might manage the first 2GB, but certainly fail the second. - let start_count = wait_for_counter(start_count); - assert!(start_count <= 1); - assert_eq!(start_count, wait_for_counter(exit_count)); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_thread_name() { - let (start_count, start_handler) = count_handler(); - let (exit_count, exit_handler) = count_handler(); - let builder = ThreadPoolBuilder::new() - .num_threads(10) - .start_handler(start_handler) - .exit_handler(exit_handler) - .thread_name(|i| { - if i >= 5 { - panic!(); - } - format!("panic_thread_name#{}", i) - }); - - let pool = crate::unwind::halt_unwinding(|| builder.build()); - assert!(pool.is_err(), "thread-name panic should propagate!"); - - // Assuming they're created in order, threads 0 through 4 should have - // been started already, and then terminated by the panic. - assert_eq!(5, wait_for_counter(start_count)); - assert_eq!(5, wait_for_counter(exit_count)); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn self_install() { - let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - - // If the inner `install` blocks, then nothing will actually run it! - assert!(pool.install(|| pool.install(|| true))); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mutual_install() { - let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - - let ok = pool1.install(|| { - // This creates a dependency from `pool1` -> `pool2` - pool2.install(|| { - // This creates a dependency from `pool2` -> `pool1` - pool1.install(|| { - // If they blocked on inter-pool installs, there would be no - // threads left to run this! - true - }) - }) - }); - assert!(ok); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mutual_install_sleepy() { - use std::{thread, time}; - - let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - - let ok = pool1.install(|| { - // This creates a dependency from `pool1` -> `pool2` - pool2.install(|| { - // Give `pool1` time to fall asleep. - thread::sleep(time::Duration::from_secs(1)); - - // This creates a dependency from `pool2` -> `pool1` - pool1.install(|| { - // Give `pool2` time to fall asleep. - thread::sleep(time::Duration::from_secs(1)); - - // If they blocked on inter-pool installs, there would be no - // threads left to run this! - true - }) - }) - }); - assert!(ok); -} - -#[test] -#[allow(deprecated)] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn check_thread_pool_new() { - let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap(); - assert_eq!(pool.current_num_threads(), 22); -} - -macro_rules! test_scope_order { - ($scope:ident => $spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = builder.build().unwrap(); - pool.install(|| { - let vec = Mutex::new(vec![]); - pool.$scope(|scope| { - let vec = &vec; - for i in 0..10 { - scope.$spawn(move |_| { - vec.lock().unwrap().push(i); - }); - } - }); - vec.into_inner().unwrap() - }) - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn scope_lifo_order() { - let vec = test_scope_order!(scope => spawn); - let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn scope_fifo_order() { - let vec = test_scope_order!(scope_fifo => spawn_fifo); - let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order - assert_eq!(vec, expected); -} - -macro_rules! test_spawn_order { - ($spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = &builder.build().unwrap(); - let (tx, rx) = channel(); - pool.install(move || { - for i in 0..10 { - let tx = tx.clone(); - pool.$spawn(move || { - tx.send(i).unwrap(); - }); - } - }); - rx.iter().collect::<Vec<i32>>() - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_lifo_order() { - let vec = test_spawn_order!(spawn); - let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_fifo_order() { - let vec = test_spawn_order!(spawn_fifo); - let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn nested_scopes() { - // Create matching scopes for every thread pool. - fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) - where - OP: FnOnce(&[&Scope<'scope>]) + Send, - { - if let Some((pool, tail)) = pools.split_first() { - pool.scope(move |s| { - // This move reduces the reference lifetimes by variance to match s, - // but the actual scopes are still tied to the invariant 'scope. - let mut scopes = scopes; - scopes.push(s); - nest(tail, scopes, op) - }) - } else { - (op)(&scopes) - } - } - - let pools: Vec<_> = (0..10) - .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) - .collect(); - - let counter = AtomicUsize::new(0); - nest(&pools, vec![], |scopes| { - for &s in scopes { - s.spawn(|_| { - // Our 'scope lets us borrow the counter in every pool. - counter.fetch_add(1, Ordering::Relaxed); - }); - } - }); - assert_eq!(counter.into_inner(), pools.len()); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn nested_fifo_scopes() { - // Create matching fifo scopes for every thread pool. - fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) - where - OP: FnOnce(&[&ScopeFifo<'scope>]) + Send, - { - if let Some((pool, tail)) = pools.split_first() { - pool.scope_fifo(move |s| { - // This move reduces the reference lifetimes by variance to match s, - // but the actual scopes are still tied to the invariant 'scope. - let mut scopes = scopes; - scopes.push(s); - nest(tail, scopes, op) - }) - } else { - (op)(&scopes) - } - } - - let pools: Vec<_> = (0..10) - .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) - .collect(); - - let counter = AtomicUsize::new(0); - nest(&pools, vec![], |scopes| { - for &s in scopes { - s.spawn_fifo(|_| { - // Our 'scope lets us borrow the counter in every pool. - counter.fetch_add(1, Ordering::Relaxed); - }); - } - }); - assert_eq!(counter.into_inner(), pools.len()); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn in_place_scope_no_deadlock() { - let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let (tx, rx) = channel(); - let rx_ref = ℞ - pool.in_place_scope(move |s| { - // With regular scopes this closure would never run because this scope op - // itself would block the only worker thread. - s.spawn(move |_| { - tx.send(()).unwrap(); - }); - rx_ref.recv().unwrap(); - }); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn in_place_scope_fifo_no_deadlock() { - let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let (tx, rx) = channel(); - let rx_ref = ℞ - pool.in_place_scope_fifo(move |s| { - // With regular scopes this closure would never run because this scope op - // itself would block the only worker thread. - s.spawn_fifo(move |_| { - tx.send(()).unwrap(); - }); - rx_ref.recv().unwrap(); - }); -} - -#[test] -fn yield_now_to_spawn() { - let (tx, rx) = channel(); - - // Queue a regular spawn. - crate::spawn(move || tx.send(22).unwrap()); - - // The single-threaded fallback mode (for wasm etc.) won't - // get a chance to run the spawn if we never yield to it. - crate::registry::in_worker(move |_, _| { - crate::yield_now(); - }); - - // The spawn **must** have started by now, but we still might have to wait - // for it to finish if a different thread stole it first. - assert_eq!(22, rx.recv().unwrap()); -} - -#[test] -fn yield_local_to_spawn() { - let (tx, rx) = channel(); - - // Queue a regular spawn. - crate::spawn(move || tx.send(22).unwrap()); - - // The single-threaded fallback mode (for wasm etc.) won't - // get a chance to run the spawn if we never yield to it. - crate::registry::in_worker(move |_, _| { - crate::yield_local(); - }); - - // The spawn **must** have started by now, but we still might have to wait - // for it to finish if a different thread stole it first. - assert_eq!(22, rx.recv().unwrap()); -} diff --git a/vendor/rayon-core/src/unwind.rs b/vendor/rayon-core/src/unwind.rs deleted file mode 100644 index 9671fa5..0000000 --- a/vendor/rayon-core/src/unwind.rs +++ /dev/null @@ -1,31 +0,0 @@ -//! Package up unwind recovery. Note that if you are in some sensitive -//! place, you can use the `AbortIfPanic` helper to protect against -//! accidental panics in the rayon code itself. - -use std::any::Any; -use std::panic::{self, AssertUnwindSafe}; -use std::thread; - -/// Executes `f` and captures any panic, translating that panic into a -/// `Err` result. The assumption is that any panic will be propagated -/// later with `resume_unwinding`, and hence `f` can be treated as -/// exception safe. -pub(super) fn halt_unwinding<F, R>(func: F) -> thread::Result<R> -where - F: FnOnce() -> R, -{ - panic::catch_unwind(AssertUnwindSafe(func)) -} - -pub(super) fn resume_unwinding(payload: Box<dyn Any + Send>) -> ! { - panic::resume_unwind(payload) -} - -pub(super) struct AbortIfPanic; - -impl Drop for AbortIfPanic { - fn drop(&mut self) { - eprintln!("Rayon: detected unexpected panic; aborting"); - ::std::process::abort(); - } -} |