aboutsummaryrefslogtreecommitdiff
path: root/vendor/rayon-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/rayon-core/src')
-rw-r--r--vendor/rayon-core/src/broadcast/mod.rs150
-rw-r--r--vendor/rayon-core/src/broadcast/test.rs263
-rw-r--r--vendor/rayon-core/src/compile_fail/mod.rs7
-rw-r--r--vendor/rayon-core/src/compile_fail/quicksort_race1.rs28
-rw-r--r--vendor/rayon-core/src/compile_fail/quicksort_race2.rs28
-rw-r--r--vendor/rayon-core/src/compile_fail/quicksort_race3.rs28
-rw-r--r--vendor/rayon-core/src/compile_fail/rc_return.rs17
-rw-r--r--vendor/rayon-core/src/compile_fail/rc_upvar.rs9
-rw-r--r--vendor/rayon-core/src/compile_fail/scope_join_bad.rs24
-rw-r--r--vendor/rayon-core/src/job.rs270
-rw-r--r--vendor/rayon-core/src/join/mod.rs188
-rw-r--r--vendor/rayon-core/src/join/test.rs151
-rw-r--r--vendor/rayon-core/src/latch.rs460
-rw-r--r--vendor/rayon-core/src/lib.rs869
-rw-r--r--vendor/rayon-core/src/private.rs26
-rw-r--r--vendor/rayon-core/src/registry.rs995
-rw-r--r--vendor/rayon-core/src/scope/mod.rs769
-rw-r--r--vendor/rayon-core/src/scope/test.rs619
-rw-r--r--vendor/rayon-core/src/sleep/README.md219
-rw-r--r--vendor/rayon-core/src/sleep/counters.rs277
-rw-r--r--vendor/rayon-core/src/sleep/mod.rs325
-rw-r--r--vendor/rayon-core/src/spawn/mod.rs163
-rw-r--r--vendor/rayon-core/src/spawn/test.rs255
-rw-r--r--vendor/rayon-core/src/test.rs200
-rw-r--r--vendor/rayon-core/src/thread_pool/mod.rs471
-rw-r--r--vendor/rayon-core/src/thread_pool/test.rs418
-rw-r--r--vendor/rayon-core/src/unwind.rs31
27 files changed, 0 insertions, 7260 deletions
diff --git a/vendor/rayon-core/src/broadcast/mod.rs b/vendor/rayon-core/src/broadcast/mod.rs
deleted file mode 100644
index 96611e4..0000000
--- a/vendor/rayon-core/src/broadcast/mod.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-use crate::job::{ArcJob, StackJob};
-use crate::latch::{CountLatch, LatchRef};
-use crate::registry::{Registry, WorkerThread};
-use std::fmt;
-use std::marker::PhantomData;
-use std::sync::Arc;
-
-mod test;
-
-/// Executes `op` within every thread in the current threadpool. If this is
-/// called from a non-Rayon thread, it will execute in the global threadpool.
-/// Any attempts to use `join`, `scope`, or parallel iterators will then operate
-/// within that threadpool. When the call has completed on each thread, returns
-/// a vector containing all of their return values.
-///
-/// For more information, see the [`ThreadPool::broadcast()`][m] method.
-///
-/// [m]: struct.ThreadPool.html#method.broadcast
-pub fn broadcast<OP, R>(op: OP) -> Vec<R>
-where
- OP: Fn(BroadcastContext<'_>) -> R + Sync,
- R: Send,
-{
- // We assert that current registry has not terminated.
- unsafe { broadcast_in(op, &Registry::current()) }
-}
-
-/// Spawns an asynchronous task on every thread in this thread-pool. This task
-/// will run in the implicit, global scope, which means that it may outlast the
-/// current stack frame -- therefore, it cannot capture any references onto the
-/// stack (you will likely need a `move` closure).
-///
-/// For more information, see the [`ThreadPool::spawn_broadcast()`][m] method.
-///
-/// [m]: struct.ThreadPool.html#method.spawn_broadcast
-pub fn spawn_broadcast<OP>(op: OP)
-where
- OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
-{
- // We assert that current registry has not terminated.
- unsafe { spawn_broadcast_in(op, &Registry::current()) }
-}
-
-/// Provides context to a closure called by `broadcast`.
-pub struct BroadcastContext<'a> {
- worker: &'a WorkerThread,
-
- /// Make sure to prevent auto-traits like `Send` and `Sync`.
- _marker: PhantomData<&'a mut dyn Fn()>,
-}
-
-impl<'a> BroadcastContext<'a> {
- pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R {
- let worker_thread = WorkerThread::current();
- assert!(!worker_thread.is_null());
- f(BroadcastContext {
- worker: unsafe { &*worker_thread },
- _marker: PhantomData,
- })
- }
-
- /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`).
- #[inline]
- pub fn index(&self) -> usize {
- self.worker.index()
- }
-
- /// The number of threads receiving the broadcast in the thread pool.
- ///
- /// # Future compatibility note
- ///
- /// Future versions of Rayon might vary the number of threads over time, but
- /// this method will always return the number of threads which are actually
- /// receiving your particular `broadcast` call.
- #[inline]
- pub fn num_threads(&self) -> usize {
- self.worker.registry().num_threads()
- }
-}
-
-impl<'a> fmt::Debug for BroadcastContext<'a> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("BroadcastContext")
- .field("index", &self.index())
- .field("num_threads", &self.num_threads())
- .field("pool_id", &self.worker.registry().id())
- .finish()
- }
-}
-
-/// Execute `op` on every thread in the pool. It will be executed on each
-/// thread when they have nothing else to do locally, before they try to
-/// steal work from other threads. This function will not return until all
-/// threads have completed the `op`.
-///
-/// Unsafe because `registry` must not yet have terminated.
-pub(super) unsafe fn broadcast_in<OP, R>(op: OP, registry: &Arc<Registry>) -> Vec<R>
-where
- OP: Fn(BroadcastContext<'_>) -> R + Sync,
- R: Send,
-{
- let f = move |injected: bool| {
- debug_assert!(injected);
- BroadcastContext::with(&op)
- };
-
- let n_threads = registry.num_threads();
- let current_thread = WorkerThread::current().as_ref();
- let latch = CountLatch::with_count(n_threads, current_thread);
- let jobs: Vec<_> = (0..n_threads)
- .map(|_| StackJob::new(&f, LatchRef::new(&latch)))
- .collect();
- let job_refs = jobs.iter().map(|job| job.as_job_ref());
-
- registry.inject_broadcast(job_refs);
-
- // Wait for all jobs to complete, then collect the results, maybe propagating a panic.
- latch.wait(current_thread);
- jobs.into_iter().map(|job| job.into_result()).collect()
-}
-
-/// Execute `op` on every thread in the pool. It will be executed on each
-/// thread when they have nothing else to do locally, before they try to
-/// steal work from other threads. This function returns immediately after
-/// injecting the jobs.
-///
-/// Unsafe because `registry` must not yet have terminated.
-pub(super) unsafe fn spawn_broadcast_in<OP>(op: OP, registry: &Arc<Registry>)
-where
- OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
-{
- let job = ArcJob::new({
- let registry = Arc::clone(registry);
- move || {
- registry.catch_unwind(|| BroadcastContext::with(&op));
- registry.terminate(); // (*) permit registry to terminate now
- }
- });
-
- let n_threads = registry.num_threads();
- let job_refs = (0..n_threads).map(|_| {
- // Ensure that registry cannot terminate until this job has executed
- // on each thread. This ref is decremented at the (*) above.
- registry.increment_terminate_count();
-
- ArcJob::as_static_job_ref(&job)
- });
-
- registry.inject_broadcast(job_refs);
-}
diff --git a/vendor/rayon-core/src/broadcast/test.rs b/vendor/rayon-core/src/broadcast/test.rs
deleted file mode 100644
index 00ab4ad..0000000
--- a/vendor/rayon-core/src/broadcast/test.rs
+++ /dev/null
@@ -1,263 +0,0 @@
-#![cfg(test)]
-
-use crate::ThreadPoolBuilder;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::channel;
-use std::sync::Arc;
-use std::{thread, time};
-
-#[test]
-fn broadcast_global() {
- let v = crate::broadcast(|ctx| ctx.index());
- assert!(v.into_iter().eq(0..crate::current_num_threads()));
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_broadcast_global() {
- let (tx, rx) = channel();
- crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
-
- let mut v: Vec<_> = rx.into_iter().collect();
- v.sort_unstable();
- assert!(v.into_iter().eq(0..crate::current_num_threads()));
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn broadcast_pool() {
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- let v = pool.broadcast(|ctx| ctx.index());
- assert!(v.into_iter().eq(0..7));
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_broadcast_pool() {
- let (tx, rx) = channel();
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
-
- let mut v: Vec<_> = rx.into_iter().collect();
- v.sort_unstable();
- assert!(v.into_iter().eq(0..7));
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn broadcast_self() {
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
- assert!(v.into_iter().eq(0..7));
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_broadcast_self() {
- let (tx, rx) = channel();
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
-
- let mut v: Vec<_> = rx.into_iter().collect();
- v.sort_unstable();
- assert!(v.into_iter().eq(0..7));
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn broadcast_mutual() {
- let count = AtomicUsize::new(0);
- let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
- let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- pool1.install(|| {
- pool2.broadcast(|_| {
- pool1.broadcast(|_| {
- count.fetch_add(1, Ordering::Relaxed);
- })
- })
- });
- assert_eq!(count.into_inner(), 3 * 7);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_broadcast_mutual() {
- let (tx, rx) = channel();
- let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
- let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- pool1.spawn({
- let pool1 = Arc::clone(&pool1);
- move || {
- pool2.spawn_broadcast(move |_| {
- let tx = tx.clone();
- pool1.spawn_broadcast(move |_| tx.send(()).unwrap())
- })
- }
- });
- assert_eq!(rx.into_iter().count(), 3 * 7);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn broadcast_mutual_sleepy() {
- let count = AtomicUsize::new(0);
- let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
- let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- pool1.install(|| {
- thread::sleep(time::Duration::from_secs(1));
- pool2.broadcast(|_| {
- thread::sleep(time::Duration::from_secs(1));
- pool1.broadcast(|_| {
- thread::sleep(time::Duration::from_millis(100));
- count.fetch_add(1, Ordering::Relaxed);
- })
- })
- });
- assert_eq!(count.into_inner(), 3 * 7);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_broadcast_mutual_sleepy() {
- let (tx, rx) = channel();
- let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
- let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- pool1.spawn({
- let pool1 = Arc::clone(&pool1);
- move || {
- thread::sleep(time::Duration::from_secs(1));
- pool2.spawn_broadcast(move |_| {
- let tx = tx.clone();
- thread::sleep(time::Duration::from_secs(1));
- pool1.spawn_broadcast(move |_| {
- thread::sleep(time::Duration::from_millis(100));
- tx.send(()).unwrap();
- })
- })
- }
- });
- assert_eq!(rx.into_iter().count(), 3 * 7);
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn broadcast_panic_one() {
- let count = AtomicUsize::new(0);
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- let result = crate::unwind::halt_unwinding(|| {
- pool.broadcast(|ctx| {
- count.fetch_add(1, Ordering::Relaxed);
- if ctx.index() == 3 {
- panic!("Hello, world!");
- }
- })
- });
- assert_eq!(count.into_inner(), 7);
- assert!(result.is_err(), "broadcast panic should propagate!");
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn spawn_broadcast_panic_one() {
- let (tx, rx) = channel();
- let (panic_tx, panic_rx) = channel();
- let pool = ThreadPoolBuilder::new()
- .num_threads(7)
- .panic_handler(move |e| panic_tx.send(e).unwrap())
- .build()
- .unwrap();
- pool.spawn_broadcast(move |ctx| {
- tx.send(()).unwrap();
- if ctx.index() == 3 {
- panic!("Hello, world!");
- }
- });
- drop(pool); // including panic_tx
- assert_eq!(rx.into_iter().count(), 7);
- assert_eq!(panic_rx.into_iter().count(), 1);
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn broadcast_panic_many() {
- let count = AtomicUsize::new(0);
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- let result = crate::unwind::halt_unwinding(|| {
- pool.broadcast(|ctx| {
- count.fetch_add(1, Ordering::Relaxed);
- if ctx.index() % 2 == 0 {
- panic!("Hello, world!");
- }
- })
- });
- assert_eq!(count.into_inner(), 7);
- assert!(result.is_err(), "broadcast panic should propagate!");
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn spawn_broadcast_panic_many() {
- let (tx, rx) = channel();
- let (panic_tx, panic_rx) = channel();
- let pool = ThreadPoolBuilder::new()
- .num_threads(7)
- .panic_handler(move |e| panic_tx.send(e).unwrap())
- .build()
- .unwrap();
- pool.spawn_broadcast(move |ctx| {
- tx.send(()).unwrap();
- if ctx.index() % 2 == 0 {
- panic!("Hello, world!");
- }
- });
- drop(pool); // including panic_tx
- assert_eq!(rx.into_iter().count(), 7);
- assert_eq!(panic_rx.into_iter().count(), 4);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn broadcast_sleep_race() {
- let test_duration = time::Duration::from_secs(1);
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- let start = time::Instant::now();
- while start.elapsed() < test_duration {
- pool.broadcast(|ctx| {
- // A slight spread of sleep duration increases the chance that one
- // of the threads will race in the pool's idle sleep afterward.
- thread::sleep(time::Duration::from_micros(ctx.index() as u64));
- });
- }
-}
-
-#[test]
-fn broadcast_after_spawn_broadcast() {
- let (tx, rx) = channel();
-
- // Queue a non-blocking spawn_broadcast.
- crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
-
- // This blocking broadcast runs after all prior broadcasts.
- crate::broadcast(|_| {});
-
- // The spawn_broadcast **must** have run by now on all threads.
- let mut v: Vec<_> = rx.try_iter().collect();
- v.sort_unstable();
- assert!(v.into_iter().eq(0..crate::current_num_threads()));
-}
-
-#[test]
-fn broadcast_after_spawn() {
- let (tx, rx) = channel();
-
- // Queue a regular spawn on a thread-local deque.
- crate::registry::in_worker(move |_, _| {
- crate::spawn(move || tx.send(22).unwrap());
- });
-
- // Broadcast runs after the local deque is empty.
- crate::broadcast(|_| {});
-
- // The spawn **must** have run by now.
- assert_eq!(22, rx.try_recv().unwrap());
-}
diff --git a/vendor/rayon-core/src/compile_fail/mod.rs b/vendor/rayon-core/src/compile_fail/mod.rs
deleted file mode 100644
index f2ec646..0000000
--- a/vendor/rayon-core/src/compile_fail/mod.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-// These modules contain `compile_fail` doc tests.
-mod quicksort_race1;
-mod quicksort_race2;
-mod quicksort_race3;
-mod rc_return;
-mod rc_upvar;
-mod scope_join_bad;
diff --git a/vendor/rayon-core/src/compile_fail/quicksort_race1.rs b/vendor/rayon-core/src/compile_fail/quicksort_race1.rs
deleted file mode 100644
index 5615033..0000000
--- a/vendor/rayon-core/src/compile_fail/quicksort_race1.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-/*! ```compile_fail,E0524
-
-fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
- if v.len() <= 1 {
- return;
- }
-
- let mid = partition(v);
- let (lo, _hi) = v.split_at_mut(mid);
- rayon_core::join(|| quick_sort(lo), || quick_sort(lo)); //~ ERROR
-}
-
-fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
- let pivot = v.len() - 1;
- let mut i = 0;
- for j in 0..pivot {
- if v[j] <= v[pivot] {
- v.swap(i, j);
- i += 1;
- }
- }
- v.swap(i, pivot);
- i
-}
-
-fn main() { }
-
-``` */
diff --git a/vendor/rayon-core/src/compile_fail/quicksort_race2.rs b/vendor/rayon-core/src/compile_fail/quicksort_race2.rs
deleted file mode 100644
index 020589c..0000000
--- a/vendor/rayon-core/src/compile_fail/quicksort_race2.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-/*! ```compile_fail,E0500
-
-fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
- if v.len() <= 1 {
- return;
- }
-
- let mid = partition(v);
- let (lo, _hi) = v.split_at_mut(mid);
- rayon_core::join(|| quick_sort(lo), || quick_sort(v)); //~ ERROR
-}
-
-fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
- let pivot = v.len() - 1;
- let mut i = 0;
- for j in 0..pivot {
- if v[j] <= v[pivot] {
- v.swap(i, j);
- i += 1;
- }
- }
- v.swap(i, pivot);
- i
-}
-
-fn main() { }
-
-``` */
diff --git a/vendor/rayon-core/src/compile_fail/quicksort_race3.rs b/vendor/rayon-core/src/compile_fail/quicksort_race3.rs
deleted file mode 100644
index 16fbf3b..0000000
--- a/vendor/rayon-core/src/compile_fail/quicksort_race3.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-/*! ```compile_fail,E0524
-
-fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
- if v.len() <= 1 {
- return;
- }
-
- let mid = partition(v);
- let (_lo, hi) = v.split_at_mut(mid);
- rayon_core::join(|| quick_sort(hi), || quick_sort(hi)); //~ ERROR
-}
-
-fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
- let pivot = v.len() - 1;
- let mut i = 0;
- for j in 0..pivot {
- if v[j] <= v[pivot] {
- v.swap(i, j);
- i += 1;
- }
- }
- v.swap(i, pivot);
- i
-}
-
-fn main() { }
-
-``` */
diff --git a/vendor/rayon-core/src/compile_fail/rc_return.rs b/vendor/rayon-core/src/compile_fail/rc_return.rs
deleted file mode 100644
index 93e3a60..0000000
--- a/vendor/rayon-core/src/compile_fail/rc_return.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-/** ```compile_fail,E0277
-
-use std::rc::Rc;
-
-rayon_core::join(|| Rc::new(22), || ()); //~ ERROR
-
-``` */
-mod left {}
-
-/** ```compile_fail,E0277
-
-use std::rc::Rc;
-
-rayon_core::join(|| (), || Rc::new(23)); //~ ERROR
-
-``` */
-mod right {}
diff --git a/vendor/rayon-core/src/compile_fail/rc_upvar.rs b/vendor/rayon-core/src/compile_fail/rc_upvar.rs
deleted file mode 100644
index d8aebcf..0000000
--- a/vendor/rayon-core/src/compile_fail/rc_upvar.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-/*! ```compile_fail,E0277
-
-use std::rc::Rc;
-
-let r = Rc::new(22);
-rayon_core::join(|| r.clone(), || r.clone());
-//~^ ERROR
-
-``` */
diff --git a/vendor/rayon-core/src/compile_fail/scope_join_bad.rs b/vendor/rayon-core/src/compile_fail/scope_join_bad.rs
deleted file mode 100644
index 75e4c5c..0000000
--- a/vendor/rayon-core/src/compile_fail/scope_join_bad.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-/*! ```compile_fail,E0373
-
-fn bad_scope<F>(f: F)
- where F: FnOnce(&i32) + Send,
-{
- rayon_core::scope(|s| {
- let x = 22;
- s.spawn(|_| f(&x)); //~ ERROR `x` does not live long enough
- });
-}
-
-fn good_scope<F>(f: F)
- where F: FnOnce(&i32) + Send,
-{
- let x = 22;
- rayon_core::scope(|s| {
- s.spawn(|_| f(&x));
- });
-}
-
-fn main() {
-}
-
-``` */
diff --git a/vendor/rayon-core/src/job.rs b/vendor/rayon-core/src/job.rs
deleted file mode 100644
index 5664bb3..0000000
--- a/vendor/rayon-core/src/job.rs
+++ /dev/null
@@ -1,270 +0,0 @@
-use crate::latch::Latch;
-use crate::unwind;
-use crossbeam_deque::{Injector, Steal};
-use std::any::Any;
-use std::cell::UnsafeCell;
-use std::mem;
-use std::sync::Arc;
-
-pub(super) enum JobResult<T> {
- None,
- Ok(T),
- Panic(Box<dyn Any + Send>),
-}
-
-/// A `Job` is used to advertise work for other threads that they may
-/// want to steal. In accordance with time honored tradition, jobs are
-/// arranged in a deque, so that thieves can take from the top of the
-/// deque while the main worker manages the bottom of the deque. This
-/// deque is managed by the `thread_pool` module.
-pub(super) trait Job {
- /// Unsafe: this may be called from a different thread than the one
- /// which scheduled the job, so the implementer must ensure the
- /// appropriate traits are met, whether `Send`, `Sync`, or both.
- unsafe fn execute(this: *const ());
-}
-
-/// Effectively a Job trait object. Each JobRef **must** be executed
-/// exactly once, or else data may leak.
-///
-/// Internally, we store the job's data in a `*const ()` pointer. The
-/// true type is something like `*const StackJob<...>`, but we hide
-/// it. We also carry the "execute fn" from the `Job` trait.
-pub(super) struct JobRef {
- pointer: *const (),
- execute_fn: unsafe fn(*const ()),
-}
-
-unsafe impl Send for JobRef {}
-unsafe impl Sync for JobRef {}
-
-impl JobRef {
- /// Unsafe: caller asserts that `data` will remain valid until the
- /// job is executed.
- pub(super) unsafe fn new<T>(data: *const T) -> JobRef
- where
- T: Job,
- {
- // erase types:
- JobRef {
- pointer: data as *const (),
- execute_fn: <T as Job>::execute,
- }
- }
-
- /// Returns an opaque handle that can be saved and compared,
- /// without making `JobRef` itself `Copy + Eq`.
- #[inline]
- pub(super) fn id(&self) -> impl Eq {
- (self.pointer, self.execute_fn)
- }
-
- #[inline]
- pub(super) unsafe fn execute(self) {
- (self.execute_fn)(self.pointer)
- }
-}
-
-/// A job that will be owned by a stack slot. This means that when it
-/// executes it need not free any heap data, the cleanup occurs when
-/// the stack frame is later popped. The function parameter indicates
-/// `true` if the job was stolen -- executed on a different thread.
-pub(super) struct StackJob<L, F, R>
-where
- L: Latch + Sync,
- F: FnOnce(bool) -> R + Send,
- R: Send,
-{
- pub(super) latch: L,
- func: UnsafeCell<Option<F>>,
- result: UnsafeCell<JobResult<R>>,
-}
-
-impl<L, F, R> StackJob<L, F, R>
-where
- L: Latch + Sync,
- F: FnOnce(bool) -> R + Send,
- R: Send,
-{
- pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
- StackJob {
- latch,
- func: UnsafeCell::new(Some(func)),
- result: UnsafeCell::new(JobResult::None),
- }
- }
-
- pub(super) unsafe fn as_job_ref(&self) -> JobRef {
- JobRef::new(self)
- }
-
- pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
- self.func.into_inner().unwrap()(stolen)
- }
-
- pub(super) unsafe fn into_result(self) -> R {
- self.result.into_inner().into_return_value()
- }
-}
-
-impl<L, F, R> Job for StackJob<L, F, R>
-where
- L: Latch + Sync,
- F: FnOnce(bool) -> R + Send,
- R: Send,
-{
- unsafe fn execute(this: *const ()) {
- let this = &*(this as *const Self);
- let abort = unwind::AbortIfPanic;
- let func = (*this.func.get()).take().unwrap();
- (*this.result.get()) = JobResult::call(func);
- Latch::set(&this.latch);
- mem::forget(abort);
- }
-}
-
-/// Represents a job stored in the heap. Used to implement
-/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
-/// invokes a closure, which then triggers the appropriate logic to
-/// signal that the job executed.
-///
-/// (Probably `StackJob` should be refactored in a similar fashion.)
-pub(super) struct HeapJob<BODY>
-where
- BODY: FnOnce() + Send,
-{
- job: BODY,
-}
-
-impl<BODY> HeapJob<BODY>
-where
- BODY: FnOnce() + Send,
-{
- pub(super) fn new(job: BODY) -> Box<Self> {
- Box::new(HeapJob { job })
- }
-
- /// Creates a `JobRef` from this job -- note that this hides all
- /// lifetimes, so it is up to you to ensure that this JobRef
- /// doesn't outlive any data that it closes over.
- pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
- JobRef::new(Box::into_raw(self))
- }
-
- /// Creates a static `JobRef` from this job.
- pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
- where
- BODY: 'static,
- {
- unsafe { self.into_job_ref() }
- }
-}
-
-impl<BODY> Job for HeapJob<BODY>
-where
- BODY: FnOnce() + Send,
-{
- unsafe fn execute(this: *const ()) {
- let this = Box::from_raw(this as *mut Self);
- (this.job)();
- }
-}
-
-/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
-/// be turned into multiple `JobRef`s and called multiple times.
-pub(super) struct ArcJob<BODY>
-where
- BODY: Fn() + Send + Sync,
-{
- job: BODY,
-}
-
-impl<BODY> ArcJob<BODY>
-where
- BODY: Fn() + Send + Sync,
-{
- pub(super) fn new(job: BODY) -> Arc<Self> {
- Arc::new(ArcJob { job })
- }
-
- /// Creates a `JobRef` from this job -- note that this hides all
- /// lifetimes, so it is up to you to ensure that this JobRef
- /// doesn't outlive any data that it closes over.
- pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
- JobRef::new(Arc::into_raw(Arc::clone(this)))
- }
-
- /// Creates a static `JobRef` from this job.
- pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
- where
- BODY: 'static,
- {
- unsafe { Self::as_job_ref(this) }
- }
-}
-
-impl<BODY> Job for ArcJob<BODY>
-where
- BODY: Fn() + Send + Sync,
-{
- unsafe fn execute(this: *const ()) {
- let this = Arc::from_raw(this as *mut Self);
- (this.job)();
- }
-}
-
-impl<T> JobResult<T> {
- fn call(func: impl FnOnce(bool) -> T) -> Self {
- match unwind::halt_unwinding(|| func(true)) {
- Ok(x) => JobResult::Ok(x),
- Err(x) => JobResult::Panic(x),
- }
- }
-
- /// Convert the `JobResult` for a job that has finished (and hence
- /// its JobResult is populated) into its return value.
- ///
- /// NB. This will panic if the job panicked.
- pub(super) fn into_return_value(self) -> T {
- match self {
- JobResult::None => unreachable!(),
- JobResult::Ok(x) => x,
- JobResult::Panic(x) => unwind::resume_unwinding(x),
- }
- }
-}
-
-/// Indirect queue to provide FIFO job priority.
-pub(super) struct JobFifo {
- inner: Injector<JobRef>,
-}
-
-impl JobFifo {
- pub(super) fn new() -> Self {
- JobFifo {
- inner: Injector::new(),
- }
- }
-
- pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
- // A little indirection ensures that spawns are always prioritized in FIFO order. The
- // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
- // (FIFO), but either way they will end up popping from the front of this queue.
- self.inner.push(job_ref);
- JobRef::new(self)
- }
-}
-
-impl Job for JobFifo {
- unsafe fn execute(this: *const ()) {
- // We "execute" a queue by executing its first job, FIFO.
- let this = &*(this as *const Self);
- loop {
- match this.inner.steal() {
- Steal::Success(job_ref) => break job_ref.execute(),
- Steal::Empty => panic!("FIFO is empty"),
- Steal::Retry => {}
- }
- }
- }
-}
diff --git a/vendor/rayon-core/src/join/mod.rs b/vendor/rayon-core/src/join/mod.rs
deleted file mode 100644
index 5ab9f6b..0000000
--- a/vendor/rayon-core/src/join/mod.rs
+++ /dev/null
@@ -1,188 +0,0 @@
-use crate::job::StackJob;
-use crate::latch::SpinLatch;
-use crate::registry::{self, WorkerThread};
-use crate::unwind;
-use std::any::Any;
-
-use crate::FnContext;
-
-#[cfg(test)]
-mod test;
-
-/// Takes two closures and *potentially* runs them in parallel. It
-/// returns a pair of the results from those closures.
-///
-/// Conceptually, calling `join()` is similar to spawning two threads,
-/// one executing each of the two closures. However, the
-/// implementation is quite different and incurs very low
-/// overhead. The underlying technique is called "work stealing": the
-/// Rayon runtime uses a fixed pool of worker threads and attempts to
-/// only execute code in parallel when there are idle CPUs to handle
-/// it.
-///
-/// When `join` is called from outside the thread pool, the calling
-/// thread will block while the closures execute in the pool. When
-/// `join` is called within the pool, the calling thread still actively
-/// participates in the thread pool. It will begin by executing closure
-/// A (on the current thread). While it is doing that, it will advertise
-/// closure B as being available for other threads to execute. Once closure A
-/// has completed, the current thread will try to execute closure B;
-/// if however closure B has been stolen, then it will look for other work
-/// while waiting for the thief to fully execute closure B. (This is the
-/// typical work-stealing strategy).
-///
-/// # Examples
-///
-/// This example uses join to perform a quick-sort (note this is not a
-/// particularly optimized implementation: if you **actually** want to
-/// sort for real, you should prefer [the `par_sort` method] offered
-/// by Rayon).
-///
-/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let mut v = vec![5, 1, 8, 22, 0, 44];
-/// quick_sort(&mut v);
-/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
-///
-/// fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
-/// if v.len() > 1 {
-/// let mid = partition(v);
-/// let (lo, hi) = v.split_at_mut(mid);
-/// rayon::join(|| quick_sort(lo),
-/// || quick_sort(hi));
-/// }
-/// }
-///
-/// // Partition rearranges all items `<=` to the pivot
-/// // item (arbitrary selected to be the last item in the slice)
-/// // to the first half of the slice. It then returns the
-/// // "dividing point" where the pivot is placed.
-/// fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
-/// let pivot = v.len() - 1;
-/// let mut i = 0;
-/// for j in 0..pivot {
-/// if v[j] <= v[pivot] {
-/// v.swap(i, j);
-/// i += 1;
-/// }
-/// }
-/// v.swap(i, pivot);
-/// i
-/// }
-/// ```
-///
-/// # Warning about blocking I/O
-///
-/// The assumption is that the closures given to `join()` are
-/// CPU-bound tasks that do not perform I/O or other blocking
-/// operations. If you do perform I/O, and that I/O should block
-/// (e.g., waiting for a network request), the overall performance may
-/// be poor. Moreover, if you cause one closure to be blocked waiting
-/// on another (for example, using a channel), that could lead to a
-/// deadlock.
-///
-/// # Panics
-///
-/// No matter what happens, both closures will always be executed. If
-/// a single closure panics, whether it be the first or second
-/// closure, that panic will be propagated and hence `join()` will
-/// panic with the same panic value. If both closures panic, `join()`
-/// will panic with the panic value from the first closure.
-pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
-where
- A: FnOnce() -> RA + Send,
- B: FnOnce() -> RB + Send,
- RA: Send,
- RB: Send,
-{
- #[inline]
- fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R {
- move |_| f()
- }
-
- join_context(call(oper_a), call(oper_b))
-}
-
-/// Identical to `join`, except that the closures have a parameter
-/// that provides context for the way the closure has been called,
-/// especially indicating whether they're executing on a different
-/// thread than where `join_context` was called. This will occur if
-/// the second job is stolen by a different thread, or if
-/// `join_context` was called from outside the thread pool to begin
-/// with.
-pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
-where
- A: FnOnce(FnContext) -> RA + Send,
- B: FnOnce(FnContext) -> RB + Send,
- RA: Send,
- RB: Send,
-{
- #[inline]
- fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
- move || f(FnContext::new(injected))
- }
-
- #[inline]
- fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
- move |migrated| f(FnContext::new(migrated))
- }
-
- registry::in_worker(|worker_thread, injected| unsafe {
- // Create virtual wrapper for task b; this all has to be
- // done here so that the stack frame can keep it all live
- // long enough.
- let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
- let job_b_ref = job_b.as_job_ref();
- let job_b_id = job_b_ref.id();
- worker_thread.push(job_b_ref);
-
- // Execute task a; hopefully b gets stolen in the meantime.
- let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
- let result_a = match status_a {
- Ok(v) => v,
- Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
- };
-
- // Now that task A has finished, try to pop job B from the
- // local stack. It may already have been popped by job A; it
- // may also have been stolen. There may also be some tasks
- // pushed on top of it in the stack, and we will have to pop
- // those off to get to it.
- while !job_b.latch.probe() {
- if let Some(job) = worker_thread.take_local_job() {
- if job_b_id == job.id() {
- // Found it! Let's run it.
- //
- // Note that this could panic, but it's ok if we unwind here.
- let result_b = job_b.run_inline(injected);
- return (result_a, result_b);
- } else {
- worker_thread.execute(job);
- }
- } else {
- // Local deque is empty. Time to steal from other
- // threads.
- worker_thread.wait_until(&job_b.latch);
- debug_assert!(job_b.latch.probe());
- break;
- }
- }
-
- (result_a, job_b.into_result())
- })
-}
-
-/// If job A panics, we still cannot return until we are sure that job
-/// B is complete. This is because it may contain references into the
-/// enclosing stack frame(s).
-#[cold] // cold path
-unsafe fn join_recover_from_panic(
- worker_thread: &WorkerThread,
- job_b_latch: &SpinLatch<'_>,
- err: Box<dyn Any + Send>,
-) -> ! {
- worker_thread.wait_until(job_b_latch);
- unwind::resume_unwinding(err)
-}
diff --git a/vendor/rayon-core/src/join/test.rs b/vendor/rayon-core/src/join/test.rs
deleted file mode 100644
index b303dbc..0000000
--- a/vendor/rayon-core/src/join/test.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-//! Tests for the join code.
-
-use crate::join::*;
-use crate::unwind;
-use crate::ThreadPoolBuilder;
-use rand::distributions::Standard;
-use rand::{Rng, SeedableRng};
-use rand_xorshift::XorShiftRng;
-
-fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
- if v.len() <= 1 {
- return;
- }
-
- let mid = partition(v);
- let (lo, hi) = v.split_at_mut(mid);
- join(|| quick_sort(lo), || quick_sort(hi));
-}
-
-fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
- let pivot = v.len() - 1;
- let mut i = 0;
- for j in 0..pivot {
- if v[j] <= v[pivot] {
- v.swap(i, j);
- i += 1;
- }
- }
- v.swap(i, pivot);
- i
-}
-
-fn seeded_rng() -> XorShiftRng {
- let mut seed = <XorShiftRng as SeedableRng>::Seed::default();
- (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i);
- XorShiftRng::from_seed(seed)
-}
-
-#[test]
-fn sort() {
- let rng = seeded_rng();
- let mut data: Vec<u32> = rng.sample_iter(&Standard).take(6 * 1024).collect();
- let mut sorted_data = data.clone();
- sorted_data.sort();
- quick_sort(&mut data);
- assert_eq!(data, sorted_data);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn sort_in_pool() {
- let rng = seeded_rng();
- let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
-
- let pool = ThreadPoolBuilder::new().build().unwrap();
- let mut sorted_data = data.clone();
- sorted_data.sort();
- pool.install(|| quick_sort(&mut data));
- assert_eq!(data, sorted_data);
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_a() {
- join(|| panic!("Hello, world!"), || ());
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_b() {
- join(|| (), || panic!("Hello, world!"));
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_both() {
- join(|| panic!("Hello, world!"), || panic!("Goodbye, world!"));
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn panic_b_still_executes() {
- let mut x = false;
- match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
- Ok(_) => panic!("failed to propagate panic from closure A,"),
- Err(_) => assert!(x, "closure b failed to execute"),
- }
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn join_context_both() {
- // If we're not in a pool, both should be marked stolen as they're injected.
- let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
- assert!(a_migrated);
- assert!(b_migrated);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn join_context_neither() {
- // If we're already in a 1-thread pool, neither job should be stolen.
- let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
- let (a_migrated, b_migrated) =
- pool.install(|| join_context(|a| a.migrated(), |b| b.migrated()));
- assert!(!a_migrated);
- assert!(!b_migrated);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn join_context_second() {
- use std::sync::Barrier;
-
- // If we're already in a 2-thread pool, the second job should be stolen.
- let barrier = Barrier::new(2);
- let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
- let (a_migrated, b_migrated) = pool.install(|| {
- join_context(
- |a| {
- barrier.wait();
- a.migrated()
- },
- |b| {
- barrier.wait();
- b.migrated()
- },
- )
- });
- assert!(!a_migrated);
- assert!(b_migrated);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn join_counter_overflow() {
- const MAX: u32 = 500_000;
-
- let mut i = 0;
- let mut j = 0;
- let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
-
- // Hammer on join a bunch of times -- used to hit overflow debug-assertions
- // in JEC on 32-bit targets: https://github.com/rayon-rs/rayon/issues/797
- for _ in 0..MAX {
- pool.join(|| i += 1, || j += 1);
- }
-
- assert_eq!(i, MAX);
- assert_eq!(j, MAX);
-}
diff --git a/vendor/rayon-core/src/latch.rs b/vendor/rayon-core/src/latch.rs
deleted file mode 100644
index b0cbbd8..0000000
--- a/vendor/rayon-core/src/latch.rs
+++ /dev/null
@@ -1,460 +0,0 @@
-use std::marker::PhantomData;
-use std::ops::Deref;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Condvar, Mutex};
-use std::usize;
-
-use crate::registry::{Registry, WorkerThread};
-
-/// We define various kinds of latches, which are all a primitive signaling
-/// mechanism. A latch starts as false. Eventually someone calls `set()` and
-/// it becomes true. You can test if it has been set by calling `probe()`.
-///
-/// Some kinds of latches, but not all, support a `wait()` operation
-/// that will wait until the latch is set, blocking efficiently. That
-/// is not part of the trait since it is not possibly to do with all
-/// latches.
-///
-/// The intention is that `set()` is called once, but `probe()` may be
-/// called any number of times. Once `probe()` returns true, the memory
-/// effects that occurred before `set()` become visible.
-///
-/// It'd probably be better to refactor the API into two paired types,
-/// but that's a bit of work, and this is not a public API.
-///
-/// ## Memory ordering
-///
-/// Latches need to guarantee two things:
-///
-/// - Once `probe()` returns true, all memory effects from the `set()`
-/// are visible (in other words, the set should synchronize-with
-/// the probe).
-/// - Once `set()` occurs, the next `probe()` *will* observe it. This
-/// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
-/// README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
-pub(super) trait Latch {
- /// Set the latch, signalling others.
- ///
- /// # WARNING
- ///
- /// Setting a latch triggers other threads to wake up and (in some
- /// cases) complete. This may, in turn, cause memory to be
- /// deallocated and so forth. One must be very careful about this,
- /// and it's typically better to read all the fields you will need
- /// to access *before* a latch is set!
- ///
- /// This function operates on `*const Self` instead of `&self` to allow it
- /// to become dangling during this call. The caller must ensure that the
- /// pointer is valid upon entry, and not invalidated during the call by any
- /// actions other than `set` itself.
- unsafe fn set(this: *const Self);
-}
-
-pub(super) trait AsCoreLatch {
- fn as_core_latch(&self) -> &CoreLatch;
-}
-
-/// Latch is not set, owning thread is awake
-const UNSET: usize = 0;
-
-/// Latch is not set, owning thread is going to sleep on this latch
-/// (but has not yet fallen asleep).
-const SLEEPY: usize = 1;
-
-/// Latch is not set, owning thread is asleep on this latch and
-/// must be awoken.
-const SLEEPING: usize = 2;
-
-/// Latch is set.
-const SET: usize = 3;
-
-/// Spin latches are the simplest, most efficient kind, but they do
-/// not support a `wait()` operation. They just have a boolean flag
-/// that becomes true when `set()` is called.
-#[derive(Debug)]
-pub(super) struct CoreLatch {
- state: AtomicUsize,
-}
-
-impl CoreLatch {
- #[inline]
- fn new() -> Self {
- Self {
- state: AtomicUsize::new(0),
- }
- }
-
- /// Invoked by owning thread as it prepares to sleep. Returns true
- /// if the owning thread may proceed to fall asleep, false if the
- /// latch was set in the meantime.
- #[inline]
- pub(super) fn get_sleepy(&self) -> bool {
- self.state
- .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
- .is_ok()
- }
-
- /// Invoked by owning thread as it falls asleep sleep. Returns
- /// true if the owning thread should block, or false if the latch
- /// was set in the meantime.
- #[inline]
- pub(super) fn fall_asleep(&self) -> bool {
- self.state
- .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
- .is_ok()
- }
-
- /// Invoked by owning thread as it falls asleep sleep. Returns
- /// true if the owning thread should block, or false if the latch
- /// was set in the meantime.
- #[inline]
- pub(super) fn wake_up(&self) {
- if !self.probe() {
- let _ =
- self.state
- .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
- }
- }
-
- /// Set the latch. If this returns true, the owning thread was sleeping
- /// and must be awoken.
- ///
- /// This is private because, typically, setting a latch involves
- /// doing some wakeups; those are encapsulated in the surrounding
- /// latch code.
- #[inline]
- unsafe fn set(this: *const Self) -> bool {
- let old_state = (*this).state.swap(SET, Ordering::AcqRel);
- old_state == SLEEPING
- }
-
- /// Test if this latch has been set.
- #[inline]
- pub(super) fn probe(&self) -> bool {
- self.state.load(Ordering::Acquire) == SET
- }
-}
-
-impl AsCoreLatch for CoreLatch {
- #[inline]
- fn as_core_latch(&self) -> &CoreLatch {
- self
- }
-}
-
-/// Spin latches are the simplest, most efficient kind, but they do
-/// not support a `wait()` operation. They just have a boolean flag
-/// that becomes true when `set()` is called.
-pub(super) struct SpinLatch<'r> {
- core_latch: CoreLatch,
- registry: &'r Arc<Registry>,
- target_worker_index: usize,
- cross: bool,
-}
-
-impl<'r> SpinLatch<'r> {
- /// Creates a new spin latch that is owned by `thread`. This means
- /// that `thread` is the only thread that should be blocking on
- /// this latch -- it also means that when the latch is set, we
- /// will wake `thread` if it is sleeping.
- #[inline]
- pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
- SpinLatch {
- core_latch: CoreLatch::new(),
- registry: thread.registry(),
- target_worker_index: thread.index(),
- cross: false,
- }
- }
-
- /// Creates a new spin latch for cross-threadpool blocking. Notably, we
- /// need to make sure the registry is kept alive after setting, so we can
- /// safely call the notification.
- #[inline]
- pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
- SpinLatch {
- cross: true,
- ..SpinLatch::new(thread)
- }
- }
-
- #[inline]
- pub(super) fn probe(&self) -> bool {
- self.core_latch.probe()
- }
-}
-
-impl<'r> AsCoreLatch for SpinLatch<'r> {
- #[inline]
- fn as_core_latch(&self) -> &CoreLatch {
- &self.core_latch
- }
-}
-
-impl<'r> Latch for SpinLatch<'r> {
- #[inline]
- unsafe fn set(this: *const Self) {
- let cross_registry;
-
- let registry: &Registry = if (*this).cross {
- // Ensure the registry stays alive while we notify it.
- // Otherwise, it would be possible that we set the spin
- // latch and the other thread sees it and exits, causing
- // the registry to be deallocated, all before we get a
- // chance to invoke `registry.notify_worker_latch_is_set`.
- cross_registry = Arc::clone((*this).registry);
- &cross_registry
- } else {
- // If this is not a "cross-registry" spin-latch, then the
- // thread which is performing `set` is itself ensuring
- // that the registry stays alive. However, that doesn't
- // include this *particular* `Arc` handle if the waiting
- // thread then exits, so we must completely dereference it.
- (*this).registry
- };
- let target_worker_index = (*this).target_worker_index;
-
- // NOTE: Once we `set`, the target may proceed and invalidate `this`!
- if CoreLatch::set(&(*this).core_latch) {
- // Subtle: at this point, we can no longer read from
- // `self`, because the thread owning this spin latch may
- // have awoken and deallocated the latch. Therefore, we
- // only use fields whose values we already read.
- registry.notify_worker_latch_is_set(target_worker_index);
- }
- }
-}
-
-/// A Latch starts as false and eventually becomes true. You can block
-/// until it becomes true.
-#[derive(Debug)]
-pub(super) struct LockLatch {
- m: Mutex<bool>,
- v: Condvar,
-}
-
-impl LockLatch {
- #[inline]
- pub(super) fn new() -> LockLatch {
- LockLatch {
- m: Mutex::new(false),
- v: Condvar::new(),
- }
- }
-
- /// Block until latch is set, then resets this lock latch so it can be reused again.
- pub(super) fn wait_and_reset(&self) {
- let mut guard = self.m.lock().unwrap();
- while !*guard {
- guard = self.v.wait(guard).unwrap();
- }
- *guard = false;
- }
-
- /// Block until latch is set.
- pub(super) fn wait(&self) {
- let mut guard = self.m.lock().unwrap();
- while !*guard {
- guard = self.v.wait(guard).unwrap();
- }
- }
-}
-
-impl Latch for LockLatch {
- #[inline]
- unsafe fn set(this: *const Self) {
- let mut guard = (*this).m.lock().unwrap();
- *guard = true;
- (*this).v.notify_all();
- }
-}
-
-/// Once latches are used to implement one-time blocking, primarily
-/// for the termination flag of the threads in the pool.
-///
-/// Note: like a `SpinLatch`, once-latches are always associated with
-/// some registry that is probing them, which must be tickled when
-/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
-/// reference to that registry. This is because in some cases the
-/// registry owns the once-latch, and that would create a cycle. So a
-/// `OnceLatch` must be given a reference to its owning registry when
-/// it is set. For this reason, it does not implement the `Latch`
-/// trait (but it doesn't have to, as it is not used in those generic
-/// contexts).
-#[derive(Debug)]
-pub(super) struct OnceLatch {
- core_latch: CoreLatch,
-}
-
-impl OnceLatch {
- #[inline]
- pub(super) fn new() -> OnceLatch {
- Self {
- core_latch: CoreLatch::new(),
- }
- }
-
- /// Set the latch, then tickle the specific worker thread,
- /// which should be the one that owns this latch.
- #[inline]
- pub(super) unsafe fn set_and_tickle_one(
- this: *const Self,
- registry: &Registry,
- target_worker_index: usize,
- ) {
- if CoreLatch::set(&(*this).core_latch) {
- registry.notify_worker_latch_is_set(target_worker_index);
- }
- }
-}
-
-impl AsCoreLatch for OnceLatch {
- #[inline]
- fn as_core_latch(&self) -> &CoreLatch {
- &self.core_latch
- }
-}
-
-/// Counting latches are used to implement scopes. They track a
-/// counter. Unlike other latches, calling `set()` does not
-/// necessarily make the latch be considered `set()`; instead, it just
-/// decrements the counter. The latch is only "set" (in the sense that
-/// `probe()` returns true) once the counter reaches zero.
-#[derive(Debug)]
-pub(super) struct CountLatch {
- counter: AtomicUsize,
- kind: CountLatchKind,
-}
-
-enum CountLatchKind {
- /// A latch for scopes created on a rayon thread which will participate in work-
- /// stealing while it waits for completion. This thread is not necessarily part
- /// of the same registry as the scope itself!
- Stealing {
- latch: CoreLatch,
- /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
- /// with registry B, when a job completes in a thread of registry B, we may
- /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
- /// That means we need a reference to registry A (since at that point we will
- /// only have a reference to registry B), so we stash it here.
- registry: Arc<Registry>,
- /// The index of the worker to wake in `registry`
- worker_index: usize,
- },
-
- /// A latch for scopes created on a non-rayon thread which will block to wait.
- Blocking { latch: LockLatch },
-}
-
-impl std::fmt::Debug for CountLatchKind {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- CountLatchKind::Stealing { latch, .. } => {
- f.debug_tuple("Stealing").field(latch).finish()
- }
- CountLatchKind::Blocking { latch, .. } => {
- f.debug_tuple("Blocking").field(latch).finish()
- }
- }
- }
-}
-
-impl CountLatch {
- pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
- Self::with_count(1, owner)
- }
-
- pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
- Self {
- counter: AtomicUsize::new(count),
- kind: match owner {
- Some(owner) => CountLatchKind::Stealing {
- latch: CoreLatch::new(),
- registry: Arc::clone(owner.registry()),
- worker_index: owner.index(),
- },
- None => CountLatchKind::Blocking {
- latch: LockLatch::new(),
- },
- },
- }
- }
-
- #[inline]
- pub(super) fn increment(&self) {
- let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
- debug_assert!(old_counter != 0);
- }
-
- pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
- match &self.kind {
- CountLatchKind::Stealing {
- latch,
- registry,
- worker_index,
- } => unsafe {
- let owner = owner.expect("owner thread");
- debug_assert_eq!(registry.id(), owner.registry().id());
- debug_assert_eq!(*worker_index, owner.index());
- owner.wait_until(latch);
- },
- CountLatchKind::Blocking { latch } => latch.wait(),
- }
- }
-}
-
-impl Latch for CountLatch {
- #[inline]
- unsafe fn set(this: *const Self) {
- if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- // NOTE: Once we call `set` on the internal `latch`,
- // the target may proceed and invalidate `this`!
- match (*this).kind {
- CountLatchKind::Stealing {
- ref latch,
- ref registry,
- worker_index,
- } => {
- let registry = Arc::clone(registry);
- if CoreLatch::set(latch) {
- registry.notify_worker_latch_is_set(worker_index);
- }
- }
- CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
- }
- }
- }
-}
-
-/// `&L` without any implication of `dereferenceable` for `Latch::set`
-pub(super) struct LatchRef<'a, L> {
- inner: *const L,
- marker: PhantomData<&'a L>,
-}
-
-impl<L> LatchRef<'_, L> {
- pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
- LatchRef {
- inner,
- marker: PhantomData,
- }
- }
-}
-
-unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
-
-impl<L> Deref for LatchRef<'_, L> {
- type Target = L;
-
- fn deref(&self) -> &L {
- // SAFETY: if we have &self, the inner latch is still alive
- unsafe { &*self.inner }
- }
-}
-
-impl<L: Latch> Latch for LatchRef<'_, L> {
- #[inline]
- unsafe fn set(this: *const Self) {
- L::set((*this).inner);
- }
-}
diff --git a/vendor/rayon-core/src/lib.rs b/vendor/rayon-core/src/lib.rs
deleted file mode 100644
index 7001c8c..0000000
--- a/vendor/rayon-core/src/lib.rs
+++ /dev/null
@@ -1,869 +0,0 @@
-//! Rayon-core houses the core stable APIs of Rayon.
-//!
-//! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there.
-//!
-//! [`join`] is used to take two closures and potentially run them in parallel.
-//! - It will run in parallel if task B gets stolen before task A can finish.
-//! - It will run sequentially if task A finishes before task B is stolen and can continue on task B.
-//!
-//! [`scope`] creates a scope in which you can run any number of parallel tasks.
-//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed.
-//! The scope will exist until all tasks spawned within the scope have been completed.
-//!
-//! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function.
-//!
-//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one.
-//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque,
-//! where it becomes available for work stealing from other threads in the local threadpool.
-//!
-//! [`join`]: fn.join.html
-//! [`scope`]: fn.scope.html
-//! [`scope()`]: fn.scope.html
-//! [`spawn`]: fn.spawn.html
-//! [`ThreadPool`]: struct.threadpool.html
-//! [`install()`]: struct.ThreadPool.html#method.install
-//! [`spawn()`]: struct.ThreadPool.html#method.spawn
-//! [`join()`]: struct.ThreadPool.html#method.join
-//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
-//!
-//! # Global fallback when threading is unsupported
-//!
-//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
-//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
-//! targets are notable examples of this. Rather than panicking on the unsupported error when
-//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
-//!
-//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
-//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
-//! there is no other thread to share the work. However, since the pool is not running independent
-//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
-//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
-//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
-//! can also volunteer execution time.
-//!
-//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
-//!
-//! # Restricting multiple versions
-//!
-//! In order to ensure proper coordination between threadpools, and especially
-//! to make sure there's only one global threadpool, `rayon-core` is actively
-//! restricted from building multiple versions of itself into a single target.
-//! You may see a build error like this in violation:
-//!
-//! ```text
-//! error: native library `rayon-core` is being linked to by more
-//! than one package, and can only be linked to by one package
-//! ```
-//!
-//! While we strive to keep `rayon-core` semver-compatible, it's still
-//! possible to arrive at this situation if different crates have overly
-//! restrictive tilde or inequality requirements for `rayon-core`. The
-//! conflicting requirements will need to be resolved before the build will
-//! succeed.
-
-#![deny(missing_debug_implementations)]
-#![deny(missing_docs)]
-#![deny(unreachable_pub)]
-#![warn(rust_2018_idioms)]
-
-use std::any::Any;
-use std::env;
-use std::error::Error;
-use std::fmt;
-use std::io;
-use std::marker::PhantomData;
-use std::str::FromStr;
-use std::thread;
-
-#[macro_use]
-mod private;
-
-mod broadcast;
-mod job;
-mod join;
-mod latch;
-mod registry;
-mod scope;
-mod sleep;
-mod spawn;
-mod thread_pool;
-mod unwind;
-
-mod compile_fail;
-mod test;
-
-pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
-pub use self::join::{join, join_context};
-pub use self::registry::ThreadBuilder;
-pub use self::scope::{in_place_scope, scope, Scope};
-pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
-pub use self::spawn::{spawn, spawn_fifo};
-pub use self::thread_pool::current_thread_has_pending_tasks;
-pub use self::thread_pool::current_thread_index;
-pub use self::thread_pool::ThreadPool;
-pub use self::thread_pool::{yield_local, yield_now, Yield};
-
-use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
-
-/// Returns the maximum number of threads that Rayon supports in a single thread-pool.
-///
-/// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting
-/// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum.
-///
-/// The value may vary between different targets, and is subject to change in new Rayon versions.
-pub fn max_num_threads() -> usize {
- // We are limited by the bits available in the sleep counter's `AtomicUsize`.
- crate::sleep::THREADS_MAX
-}
-
-/// Returns the number of threads in the current registry. If this
-/// code is executing within a Rayon thread-pool, then this will be
-/// the number of threads for the thread-pool of the current
-/// thread. Otherwise, it will be the number of threads for the global
-/// thread-pool.
-///
-/// This can be useful when trying to judge how many times to split
-/// parallel work (the parallel iterator traits use this value
-/// internally for this purpose).
-///
-/// # Future compatibility note
-///
-/// Note that unless this thread-pool was created with a
-/// builder that specifies the number of threads, then this
-/// number may vary over time in future versions (see [the
-/// `num_threads()` method for details][snt]).
-///
-/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
-pub fn current_num_threads() -> usize {
- crate::registry::Registry::current_num_threads()
-}
-
-/// Error when initializing a thread pool.
-#[derive(Debug)]
-pub struct ThreadPoolBuildError {
- kind: ErrorKind,
-}
-
-#[derive(Debug)]
-enum ErrorKind {
- GlobalPoolAlreadyInitialized,
- CurrentThreadAlreadyInPool,
- IOError(io::Error),
-}
-
-/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
-/// ## Creating a ThreadPool
-/// The following creates a thread pool with 22 threads.
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
-/// ```
-///
-/// To instead configure the global thread pool, use [`build_global()`]:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
-/// ```
-///
-/// [`ThreadPool`]: struct.ThreadPool.html
-/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
-pub struct ThreadPoolBuilder<S = DefaultSpawn> {
- /// The number of threads in the rayon thread pool.
- /// If zero will use the RAYON_NUM_THREADS environment variable.
- /// If RAYON_NUM_THREADS is invalid or zero will use the default.
- num_threads: usize,
-
- /// The thread we're building *from* will also be part of the pool.
- use_current_thread: bool,
-
- /// Custom closure, if any, to handle a panic that we cannot propagate
- /// anywhere else.
- panic_handler: Option<Box<PanicHandler>>,
-
- /// Closure to compute the name of a thread.
- get_thread_name: Option<Box<dyn FnMut(usize) -> String>>,
-
- /// The stack size for the created worker threads
- stack_size: Option<usize>,
-
- /// Closure invoked on worker thread start.
- start_handler: Option<Box<StartHandler>>,
-
- /// Closure invoked on worker thread exit.
- exit_handler: Option<Box<ExitHandler>>,
-
- /// Closure invoked to spawn threads.
- spawn_handler: S,
-
- /// If false, worker threads will execute spawned jobs in a
- /// "depth-first" fashion. If true, they will do a "breadth-first"
- /// fashion. Depth-first is the default.
- breadth_first: bool,
-}
-
-/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
-///
-/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
-#[deprecated(note = "Use `ThreadPoolBuilder`")]
-#[derive(Default)]
-pub struct Configuration {
- builder: ThreadPoolBuilder,
-}
-
-/// The type for a panic handling closure. Note that this same closure
-/// may be invoked multiple times in parallel.
-type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
-
-/// The type for a closure that gets invoked when a thread starts. The
-/// closure is passed the index of the thread on which it is invoked.
-/// Note that this same closure may be invoked multiple times in parallel.
-type StartHandler = dyn Fn(usize) + Send + Sync;
-
-/// The type for a closure that gets invoked when a thread exits. The
-/// closure is passed the index of the thread on which is is invoked.
-/// Note that this same closure may be invoked multiple times in parallel.
-type ExitHandler = dyn Fn(usize) + Send + Sync;
-
-// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
-impl Default for ThreadPoolBuilder {
- fn default() -> Self {
- ThreadPoolBuilder {
- num_threads: 0,
- use_current_thread: false,
- panic_handler: None,
- get_thread_name: None,
- stack_size: None,
- start_handler: None,
- exit_handler: None,
- spawn_handler: DefaultSpawn,
- breadth_first: false,
- }
- }
-}
-
-impl ThreadPoolBuilder {
- /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
- pub fn new() -> Self {
- Self::default()
- }
-}
-
-/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
-/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
-impl<S> ThreadPoolBuilder<S>
-where
- S: ThreadSpawn,
-{
- /// Creates a new `ThreadPool` initialized using this configuration.
- pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
- ThreadPool::build(self)
- }
-
- /// Initializes the global thread pool. This initialization is
- /// **optional**. If you do not call this function, the thread pool
- /// will be automatically initialized with the default
- /// configuration. Calling `build_global` is not recommended, except
- /// in two scenarios:
- ///
- /// - You wish to change the default configuration.
- /// - You are running a benchmark, in which case initializing may
- /// yield slightly more consistent results, since the worker threads
- /// will already be ready to go even in the first iteration. But
- /// this cost is minimal.
- ///
- /// Initialization of the global thread pool happens exactly
- /// once. Once started, the configuration cannot be
- /// changed. Therefore, if you call `build_global` a second time, it
- /// will return an error. An `Ok` result indicates that this
- /// is the first initialization of the thread pool.
- pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
- let registry = registry::init_global_registry(self)?;
- registry.wait_until_primed();
- Ok(())
- }
-}
-
-impl ThreadPoolBuilder {
- /// Creates a scoped `ThreadPool` initialized using this configuration.
- ///
- /// This is a convenience function for building a pool using [`std::thread::scope`]
- /// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
- /// The threads in this pool will start by calling `wrapper`, which should
- /// do initialization and continue by calling `ThreadBuilder::run()`.
- ///
- /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
- ///
- /// # Examples
- ///
- /// A scoped pool may be useful in combination with scoped thread-local variables.
- ///
- /// ```
- /// # use rayon_core as rayon;
- ///
- /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
- ///
- /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
- /// let pool_data = vec![1, 2, 3];
- ///
- /// // We haven't assigned any TLS data yet.
- /// assert!(!POOL_DATA.is_set());
- ///
- /// rayon::ThreadPoolBuilder::new()
- /// .build_scoped(
- /// // Borrow `pool_data` in TLS for each thread.
- /// |thread| POOL_DATA.set(&pool_data, || thread.run()),
- /// // Do some work that needs the TLS data.
- /// |pool| pool.install(|| assert!(POOL_DATA.is_set())),
- /// )?;
- ///
- /// // Once we've returned, `pool_data` is no longer borrowed.
- /// drop(pool_data);
- /// Ok(())
- /// }
- /// ```
- pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
- where
- W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
- F: FnOnce(&ThreadPool) -> R,
- {
- std::thread::scope(|scope| {
- let pool = self
- .spawn_handler(|thread| {
- let mut builder = std::thread::Builder::new();
- if let Some(name) = thread.name() {
- builder = builder.name(name.to_string());
- }
- if let Some(size) = thread.stack_size() {
- builder = builder.stack_size(size);
- }
- builder.spawn_scoped(scope, || wrapper(thread))?;
- Ok(())
- })
- .build()?;
- Ok(with_pool(&pool))
- })
- }
-}
-
-impl<S> ThreadPoolBuilder<S> {
- /// Sets a custom function for spawning threads.
- ///
- /// Note that the threads will not exit until after the pool is dropped. It
- /// is up to the caller to wait for thread termination if that is important
- /// for any invariants. For instance, threads created in [`std::thread::scope`]
- /// will be joined before that scope returns, and this will block indefinitely
- /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
- /// until the entire process exits!
- ///
- /// # Examples
- ///
- /// A minimal spawn handler just needs to call `run()` from an independent thread.
- ///
- /// ```
- /// # use rayon_core as rayon;
- /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
- /// let pool = rayon::ThreadPoolBuilder::new()
- /// .spawn_handler(|thread| {
- /// std::thread::spawn(|| thread.run());
- /// Ok(())
- /// })
- /// .build()?;
- ///
- /// pool.install(|| println!("Hello from my custom thread!"));
- /// Ok(())
- /// }
- /// ```
- ///
- /// The default spawn handler sets the name and stack size if given, and propagates
- /// any errors from the thread builder.
- ///
- /// ```
- /// # use rayon_core as rayon;
- /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
- /// let pool = rayon::ThreadPoolBuilder::new()
- /// .spawn_handler(|thread| {
- /// let mut b = std::thread::Builder::new();
- /// if let Some(name) = thread.name() {
- /// b = b.name(name.to_owned());
- /// }
- /// if let Some(stack_size) = thread.stack_size() {
- /// b = b.stack_size(stack_size);
- /// }
- /// b.spawn(|| thread.run())?;
- /// Ok(())
- /// })
- /// .build()?;
- ///
- /// pool.install(|| println!("Hello from my fully custom thread!"));
- /// Ok(())
- /// }
- /// ```
- ///
- /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
- /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
- /// [`build_scoped`](#method.build_scoped).
- ///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
- /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
- ///
- /// ```
- /// # use rayon_core as rayon;
- /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
- /// std::thread::scope(|scope| {
- /// let pool = rayon::ThreadPoolBuilder::new()
- /// .spawn_handler(|thread| {
- /// let mut builder = std::thread::Builder::new();
- /// if let Some(name) = thread.name() {
- /// builder = builder.name(name.to_string());
- /// }
- /// if let Some(size) = thread.stack_size() {
- /// builder = builder.stack_size(size);
- /// }
- /// builder.spawn_scoped(scope, || {
- /// // Add any scoped initialization here, then run!
- /// thread.run()
- /// })?;
- /// Ok(())
- /// })
- /// .build()?;
- ///
- /// pool.install(|| println!("Hello from my custom scoped thread!"));
- /// Ok(())
- /// })
- /// }
- /// ```
- pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
- where
- F: FnMut(ThreadBuilder) -> io::Result<()>,
- {
- ThreadPoolBuilder {
- spawn_handler: CustomSpawn::new(spawn),
- // ..self
- num_threads: self.num_threads,
- use_current_thread: self.use_current_thread,
- panic_handler: self.panic_handler,
- get_thread_name: self.get_thread_name,
- stack_size: self.stack_size,
- start_handler: self.start_handler,
- exit_handler: self.exit_handler,
- breadth_first: self.breadth_first,
- }
- }
-
- /// Returns a reference to the current spawn handler.
- fn get_spawn_handler(&mut self) -> &mut S {
- &mut self.spawn_handler
- }
-
- /// Get the number of threads that will be used for the thread
- /// pool. See `num_threads()` for more information.
- fn get_num_threads(&self) -> usize {
- if self.num_threads > 0 {
- self.num_threads
- } else {
- let default = || {
- thread::available_parallelism()
- .map(|n| n.get())
- .unwrap_or(1)
- };
-
- match env::var("RAYON_NUM_THREADS")
- .ok()
- .and_then(|s| usize::from_str(&s).ok())
- {
- Some(x @ 1..) => return x,
- Some(0) => return default(),
- _ => {}
- }
-
- // Support for deprecated `RAYON_RS_NUM_CPUS`.
- match env::var("RAYON_RS_NUM_CPUS")
- .ok()
- .and_then(|s| usize::from_str(&s).ok())
- {
- Some(x @ 1..) => x,
- _ => default(),
- }
- }
- }
-
- /// Get the thread name for the thread with the given index.
- fn get_thread_name(&mut self, index: usize) -> Option<String> {
- let f = self.get_thread_name.as_mut()?;
- Some(f(index))
- }
-
- /// Sets a closure which takes a thread index and returns
- /// the thread's name.
- pub fn thread_name<F>(mut self, closure: F) -> Self
- where
- F: FnMut(usize) -> String + 'static,
- {
- self.get_thread_name = Some(Box::new(closure));
- self
- }
-
- /// Sets the number of threads to be used in the rayon threadpool.
- ///
- /// If you specify a non-zero number of threads using this
- /// function, then the resulting thread-pools are guaranteed to
- /// start at most this number of threads.
- ///
- /// If `num_threads` is 0, or you do not call this function, then
- /// the Rayon runtime will select the number of threads
- /// automatically. At present, this is based on the
- /// `RAYON_NUM_THREADS` environment variable (if set),
- /// or the number of logical CPUs (otherwise).
- /// In the future, however, the default behavior may
- /// change to dynamically add or remove threads as needed.
- ///
- /// **Future compatibility warning:** Given the default behavior
- /// may change in the future, if you wish to rely on a fixed
- /// number of threads, you should use this function to specify
- /// that number. To reproduce the current default behavior, you
- /// may wish to use [`std::thread::available_parallelism`]
- /// to query the number of CPUs dynamically.
- ///
- /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
- /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
- /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
- /// be preferred.
- pub fn num_threads(mut self, num_threads: usize) -> Self {
- self.num_threads = num_threads;
- self
- }
-
- /// Use the current thread as one of the threads in the pool.
- ///
- /// The current thread is guaranteed to be at index 0, and since the thread is not managed by
- /// rayon, the spawn and exit handlers do not run for that thread.
- ///
- /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into
- /// the thread-pool will generally not be picked up automatically by this thread unless you
- /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`].
- ///
- /// # Local thread-pools
- ///
- /// Using this in a local thread-pool means the registry will be leaked. In future versions
- /// there might be a way of cleaning up the current-thread state.
- pub fn use_current_thread(mut self) -> Self {
- self.use_current_thread = true;
- self
- }
-
- /// Returns a copy of the current panic handler.
- fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
- self.panic_handler.take()
- }
-
- /// Normally, whenever Rayon catches a panic, it tries to
- /// propagate it to someplace sensible, to try and reflect the
- /// semantics of sequential execution. But in some cases,
- /// particularly with the `spawn()` APIs, there is no
- /// obvious place where we should propagate the panic to.
- /// In that case, this panic handler is invoked.
- ///
- /// If no panic handler is set, the default is to abort the
- /// process, under the principle that panics should not go
- /// unobserved.
- ///
- /// If the panic handler itself panics, this will abort the
- /// process. To prevent this, wrap the body of your panic handler
- /// in a call to `std::panic::catch_unwind()`.
- pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
- where
- H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
- {
- self.panic_handler = Some(Box::new(panic_handler));
- self
- }
-
- /// Get the stack size of the worker threads
- fn get_stack_size(&self) -> Option<usize> {
- self.stack_size
- }
-
- /// Sets the stack size of the worker threads
- pub fn stack_size(mut self, stack_size: usize) -> Self {
- self.stack_size = Some(stack_size);
- self
- }
-
- /// **(DEPRECATED)** Suggest to worker threads that they execute
- /// spawned jobs in a "breadth-first" fashion.
- ///
- /// Typically, when a worker thread is idle or blocked, it will
- /// attempt to execute the job from the *top* of its local deque of
- /// work (i.e., the job most recently spawned). If this flag is set
- /// to true, however, workers will prefer to execute in a
- /// *breadth-first* fashion -- that is, they will search for jobs at
- /// the *bottom* of their local deque. (At present, workers *always*
- /// steal from the bottom of other workers' deques, regardless of
- /// the setting of this flag.)
- ///
- /// If you think of the tasks as a tree, where a parent task
- /// spawns its children in the tree, then this flag loosely
- /// corresponds to doing a breadth-first traversal of the tree,
- /// whereas the default would be to do a depth-first traversal.
- ///
- /// **Note that this is an "execution hint".** Rayon's task
- /// execution is highly dynamic and the precise order in which
- /// independent tasks are executed is not intended to be
- /// guaranteed.
- ///
- /// This `breadth_first()` method is now deprecated per [RFC #1],
- /// and in the future its effect may be removed. Consider using
- /// [`scope_fifo()`] for a similar effect.
- ///
- /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
- /// [`scope_fifo()`]: fn.scope_fifo.html
- #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
- pub fn breadth_first(mut self) -> Self {
- self.breadth_first = true;
- self
- }
-
- fn get_breadth_first(&self) -> bool {
- self.breadth_first
- }
-
- /// Takes the current thread start callback, leaving `None`.
- fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
- self.start_handler.take()
- }
-
- /// Sets a callback to be invoked on thread start.
- ///
- /// The closure is passed the index of the thread on which it is invoked.
- /// Note that this same closure may be invoked multiple times in parallel.
- /// If this closure panics, the panic will be passed to the panic handler.
- /// If that handler returns, then startup will continue normally.
- pub fn start_handler<H>(mut self, start_handler: H) -> Self
- where
- H: Fn(usize) + Send + Sync + 'static,
- {
- self.start_handler = Some(Box::new(start_handler));
- self
- }
-
- /// Returns a current thread exit callback, leaving `None`.
- fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
- self.exit_handler.take()
- }
-
- /// Sets a callback to be invoked on thread exit.
- ///
- /// The closure is passed the index of the thread on which it is invoked.
- /// Note that this same closure may be invoked multiple times in parallel.
- /// If this closure panics, the panic will be passed to the panic handler.
- /// If that handler returns, then the thread will exit normally.
- pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
- where
- H: Fn(usize) + Send + Sync + 'static,
- {
- self.exit_handler = Some(Box::new(exit_handler));
- self
- }
-}
-
-#[allow(deprecated)]
-impl Configuration {
- /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
- pub fn new() -> Configuration {
- Configuration {
- builder: ThreadPoolBuilder::new(),
- }
- }
-
- /// Deprecated in favor of `ThreadPoolBuilder::build`.
- pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> {
- self.builder.build().map_err(Box::from)
- }
-
- /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
- pub fn thread_name<F>(mut self, closure: F) -> Self
- where
- F: FnMut(usize) -> String + 'static,
- {
- self.builder = self.builder.thread_name(closure);
- self
- }
-
- /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
- pub fn num_threads(mut self, num_threads: usize) -> Configuration {
- self.builder = self.builder.num_threads(num_threads);
- self
- }
-
- /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
- pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
- where
- H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
- {
- self.builder = self.builder.panic_handler(panic_handler);
- self
- }
-
- /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
- pub fn stack_size(mut self, stack_size: usize) -> Self {
- self.builder = self.builder.stack_size(stack_size);
- self
- }
-
- /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
- pub fn breadth_first(mut self) -> Self {
- self.builder = self.builder.breadth_first();
- self
- }
-
- /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
- pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
- where
- H: Fn(usize) + Send + Sync + 'static,
- {
- self.builder = self.builder.start_handler(start_handler);
- self
- }
-
- /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
- pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
- where
- H: Fn(usize) + Send + Sync + 'static,
- {
- self.builder = self.builder.exit_handler(exit_handler);
- self
- }
-
- /// Returns a ThreadPoolBuilder with identical parameters.
- fn into_builder(self) -> ThreadPoolBuilder {
- self.builder
- }
-}
-
-impl ThreadPoolBuildError {
- fn new(kind: ErrorKind) -> ThreadPoolBuildError {
- ThreadPoolBuildError { kind }
- }
-
- fn is_unsupported(&self) -> bool {
- matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
- }
-}
-
-const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
- "The global thread pool has already been initialized.";
-
-const CURRENT_THREAD_ALREADY_IN_POOL: &str =
- "The current thread is already part of another thread pool.";
-
-impl Error for ThreadPoolBuildError {
- #[allow(deprecated)]
- fn description(&self) -> &str {
- match self.kind {
- ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
- ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL,
- ErrorKind::IOError(ref e) => e.description(),
- }
- }
-
- fn source(&self) -> Option<&(dyn Error + 'static)> {
- match &self.kind {
- ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None,
- ErrorKind::IOError(e) => Some(e),
- }
- }
-}
-
-impl fmt::Display for ThreadPoolBuildError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match &self.kind {
- ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f),
- ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
- ErrorKind::IOError(e) => e.fmt(f),
- }
- }
-}
-
-/// Deprecated in favor of `ThreadPoolBuilder::build_global`.
-#[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
-#[allow(deprecated)]
-pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> {
- config.into_builder().build_global().map_err(Box::from)
-}
-
-impl<S> fmt::Debug for ThreadPoolBuilder<S> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- let ThreadPoolBuilder {
- ref num_threads,
- ref use_current_thread,
- ref get_thread_name,
- ref panic_handler,
- ref stack_size,
- ref start_handler,
- ref exit_handler,
- spawn_handler: _,
- ref breadth_first,
- } = *self;
-
- // Just print `Some(<closure>)` or `None` to the debug
- // output.
- struct ClosurePlaceholder;
- impl fmt::Debug for ClosurePlaceholder {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.write_str("<closure>")
- }
- }
- let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
- let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
- let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
- let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
-
- f.debug_struct("ThreadPoolBuilder")
- .field("num_threads", num_threads)
- .field("use_current_thread", use_current_thread)
- .field("get_thread_name", &get_thread_name)
- .field("panic_handler", &panic_handler)
- .field("stack_size", &stack_size)
- .field("start_handler", &start_handler)
- .field("exit_handler", &exit_handler)
- .field("breadth_first", &breadth_first)
- .finish()
- }
-}
-
-#[allow(deprecated)]
-impl fmt::Debug for Configuration {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.builder.fmt(f)
- }
-}
-
-/// Provides the calling context to a closure called by `join_context`.
-#[derive(Debug)]
-pub struct FnContext {
- migrated: bool,
-
- /// disable `Send` and `Sync`, just for a little future-proofing.
- _marker: PhantomData<*mut ()>,
-}
-
-impl FnContext {
- #[inline]
- fn new(migrated: bool) -> Self {
- FnContext {
- migrated,
- _marker: PhantomData,
- }
- }
-}
-
-impl FnContext {
- /// Returns `true` if the closure was called from a different thread
- /// than it was provided from.
- #[inline]
- pub fn migrated(&self) -> bool {
- self.migrated
- }
-}
diff --git a/vendor/rayon-core/src/private.rs b/vendor/rayon-core/src/private.rs
deleted file mode 100644
index c85e77b..0000000
--- a/vendor/rayon-core/src/private.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-//! The public parts of this private module are used to create traits
-//! that cannot be implemented outside of our own crate. This way we
-//! can feel free to extend those traits without worrying about it
-//! being a breaking change for other implementations.
-
-/// If this type is pub but not publicly reachable, third parties
-/// can't name it and can't implement traits using it.
-#[allow(missing_debug_implementations)]
-pub struct PrivateMarker;
-
-macro_rules! private_decl {
- () => {
- /// This trait is private; this method exists to make it
- /// impossible to implement outside the crate.
- #[doc(hidden)]
- fn __rayon_private__(&self) -> crate::private::PrivateMarker;
- };
-}
-
-macro_rules! private_impl {
- () => {
- fn __rayon_private__(&self) -> crate::private::PrivateMarker {
- crate::private::PrivateMarker
- }
- };
-}
diff --git a/vendor/rayon-core/src/registry.rs b/vendor/rayon-core/src/registry.rs
deleted file mode 100644
index e4f2ac7..0000000
--- a/vendor/rayon-core/src/registry.rs
+++ /dev/null
@@ -1,995 +0,0 @@
-use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
-use crate::sleep::Sleep;
-use crate::unwind;
-use crate::{
- ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
- Yield,
-};
-use crossbeam_deque::{Injector, Steal, Stealer, Worker};
-use std::cell::Cell;
-use std::collections::hash_map::DefaultHasher;
-use std::fmt;
-use std::hash::Hasher;
-use std::io;
-use std::mem;
-use std::ptr;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, Once};
-use std::thread;
-use std::usize;
-
-/// Thread builder used for customization via
-/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
-pub struct ThreadBuilder {
- name: Option<String>,
- stack_size: Option<usize>,
- worker: Worker<JobRef>,
- stealer: Stealer<JobRef>,
- registry: Arc<Registry>,
- index: usize,
-}
-
-impl ThreadBuilder {
- /// Gets the index of this thread in the pool, within `0..num_threads`.
- pub fn index(&self) -> usize {
- self.index
- }
-
- /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
- pub fn name(&self) -> Option<&str> {
- self.name.as_deref()
- }
-
- /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
- pub fn stack_size(&self) -> Option<usize> {
- self.stack_size
- }
-
- /// Executes the main loop for this thread. This will not return until the
- /// thread pool is dropped.
- pub fn run(self) {
- unsafe { main_loop(self) }
- }
-}
-
-impl fmt::Debug for ThreadBuilder {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("ThreadBuilder")
- .field("pool", &self.registry.id())
- .field("index", &self.index)
- .field("name", &self.name)
- .field("stack_size", &self.stack_size)
- .finish()
- }
-}
-
-/// Generalized trait for spawning a thread in the `Registry`.
-///
-/// This trait is pub-in-private -- E0445 forces us to make it public,
-/// but we don't actually want to expose these details in the API.
-pub trait ThreadSpawn {
- private_decl! {}
-
- /// Spawn a thread with the `ThreadBuilder` parameters, and then
- /// call `ThreadBuilder::run()`.
- fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
-}
-
-/// Spawns a thread in the "normal" way with `std::thread::Builder`.
-///
-/// This type is pub-in-private -- E0445 forces us to make it public,
-/// but we don't actually want to expose these details in the API.
-#[derive(Debug, Default)]
-pub struct DefaultSpawn;
-
-impl ThreadSpawn for DefaultSpawn {
- private_impl! {}
-
- fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
- let mut b = thread::Builder::new();
- if let Some(name) = thread.name() {
- b = b.name(name.to_owned());
- }
- if let Some(stack_size) = thread.stack_size() {
- b = b.stack_size(stack_size);
- }
- b.spawn(|| thread.run())?;
- Ok(())
- }
-}
-
-/// Spawns a thread with a user's custom callback.
-///
-/// This type is pub-in-private -- E0445 forces us to make it public,
-/// but we don't actually want to expose these details in the API.
-#[derive(Debug)]
-pub struct CustomSpawn<F>(F);
-
-impl<F> CustomSpawn<F>
-where
- F: FnMut(ThreadBuilder) -> io::Result<()>,
-{
- pub(super) fn new(spawn: F) -> Self {
- CustomSpawn(spawn)
- }
-}
-
-impl<F> ThreadSpawn for CustomSpawn<F>
-where
- F: FnMut(ThreadBuilder) -> io::Result<()>,
-{
- private_impl! {}
-
- #[inline]
- fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
- (self.0)(thread)
- }
-}
-
-pub(super) struct Registry {
- thread_infos: Vec<ThreadInfo>,
- sleep: Sleep,
- injected_jobs: Injector<JobRef>,
- broadcasts: Mutex<Vec<Worker<JobRef>>>,
- panic_handler: Option<Box<PanicHandler>>,
- start_handler: Option<Box<StartHandler>>,
- exit_handler: Option<Box<ExitHandler>>,
-
- // When this latch reaches 0, it means that all work on this
- // registry must be complete. This is ensured in the following ways:
- //
- // - if this is the global registry, there is a ref-count that never
- // gets released.
- // - if this is a user-created thread-pool, then so long as the thread-pool
- // exists, it holds a reference.
- // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
- // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
- // return until the blocking job is complete, that ref will continue to be held.
- // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
- // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
- // and that job will keep the pool alive.
- terminate_count: AtomicUsize,
-}
-
-/// ////////////////////////////////////////////////////////////////////////
-/// Initialization
-
-static mut THE_REGISTRY: Option<Arc<Registry>> = None;
-static THE_REGISTRY_SET: Once = Once::new();
-
-/// Starts the worker threads (if that has not already happened). If
-/// initialization has not already occurred, use the default
-/// configuration.
-pub(super) fn global_registry() -> &'static Arc<Registry> {
- set_global_registry(default_global_registry)
- .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
- .expect("The global thread pool has not been initialized.")
-}
-
-/// Starts the worker threads (if that has not already happened) with
-/// the given builder.
-pub(super) fn init_global_registry<S>(
- builder: ThreadPoolBuilder<S>,
-) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
-where
- S: ThreadSpawn,
-{
- set_global_registry(|| Registry::new(builder))
-}
-
-/// Starts the worker threads (if that has not already happened)
-/// by creating a registry with the given callback.
-fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
-where
- F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
-{
- let mut result = Err(ThreadPoolBuildError::new(
- ErrorKind::GlobalPoolAlreadyInitialized,
- ));
-
- THE_REGISTRY_SET.call_once(|| {
- result = registry()
- .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
- });
-
- result
-}
-
-fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
- let result = Registry::new(ThreadPoolBuilder::new());
-
- // If we're running in an environment that doesn't support threads at all, we can fall back to
- // using the current thread alone. This is crude, and probably won't work for non-blocking
- // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
- //
- // Notably, this allows current WebAssembly targets to work even though their threading support
- // is stubbed out, and we won't have to change anything if they do add real threading.
- let unsupported = matches!(&result, Err(e) if e.is_unsupported());
- if unsupported && WorkerThread::current().is_null() {
- let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread();
- let fallback_result = Registry::new(builder);
- if fallback_result.is_ok() {
- return fallback_result;
- }
- }
-
- result
-}
-
-struct Terminator<'a>(&'a Arc<Registry>);
-
-impl<'a> Drop for Terminator<'a> {
- fn drop(&mut self) {
- self.0.terminate()
- }
-}
-
-impl Registry {
- pub(super) fn new<S>(
- mut builder: ThreadPoolBuilder<S>,
- ) -> Result<Arc<Self>, ThreadPoolBuildError>
- where
- S: ThreadSpawn,
- {
- // Soft-limit the number of threads that we can actually support.
- let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads());
-
- let breadth_first = builder.get_breadth_first();
-
- let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
- .map(|_| {
- let worker = if breadth_first {
- Worker::new_fifo()
- } else {
- Worker::new_lifo()
- };
-
- let stealer = worker.stealer();
- (worker, stealer)
- })
- .unzip();
-
- let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads)
- .map(|_| {
- let worker = Worker::new_fifo();
- let stealer = worker.stealer();
- (worker, stealer)
- })
- .unzip();
-
- let registry = Arc::new(Registry {
- thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
- sleep: Sleep::new(n_threads),
- injected_jobs: Injector::new(),
- broadcasts: Mutex::new(broadcasts),
- terminate_count: AtomicUsize::new(1),
- panic_handler: builder.take_panic_handler(),
- start_handler: builder.take_start_handler(),
- exit_handler: builder.take_exit_handler(),
- });
-
- // If we return early or panic, make sure to terminate existing threads.
- let t1000 = Terminator(&registry);
-
- for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() {
- let thread = ThreadBuilder {
- name: builder.get_thread_name(index),
- stack_size: builder.get_stack_size(),
- registry: Arc::clone(&registry),
- worker,
- stealer,
- index,
- };
-
- if index == 0 && builder.use_current_thread {
- if !WorkerThread::current().is_null() {
- return Err(ThreadPoolBuildError::new(
- ErrorKind::CurrentThreadAlreadyInPool,
- ));
- }
- // Rather than starting a new thread, we're just taking over the current thread
- // *without* running the main loop, so we can still return from here.
- // The WorkerThread is leaked, but we never shutdown the global pool anyway.
- let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread)));
-
- unsafe {
- WorkerThread::set_current(worker_thread);
- Latch::set(&registry.thread_infos[index].primed);
- }
- continue;
- }
-
- if let Err(e) = builder.get_spawn_handler().spawn(thread) {
- return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
- }
- }
-
- // Returning normally now, without termination.
- mem::forget(t1000);
-
- Ok(registry)
- }
-
- pub(super) fn current() -> Arc<Registry> {
- unsafe {
- let worker_thread = WorkerThread::current();
- let registry = if worker_thread.is_null() {
- global_registry()
- } else {
- &(*worker_thread).registry
- };
- Arc::clone(registry)
- }
- }
-
- /// Returns the number of threads in the current registry. This
- /// is better than `Registry::current().num_threads()` because it
- /// avoids incrementing the `Arc`.
- pub(super) fn current_num_threads() -> usize {
- unsafe {
- let worker_thread = WorkerThread::current();
- if worker_thread.is_null() {
- global_registry().num_threads()
- } else {
- (*worker_thread).registry.num_threads()
- }
- }
- }
-
- /// Returns the current `WorkerThread` if it's part of this `Registry`.
- pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
- unsafe {
- let worker = WorkerThread::current().as_ref()?;
- if worker.registry().id() == self.id() {
- Some(worker)
- } else {
- None
- }
- }
- }
-
- /// Returns an opaque identifier for this registry.
- pub(super) fn id(&self) -> RegistryId {
- // We can rely on `self` not to change since we only ever create
- // registries that are boxed up in an `Arc` (see `new()` above).
- RegistryId {
- addr: self as *const Self as usize,
- }
- }
-
- pub(super) fn num_threads(&self) -> usize {
- self.thread_infos.len()
- }
-
- pub(super) fn catch_unwind(&self, f: impl FnOnce()) {
- if let Err(err) = unwind::halt_unwinding(f) {
- // If there is no handler, or if that handler itself panics, then we abort.
- let abort_guard = unwind::AbortIfPanic;
- if let Some(ref handler) = self.panic_handler {
- handler(err);
- mem::forget(abort_guard);
- }
- }
- }
-
- /// Waits for the worker threads to get up and running. This is
- /// meant to be used for benchmarking purposes, primarily, so that
- /// you can get more consistent numbers by having everything
- /// "ready to go".
- pub(super) fn wait_until_primed(&self) {
- for info in &self.thread_infos {
- info.primed.wait();
- }
- }
-
- /// Waits for the worker threads to stop. This is used for testing
- /// -- so we can check that termination actually works.
- #[cfg(test)]
- pub(super) fn wait_until_stopped(&self) {
- for info in &self.thread_infos {
- info.stopped.wait();
- }
- }
-
- /// ////////////////////////////////////////////////////////////////////////
- /// MAIN LOOP
- ///
- /// So long as all of the worker threads are hanging out in their
- /// top-level loop, there is no work to be done.
-
- /// Push a job into the given `registry`. If we are running on a
- /// worker thread for the registry, this will push onto the
- /// deque. Else, it will inject from the outside (which is slower).
- pub(super) fn inject_or_push(&self, job_ref: JobRef) {
- let worker_thread = WorkerThread::current();
- unsafe {
- if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
- (*worker_thread).push(job_ref);
- } else {
- self.inject(job_ref);
- }
- }
- }
-
- /// Push a job into the "external jobs" queue; it will be taken by
- /// whatever worker has nothing to do. Use this if you know that
- /// you are not on a worker of this registry.
- pub(super) fn inject(&self, injected_job: JobRef) {
- // It should not be possible for `state.terminate` to be true
- // here. It is only set to true when the user creates (and
- // drops) a `ThreadPool`; and, in that case, they cannot be
- // calling `inject()` later, since they dropped their
- // `ThreadPool`.
- debug_assert_ne!(
- self.terminate_count.load(Ordering::Acquire),
- 0,
- "inject() sees state.terminate as true"
- );
-
- let queue_was_empty = self.injected_jobs.is_empty();
-
- self.injected_jobs.push(injected_job);
- self.sleep.new_injected_jobs(1, queue_was_empty);
- }
-
- fn has_injected_job(&self) -> bool {
- !self.injected_jobs.is_empty()
- }
-
- fn pop_injected_job(&self) -> Option<JobRef> {
- loop {
- match self.injected_jobs.steal() {
- Steal::Success(job) => return Some(job),
- Steal::Empty => return None,
- Steal::Retry => {}
- }
- }
- }
-
- /// Push a job into each thread's own "external jobs" queue; it will be
- /// executed only on that thread, when it has nothing else to do locally,
- /// before it tries to steal other work.
- ///
- /// **Panics** if not given exactly as many jobs as there are threads.
- pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
- assert_eq!(self.num_threads(), injected_jobs.len());
- {
- let broadcasts = self.broadcasts.lock().unwrap();
-
- // It should not be possible for `state.terminate` to be true
- // here. It is only set to true when the user creates (and
- // drops) a `ThreadPool`; and, in that case, they cannot be
- // calling `inject_broadcast()` later, since they dropped their
- // `ThreadPool`.
- debug_assert_ne!(
- self.terminate_count.load(Ordering::Acquire),
- 0,
- "inject_broadcast() sees state.terminate as true"
- );
-
- assert_eq!(broadcasts.len(), injected_jobs.len());
- for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) {
- worker.push(job_ref);
- }
- }
- for i in 0..self.num_threads() {
- self.sleep.notify_worker_latch_is_set(i);
- }
- }
-
- /// If already in a worker-thread of this registry, just execute `op`.
- /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
- /// completes and return its return value. If `op` panics, that panic will
- /// be propagated as well. The second argument indicates `true` if injection
- /// was performed, `false` if executed directly.
- pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce(&WorkerThread, bool) -> R + Send,
- R: Send,
- {
- unsafe {
- let worker_thread = WorkerThread::current();
- if worker_thread.is_null() {
- self.in_worker_cold(op)
- } else if (*worker_thread).registry().id() != self.id() {
- self.in_worker_cross(&*worker_thread, op)
- } else {
- // Perfectly valid to give them a `&T`: this is the
- // current thread, so we know the data structure won't be
- // invalidated until we return.
- op(&*worker_thread, false)
- }
- }
- }
-
- #[cold]
- unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce(&WorkerThread, bool) -> R + Send,
- R: Send,
- {
- thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());
-
- LOCK_LATCH.with(|l| {
- // This thread isn't a member of *any* thread pool, so just block.
- debug_assert!(WorkerThread::current().is_null());
- let job = StackJob::new(
- |injected| {
- let worker_thread = WorkerThread::current();
- assert!(injected && !worker_thread.is_null());
- op(&*worker_thread, true)
- },
- LatchRef::new(l),
- );
- self.inject(job.as_job_ref());
- job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
-
- job.into_result()
- })
- }
-
- #[cold]
- unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
- where
- OP: FnOnce(&WorkerThread, bool) -> R + Send,
- R: Send,
- {
- // This thread is a member of a different pool, so let it process
- // other work while waiting for this `op` to complete.
- debug_assert!(current_thread.registry().id() != self.id());
- let latch = SpinLatch::cross(current_thread);
- let job = StackJob::new(
- |injected| {
- let worker_thread = WorkerThread::current();
- assert!(injected && !worker_thread.is_null());
- op(&*worker_thread, true)
- },
- latch,
- );
- self.inject(job.as_job_ref());
- current_thread.wait_until(&job.latch);
- job.into_result()
- }
-
- /// Increments the terminate counter. This increment should be
- /// balanced by a call to `terminate`, which will decrement. This
- /// is used when spawning asynchronous work, which needs to
- /// prevent the registry from terminating so long as it is active.
- ///
- /// Note that blocking functions such as `join` and `scope` do not
- /// need to concern themselves with this fn; their context is
- /// responsible for ensuring the current thread-pool will not
- /// terminate until they return.
- ///
- /// The global thread-pool always has an outstanding reference
- /// (the initial one). Custom thread-pools have one outstanding
- /// reference that is dropped when the `ThreadPool` is dropped:
- /// since installing the thread-pool blocks until any joins/scopes
- /// complete, this ensures that joins/scopes are covered.
- ///
- /// The exception is `::spawn()`, which can create a job outside
- /// of any blocking scope. In that case, the job itself holds a
- /// terminate count and is responsible for invoking `terminate()`
- /// when finished.
- pub(super) fn increment_terminate_count(&self) {
- let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
- debug_assert!(previous != 0, "registry ref count incremented from zero");
- assert!(
- previous != std::usize::MAX,
- "overflow in registry ref count"
- );
- }
-
- /// Signals that the thread-pool which owns this registry has been
- /// dropped. The worker threads will gradually terminate, once any
- /// extant work is completed.
- pub(super) fn terminate(&self) {
- if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
- for (i, thread_info) in self.thread_infos.iter().enumerate() {
- unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
- }
- }
- }
-
- /// Notify the worker that the latch they are sleeping on has been "set".
- pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
- self.sleep.notify_worker_latch_is_set(target_worker_index);
- }
-}
-
-#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
-pub(super) struct RegistryId {
- addr: usize,
-}
-
-struct ThreadInfo {
- /// Latch set once thread has started and we are entering into the
- /// main loop. Used to wait for worker threads to become primed,
- /// primarily of interest for benchmarking.
- primed: LockLatch,
-
- /// Latch is set once worker thread has completed. Used to wait
- /// until workers have stopped; only used for tests.
- stopped: LockLatch,
-
- /// The latch used to signal that terminated has been requested.
- /// This latch is *set* by the `terminate` method on the
- /// `Registry`, once the registry's main "terminate" counter
- /// reaches zero.
- terminate: OnceLatch,
-
- /// the "stealer" half of the worker's deque
- stealer: Stealer<JobRef>,
-}
-
-impl ThreadInfo {
- fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
- ThreadInfo {
- primed: LockLatch::new(),
- stopped: LockLatch::new(),
- terminate: OnceLatch::new(),
- stealer,
- }
- }
-}
-
-/// ////////////////////////////////////////////////////////////////////////
-/// WorkerThread identifiers
-
-pub(super) struct WorkerThread {
- /// the "worker" half of our local deque
- worker: Worker<JobRef>,
-
- /// the "stealer" half of the worker's broadcast deque
- stealer: Stealer<JobRef>,
-
- /// local queue used for `spawn_fifo` indirection
- fifo: JobFifo,
-
- index: usize,
-
- /// A weak random number generator.
- rng: XorShift64Star,
-
- registry: Arc<Registry>,
-}
-
-// This is a bit sketchy, but basically: the WorkerThread is
-// allocated on the stack of the worker on entry and stored into this
-// thread local variable. So it will remain valid at least until the
-// worker is fully unwound. Using an unsafe pointer avoids the need
-// for a RefCell<T> etc.
-thread_local! {
- static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) };
-}
-
-impl From<ThreadBuilder> for WorkerThread {
- fn from(thread: ThreadBuilder) -> Self {
- Self {
- worker: thread.worker,
- stealer: thread.stealer,
- fifo: JobFifo::new(),
- index: thread.index,
- rng: XorShift64Star::new(),
- registry: thread.registry,
- }
- }
-}
-
-impl Drop for WorkerThread {
- fn drop(&mut self) {
- // Undo `set_current`
- WORKER_THREAD_STATE.with(|t| {
- assert!(t.get().eq(&(self as *const _)));
- t.set(ptr::null());
- });
- }
-}
-
-impl WorkerThread {
- /// Gets the `WorkerThread` index for the current thread; returns
- /// NULL if this is not a worker thread. This pointer is valid
- /// anywhere on the current thread.
- #[inline]
- pub(super) fn current() -> *const WorkerThread {
- WORKER_THREAD_STATE.with(Cell::get)
- }
-
- /// Sets `self` as the worker thread index for the current thread.
- /// This is done during worker thread startup.
- unsafe fn set_current(thread: *const WorkerThread) {
- WORKER_THREAD_STATE.with(|t| {
- assert!(t.get().is_null());
- t.set(thread);
- });
- }
-
- /// Returns the registry that owns this worker thread.
- #[inline]
- pub(super) fn registry(&self) -> &Arc<Registry> {
- &self.registry
- }
-
- /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
- #[inline]
- pub(super) fn index(&self) -> usize {
- self.index
- }
-
- #[inline]
- pub(super) unsafe fn push(&self, job: JobRef) {
- let queue_was_empty = self.worker.is_empty();
- self.worker.push(job);
- self.registry.sleep.new_internal_jobs(1, queue_was_empty);
- }
-
- #[inline]
- pub(super) unsafe fn push_fifo(&self, job: JobRef) {
- self.push(self.fifo.push(job));
- }
-
- #[inline]
- pub(super) fn local_deque_is_empty(&self) -> bool {
- self.worker.is_empty()
- }
-
- /// Attempts to obtain a "local" job -- typically this means
- /// popping from the top of the stack, though if we are configured
- /// for breadth-first execution, it would mean dequeuing from the
- /// bottom.
- #[inline]
- pub(super) fn take_local_job(&self) -> Option<JobRef> {
- let popped_job = self.worker.pop();
-
- if popped_job.is_some() {
- return popped_job;
- }
-
- loop {
- match self.stealer.steal() {
- Steal::Success(job) => return Some(job),
- Steal::Empty => return None,
- Steal::Retry => {}
- }
- }
- }
-
- fn has_injected_job(&self) -> bool {
- !self.stealer.is_empty() || self.registry.has_injected_job()
- }
-
- /// Wait until the latch is set. Try to keep busy by popping and
- /// stealing tasks as necessary.
- #[inline]
- pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
- let latch = latch.as_core_latch();
- if !latch.probe() {
- self.wait_until_cold(latch);
- }
- }
-
- #[cold]
- unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
- // the code below should swallow all panics and hence never
- // unwind; but if something does wrong, we want to abort,
- // because otherwise other code in rayon may assume that the
- // latch has been signaled, and that can lead to random memory
- // accesses, which would be *very bad*
- let abort_guard = unwind::AbortIfPanic;
-
- 'outer: while !latch.probe() {
- // Check for local work *before* we start marking ourself idle,
- // especially to avoid modifying shared sleep state.
- if let Some(job) = self.take_local_job() {
- self.execute(job);
- continue;
- }
-
- let mut idle_state = self.registry.sleep.start_looking(self.index);
- while !latch.probe() {
- if let Some(job) = self.find_work() {
- self.registry.sleep.work_found();
- self.execute(job);
- // The job might have injected local work, so go back to the outer loop.
- continue 'outer;
- } else {
- self.registry
- .sleep
- .no_work_found(&mut idle_state, latch, || self.has_injected_job())
- }
- }
-
- // If we were sleepy, we are not anymore. We "found work" --
- // whatever the surrounding thread was doing before it had to wait.
- self.registry.sleep.work_found();
- break;
- }
-
- mem::forget(abort_guard); // successful execution, do not abort
- }
-
- unsafe fn wait_until_out_of_work(&self) {
- debug_assert_eq!(self as *const _, WorkerThread::current());
- let registry = &*self.registry;
- let index = self.index;
-
- self.wait_until(&registry.thread_infos[index].terminate);
-
- // Should not be any work left in our queue.
- debug_assert!(self.take_local_job().is_none());
-
- // Let registry know we are done
- Latch::set(&registry.thread_infos[index].stopped);
- }
-
- fn find_work(&self) -> Option<JobRef> {
- // Try to find some work to do. We give preference first
- // to things in our local deque, then in other workers
- // deques, and finally to injected jobs from the
- // outside. The idea is to finish what we started before
- // we take on something new.
- self.take_local_job()
- .or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job())
- }
-
- pub(super) fn yield_now(&self) -> Yield {
- match self.find_work() {
- Some(job) => unsafe {
- self.execute(job);
- Yield::Executed
- },
- None => Yield::Idle,
- }
- }
-
- pub(super) fn yield_local(&self) -> Yield {
- match self.take_local_job() {
- Some(job) => unsafe {
- self.execute(job);
- Yield::Executed
- },
- None => Yield::Idle,
- }
- }
-
- #[inline]
- pub(super) unsafe fn execute(&self, job: JobRef) {
- job.execute();
- }
-
- /// Try to steal a single job and return it.
- ///
- /// This should only be done as a last resort, when there is no
- /// local work to do.
- fn steal(&self) -> Option<JobRef> {
- // we only steal when we don't have any work to do locally
- debug_assert!(self.local_deque_is_empty());
-
- // otherwise, try to steal
- let thread_infos = &self.registry.thread_infos.as_slice();
- let num_threads = thread_infos.len();
- if num_threads <= 1 {
- return None;
- }
-
- loop {
- let mut retry = false;
- let start = self.rng.next_usize(num_threads);
- let job = (start..num_threads)
- .chain(0..start)
- .filter(move |&i| i != self.index)
- .find_map(|victim_index| {
- let victim = &thread_infos[victim_index];
- match victim.stealer.steal() {
- Steal::Success(job) => Some(job),
- Steal::Empty => None,
- Steal::Retry => {
- retry = true;
- None
- }
- }
- });
- if job.is_some() || !retry {
- return job;
- }
- }
- }
-}
-
-/// ////////////////////////////////////////////////////////////////////////
-
-unsafe fn main_loop(thread: ThreadBuilder) {
- let worker_thread = &WorkerThread::from(thread);
- WorkerThread::set_current(worker_thread);
- let registry = &*worker_thread.registry;
- let index = worker_thread.index;
-
- // let registry know we are ready to do work
- Latch::set(&registry.thread_infos[index].primed);
-
- // Worker threads should not panic. If they do, just abort, as the
- // internal state of the threadpool is corrupted. Note that if
- // **user code** panics, we should catch that and redirect.
- let abort_guard = unwind::AbortIfPanic;
-
- // Inform a user callback that we started a thread.
- if let Some(ref handler) = registry.start_handler {
- registry.catch_unwind(|| handler(index));
- }
-
- worker_thread.wait_until_out_of_work();
-
- // Normal termination, do not abort.
- mem::forget(abort_guard);
-
- // Inform a user callback that we exited a thread.
- if let Some(ref handler) = registry.exit_handler {
- registry.catch_unwind(|| handler(index));
- // We're already exiting the thread, there's nothing else to do.
- }
-}
-
-/// If already in a worker-thread, just execute `op`. Otherwise,
-/// execute `op` in the default thread-pool. Either way, block until
-/// `op` completes and return its return value. If `op` panics, that
-/// panic will be propagated as well. The second argument indicates
-/// `true` if injection was performed, `false` if executed directly.
-pub(super) fn in_worker<OP, R>(op: OP) -> R
-where
- OP: FnOnce(&WorkerThread, bool) -> R + Send,
- R: Send,
-{
- unsafe {
- let owner_thread = WorkerThread::current();
- if !owner_thread.is_null() {
- // Perfectly valid to give them a `&T`: this is the
- // current thread, so we know the data structure won't be
- // invalidated until we return.
- op(&*owner_thread, false)
- } else {
- global_registry().in_worker(op)
- }
- }
-}
-
-/// [xorshift*] is a fast pseudorandom number generator which will
-/// even tolerate weak seeding, as long as it's not zero.
-///
-/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
-struct XorShift64Star {
- state: Cell<u64>,
-}
-
-impl XorShift64Star {
- fn new() -> Self {
- // Any non-zero seed will do -- this uses the hash of a global counter.
- let mut seed = 0;
- while seed == 0 {
- let mut hasher = DefaultHasher::new();
- static COUNTER: AtomicUsize = AtomicUsize::new(0);
- hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
- seed = hasher.finish();
- }
-
- XorShift64Star {
- state: Cell::new(seed),
- }
- }
-
- fn next(&self) -> u64 {
- let mut x = self.state.get();
- debug_assert_ne!(x, 0);
- x ^= x >> 12;
- x ^= x << 25;
- x ^= x >> 27;
- self.state.set(x);
- x.wrapping_mul(0x2545_f491_4f6c_dd1d)
- }
-
- /// Return a value from `0..n`.
- fn next_usize(&self, n: usize) -> usize {
- (self.next() % n as u64) as usize
- }
-}
diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs
deleted file mode 100644
index b7163d1..0000000
--- a/vendor/rayon-core/src/scope/mod.rs
+++ /dev/null
@@ -1,769 +0,0 @@
-//! Methods for custom fork-join scopes, created by the [`scope()`]
-//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
-//!
-//! [`scope()`]: fn.scope.html
-//! [`in_place_scope()`]: fn.in_place_scope.html
-//! [`join()`]: ../join/join.fn.html
-
-use crate::broadcast::BroadcastContext;
-use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
-use crate::latch::{CountLatch, Latch};
-use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
-use crate::unwind;
-use std::any::Any;
-use std::fmt;
-use std::marker::PhantomData;
-use std::mem::ManuallyDrop;
-use std::ptr;
-use std::sync::atomic::{AtomicPtr, Ordering};
-use std::sync::Arc;
-
-#[cfg(test)]
-mod test;
-
-/// Represents a fork-join scope which can be used to spawn any number of tasks.
-/// See [`scope()`] for more information.
-///
-///[`scope()`]: fn.scope.html
-pub struct Scope<'scope> {
- base: ScopeBase<'scope>,
-}
-
-/// Represents a fork-join scope which can be used to spawn any number of tasks.
-/// Those spawned from the same thread are prioritized in relative FIFO order.
-/// See [`scope_fifo()`] for more information.
-///
-///[`scope_fifo()`]: fn.scope_fifo.html
-pub struct ScopeFifo<'scope> {
- base: ScopeBase<'scope>,
- fifos: Vec<JobFifo>,
-}
-
-struct ScopeBase<'scope> {
- /// thread registry where `scope()` was executed or where `in_place_scope()`
- /// should spawn jobs.
- registry: Arc<Registry>,
-
- /// if some job panicked, the error is stored here; it will be
- /// propagated to the one who created the scope
- panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
-
- /// latch to track job counts
- job_completed_latch: CountLatch,
-
- /// You can think of a scope as containing a list of closures to execute,
- /// all of which outlive `'scope`. They're not actually required to be
- /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
- /// the closures are only *moved* across threads to be executed.
- marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
-}
-
-/// Creates a "fork-join" scope `s` and invokes the closure with a
-/// reference to `s`. This closure can then spawn asynchronous tasks
-/// into `s`. Those tasks may run asynchronously with respect to the
-/// closure; they may themselves spawn additional tasks into `s`. When
-/// the closure returns, it will block until all tasks that have been
-/// spawned into `s` complete.
-///
-/// `scope()` is a more flexible building block compared to `join()`,
-/// since a loop can be used to spawn any number of tasks without
-/// recursing. However, that flexibility comes at a performance price:
-/// tasks spawned using `scope()` must be allocated onto the heap,
-/// whereas `join()` can make exclusive use of the stack. **Prefer
-/// `join()` (or, even better, parallel iterators) where possible.**
-///
-/// # Example
-///
-/// The Rayon `join()` function launches two closures and waits for them
-/// to stop. One could implement `join()` using a scope like so, although
-/// it would be less efficient than the real implementation:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
-/// where A: FnOnce() -> RA + Send,
-/// B: FnOnce() -> RB + Send,
-/// RA: Send,
-/// RB: Send,
-/// {
-/// let mut result_a: Option<RA> = None;
-/// let mut result_b: Option<RB> = None;
-/// rayon::scope(|s| {
-/// s.spawn(|_| result_a = Some(oper_a()));
-/// s.spawn(|_| result_b = Some(oper_b()));
-/// });
-/// (result_a.unwrap(), result_b.unwrap())
-/// }
-/// ```
-///
-/// # A note on threading
-///
-/// The closure given to `scope()` executes in the Rayon thread-pool,
-/// as do those given to `spawn()`. This means that you can't access
-/// thread-local variables (well, you can, but they may have
-/// unexpected values).
-///
-/// # Task execution
-///
-/// Task execution potentially starts as soon as `spawn()` is called.
-/// The task will end sometime before `scope()` returns. Note that the
-/// *closure* given to scope may return much earlier. In general
-/// the lifetime of a scope created like `scope(body)` goes something like this:
-///
-/// - Scope begins when `scope(body)` is called
-/// - Scope body `body()` is invoked
-/// - Scope tasks may be spawned
-/// - Scope body returns
-/// - Scope tasks execute, possibly spawning more tasks
-/// - Once all tasks are done, scope ends and `scope()` returns
-///
-/// To see how and when tasks are joined, consider this example:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// // point start
-/// rayon::scope(|s| {
-/// s.spawn(|s| { // task s.1
-/// s.spawn(|s| { // task s.1.1
-/// rayon::scope(|t| {
-/// t.spawn(|_| ()); // task t.1
-/// t.spawn(|_| ()); // task t.2
-/// });
-/// });
-/// });
-/// s.spawn(|s| { // task s.2
-/// });
-/// // point mid
-/// });
-/// // point end
-/// ```
-///
-/// The various tasks that are run will execute roughly like so:
-///
-/// ```notrust
-/// | (start)
-/// |
-/// | (scope `s` created)
-/// +-----------------------------------------------+ (task s.2)
-/// +-------+ (task s.1) |
-/// | | |
-/// | +---+ (task s.1.1) |
-/// | | | |
-/// | | | (scope `t` created) |
-/// | | +----------------+ (task t.2) |
-/// | | +---+ (task t.1) | |
-/// | (mid) | | | | |
-/// : | + <-+------------+ (scope `t` ends) |
-/// : | | |
-/// |<------+---+-----------------------------------+ (scope `s` ends)
-/// |
-/// | (end)
-/// ```
-///
-/// The point here is that everything spawned into scope `s` will
-/// terminate (at latest) at the same point -- right before the
-/// original call to `rayon::scope` returns. This includes new
-/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
-/// scope is created (such as `t`), the things spawned into that scope
-/// will be joined before that scope returns, which in turn occurs
-/// before the creating task (task `s.1.1` in this case) finishes.
-///
-/// There is no guaranteed order of execution for spawns in a scope,
-/// given that other threads may steal tasks at any time. However, they
-/// are generally prioritized in a LIFO order on the thread from which
-/// they were spawned. So in this example, absent any stealing, we can
-/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
-/// threads always steal from the other end of the deque, like FIFO
-/// order. The idea is that "recent" tasks are most likely to be fresh
-/// in the local CPU's cache, while other threads can steal older
-/// "stale" tasks. For an alternate approach, consider
-/// [`scope_fifo()`] instead.
-///
-/// [`scope_fifo()`]: fn.scope_fifo.html
-///
-/// # Accessing stack data
-///
-/// In general, spawned tasks may access stack data in place that
-/// outlives the scope itself. Other data must be fully owned by the
-/// spawned task.
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let ok: Vec<i32> = vec![1, 2, 3];
-/// rayon::scope(|s| {
-/// let bad: Vec<i32> = vec![4, 5, 6];
-/// s.spawn(|_| {
-/// // We can access `ok` because outlives the scope `s`.
-/// println!("ok: {:?}", ok);
-///
-/// // If we just try to use `bad` here, the closure will borrow `bad`
-/// // (because we are just printing it out, and that only requires a
-/// // borrow), which will result in a compilation error. Read on
-/// // for options.
-/// // println!("bad: {:?}", bad);
-/// });
-/// });
-/// ```
-///
-/// As the comments example above suggest, to reference `bad` we must
-/// take ownership of it. One way to do this is to detach the closure
-/// from the surrounding stack frame, using the `move` keyword. This
-/// will cause it to take ownership of *all* the variables it touches,
-/// in this case including both `ok` *and* `bad`:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let ok: Vec<i32> = vec![1, 2, 3];
-/// rayon::scope(|s| {
-/// let bad: Vec<i32> = vec![4, 5, 6];
-/// s.spawn(move |_| {
-/// println!("ok: {:?}", ok);
-/// println!("bad: {:?}", bad);
-/// });
-///
-/// // That closure is fine, but now we can't use `ok` anywhere else,
-/// // since it is owned by the previous task:
-/// // s.spawn(|_| println!("ok: {:?}", ok));
-/// });
-/// ```
-///
-/// While this works, it could be a problem if we want to use `ok` elsewhere.
-/// There are two choices. We can keep the closure as a `move` closure, but
-/// instead of referencing the variable `ok`, we create a shadowed variable that
-/// is a borrow of `ok` and capture *that*:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let ok: Vec<i32> = vec![1, 2, 3];
-/// rayon::scope(|s| {
-/// let bad: Vec<i32> = vec![4, 5, 6];
-/// let ok: &Vec<i32> = &ok; // shadow the original `ok`
-/// s.spawn(move |_| {
-/// println!("ok: {:?}", ok); // captures the shadowed version
-/// println!("bad: {:?}", bad);
-/// });
-///
-/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
-/// // can be shared freely. Note that we need a `move` closure here though,
-/// // because otherwise we'd be trying to borrow the shadowed `ok`,
-/// // and that doesn't outlive `scope`.
-/// s.spawn(move |_| println!("ok: {:?}", ok));
-/// });
-/// ```
-///
-/// Another option is not to use the `move` keyword but instead to take ownership
-/// of individual variables:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let ok: Vec<i32> = vec![1, 2, 3];
-/// rayon::scope(|s| {
-/// let bad: Vec<i32> = vec![4, 5, 6];
-/// s.spawn(|_| {
-/// // Transfer ownership of `bad` into a local variable (also named `bad`).
-/// // This will force the closure to take ownership of `bad` from the environment.
-/// let bad = bad;
-/// println!("ok: {:?}", ok); // `ok` is only borrowed.
-/// println!("bad: {:?}", bad); // refers to our local variable, above.
-/// });
-///
-/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
-/// });
-/// ```
-///
-/// # Panics
-///
-/// If a panic occurs, either in the closure given to `scope()` or in
-/// any of the spawned jobs, that panic will be propagated and the
-/// call to `scope()` will panic. If multiple panics occurs, it is
-/// non-deterministic which of their panic values will propagate.
-/// Regardless, once a task is spawned using `scope.spawn()`, it will
-/// execute, even if the spawning task should later panic. `scope()`
-/// returns once all spawned jobs have completed, and any panics are
-/// propagated at that point.
-pub fn scope<'scope, OP, R>(op: OP) -> R
-where
- OP: FnOnce(&Scope<'scope>) -> R + Send,
- R: Send,
-{
- in_worker(|owner_thread, _| {
- let scope = Scope::<'scope>::new(Some(owner_thread), None);
- scope.base.complete(Some(owner_thread), || op(&scope))
- })
-}
-
-/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
-/// closure with a reference to `s`. This closure can then spawn
-/// asynchronous tasks into `s`. Those tasks may run asynchronously with
-/// respect to the closure; they may themselves spawn additional tasks
-/// into `s`. When the closure returns, it will block until all tasks
-/// that have been spawned into `s` complete.
-///
-/// # Task execution
-///
-/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
-/// difference in the order of execution. Consider a similar example:
-///
-/// [`scope()`]: fn.scope.html
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// // point start
-/// rayon::scope_fifo(|s| {
-/// s.spawn_fifo(|s| { // task s.1
-/// s.spawn_fifo(|s| { // task s.1.1
-/// rayon::scope_fifo(|t| {
-/// t.spawn_fifo(|_| ()); // task t.1
-/// t.spawn_fifo(|_| ()); // task t.2
-/// });
-/// });
-/// });
-/// s.spawn_fifo(|s| { // task s.2
-/// });
-/// // point mid
-/// });
-/// // point end
-/// ```
-///
-/// The various tasks that are run will execute roughly like so:
-///
-/// ```notrust
-/// | (start)
-/// |
-/// | (FIFO scope `s` created)
-/// +--------------------+ (task s.1)
-/// +-------+ (task s.2) |
-/// | | +---+ (task s.1.1)
-/// | | | |
-/// | | | | (FIFO scope `t` created)
-/// | | | +----------------+ (task t.1)
-/// | | | +---+ (task t.2) |
-/// | (mid) | | | | |
-/// : | | + <-+------------+ (scope `t` ends)
-/// : | | |
-/// |<------+------------+---+ (scope `s` ends)
-/// |
-/// | (end)
-/// ```
-///
-/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
-/// the thread from which they were spawned, as opposed to `scope()`'s
-/// LIFO. So in this example, we can expect `s.1` to execute before
-/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
-/// FIFO order, as usual. Overall, this has roughly the same order as
-/// the now-deprecated [`breadth_first`] option, except the effect is
-/// isolated to a particular scope. If spawns are intermingled from any
-/// combination of `scope()` and `scope_fifo()`, or from different
-/// threads, their order is only specified with respect to spawns in the
-/// same scope and thread.
-///
-/// For more details on this design, see Rayon [RFC #1].
-///
-/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
-/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
-///
-/// # Panics
-///
-/// If a panic occurs, either in the closure given to `scope_fifo()` or
-/// in any of the spawned jobs, that panic will be propagated and the
-/// call to `scope_fifo()` will panic. If multiple panics occurs, it is
-/// non-deterministic which of their panic values will propagate.
-/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
-/// will execute, even if the spawning task should later panic.
-/// `scope_fifo()` returns once all spawned jobs have completed, and any
-/// panics are propagated at that point.
-pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
-where
- OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
- R: Send,
-{
- in_worker(|owner_thread, _| {
- let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
- scope.base.complete(Some(owner_thread), || op(&scope))
- })
-}
-
-/// Creates a "fork-join" scope `s` and invokes the closure with a
-/// reference to `s`. This closure can then spawn asynchronous tasks
-/// into `s`. Those tasks may run asynchronously with respect to the
-/// closure; they may themselves spawn additional tasks into `s`. When
-/// the closure returns, it will block until all tasks that have been
-/// spawned into `s` complete.
-///
-/// This is just like `scope()` except the closure runs on the same thread
-/// that calls `in_place_scope()`. Only work that it spawns runs in the
-/// thread pool.
-///
-/// # Panics
-///
-/// If a panic occurs, either in the closure given to `in_place_scope()` or in
-/// any of the spawned jobs, that panic will be propagated and the
-/// call to `in_place_scope()` will panic. If multiple panics occurs, it is
-/// non-deterministic which of their panic values will propagate.
-/// Regardless, once a task is spawned using `scope.spawn()`, it will
-/// execute, even if the spawning task should later panic. `in_place_scope()`
-/// returns once all spawned jobs have completed, and any panics are
-/// propagated at that point.
-pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
-where
- OP: FnOnce(&Scope<'scope>) -> R,
-{
- do_in_place_scope(None, op)
-}
-
-pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
-where
- OP: FnOnce(&Scope<'scope>) -> R,
-{
- let thread = unsafe { WorkerThread::current().as_ref() };
- let scope = Scope::<'scope>::new(thread, registry);
- scope.base.complete(thread, || op(&scope))
-}
-
-/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
-/// closure with a reference to `s`. This closure can then spawn
-/// asynchronous tasks into `s`. Those tasks may run asynchronously with
-/// respect to the closure; they may themselves spawn additional tasks
-/// into `s`. When the closure returns, it will block until all tasks
-/// that have been spawned into `s` complete.
-///
-/// This is just like `scope_fifo()` except the closure runs on the same thread
-/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
-/// thread pool.
-///
-/// # Panics
-///
-/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
-/// any of the spawned jobs, that panic will be propagated and the
-/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is
-/// non-deterministic which of their panic values will propagate.
-/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
-/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
-/// returns once all spawned jobs have completed, and any panics are
-/// propagated at that point.
-pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
-where
- OP: FnOnce(&ScopeFifo<'scope>) -> R,
-{
- do_in_place_scope_fifo(None, op)
-}
-
-pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
-where
- OP: FnOnce(&ScopeFifo<'scope>) -> R,
-{
- let thread = unsafe { WorkerThread::current().as_ref() };
- let scope = ScopeFifo::<'scope>::new(thread, registry);
- scope.base.complete(thread, || op(&scope))
-}
-
-impl<'scope> Scope<'scope> {
- fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
- let base = ScopeBase::new(owner, registry);
- Scope { base }
- }
-
- /// Spawns a job into the fork-join scope `self`. This job will
- /// execute sometime before the fork-join scope completes. The
- /// job is specified as a closure, and this closure receives its
- /// own reference to the scope `self` as argument. This can be
- /// used to inject new jobs into `self`.
- ///
- /// # Returns
- ///
- /// Nothing. The spawned closures cannot pass back values to the
- /// caller directly, though they can write to local variables on
- /// the stack (if those variables outlive the scope) or
- /// communicate through shared channels.
- ///
- /// (The intention is to eventually integrate with Rust futures to
- /// support spawns of functions that compute a value.)
- ///
- /// # Examples
- ///
- /// ```rust
- /// # use rayon_core as rayon;
- /// let mut value_a = None;
- /// let mut value_b = None;
- /// let mut value_c = None;
- /// rayon::scope(|s| {
- /// s.spawn(|s1| {
- /// // ^ this is the same scope as `s`; this handle `s1`
- /// // is intended for use by the spawned task,
- /// // since scope handles cannot cross thread boundaries.
- ///
- /// value_a = Some(22);
- ///
- /// // the scope `s` will not end until all these tasks are done
- /// s1.spawn(|_| {
- /// value_b = Some(44);
- /// });
- /// });
- ///
- /// s.spawn(|_| {
- /// value_c = Some(66);
- /// });
- /// });
- /// assert_eq!(value_a, Some(22));
- /// assert_eq!(value_b, Some(44));
- /// assert_eq!(value_c, Some(66));
- /// ```
- ///
- /// # See also
- ///
- /// The [`scope` function] has more extensive documentation about
- /// task spawning.
- ///
- /// [`scope` function]: fn.scope.html
- pub fn spawn<BODY>(&self, body: BODY)
- where
- BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
- {
- let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || unsafe {
- // SAFETY: this job will execute before the scope ends.
- let scope = scope_ptr.as_ref();
- ScopeBase::execute_job(&scope.base, move || body(scope))
- });
- let job_ref = self.base.heap_job_ref(job);
-
- // Since `Scope` implements `Sync`, we can't be sure that we're still in a
- // thread of this pool, so we can't just push to the local worker thread.
- // Also, this might be an in-place scope.
- self.base.registry.inject_or_push(job_ref);
- }
-
- /// Spawns a job into every thread of the fork-join scope `self`. This job will
- /// execute on each thread sometime before the fork-join scope completes. The
- /// job is specified as a closure, and this closure receives its own reference
- /// to the scope `self` as argument, as well as a `BroadcastContext`.
- pub fn spawn_broadcast<BODY>(&self, body: BODY)
- where
- BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
- {
- let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || unsafe {
- // SAFETY: this job will execute before the scope ends.
- let scope = scope_ptr.as_ref();
- let body = &body;
- let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- ScopeBase::execute_job(&scope.base, func)
- });
- self.base.inject_broadcast(job)
- }
-}
-
-impl<'scope> ScopeFifo<'scope> {
- fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
- let base = ScopeBase::new(owner, registry);
- let num_threads = base.registry.num_threads();
- let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
- ScopeFifo { base, fifos }
- }
-
- /// Spawns a job into the fork-join scope `self`. This job will
- /// execute sometime before the fork-join scope completes. The
- /// job is specified as a closure, and this closure receives its
- /// own reference to the scope `self` as argument. This can be
- /// used to inject new jobs into `self`.
- ///
- /// # See also
- ///
- /// This method is akin to [`Scope::spawn()`], but with a FIFO
- /// priority. The [`scope_fifo` function] has more details about
- /// this distinction.
- ///
- /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
- /// [`scope_fifo` function]: fn.scope_fifo.html
- pub fn spawn_fifo<BODY>(&self, body: BODY)
- where
- BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
- {
- let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || unsafe {
- // SAFETY: this job will execute before the scope ends.
- let scope = scope_ptr.as_ref();
- ScopeBase::execute_job(&scope.base, move || body(scope))
- });
- let job_ref = self.base.heap_job_ref(job);
-
- // If we're in the pool, use our scope's private fifo for this thread to execute
- // in a locally-FIFO order. Otherwise, just use the pool's global injector.
- match self.base.registry.current_thread() {
- Some(worker) => {
- let fifo = &self.fifos[worker.index()];
- // SAFETY: this job will execute before the scope ends.
- unsafe { worker.push(fifo.push(job_ref)) };
- }
- None => self.base.registry.inject(job_ref),
- }
- }
-
- /// Spawns a job into every thread of the fork-join scope `self`. This job will
- /// execute on each thread sometime before the fork-join scope completes. The
- /// job is specified as a closure, and this closure receives its own reference
- /// to the scope `self` as argument, as well as a `BroadcastContext`.
- pub fn spawn_broadcast<BODY>(&self, body: BODY)
- where
- BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
- {
- let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || unsafe {
- // SAFETY: this job will execute before the scope ends.
- let scope = scope_ptr.as_ref();
- let body = &body;
- let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- ScopeBase::execute_job(&scope.base, func)
- });
- self.base.inject_broadcast(job)
- }
-}
-
-impl<'scope> ScopeBase<'scope> {
- /// Creates the base of a new scope for the given registry
- fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
- let registry = registry.unwrap_or_else(|| match owner {
- Some(owner) => owner.registry(),
- None => global_registry(),
- });
-
- ScopeBase {
- registry: Arc::clone(registry),
- panic: AtomicPtr::new(ptr::null_mut()),
- job_completed_latch: CountLatch::new(owner),
- marker: PhantomData,
- }
- }
-
- fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
- where
- FUNC: FnOnce() + Send + 'scope,
- {
- unsafe {
- self.job_completed_latch.increment();
- job.into_job_ref()
- }
- }
-
- fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
- where
- FUNC: Fn() + Send + Sync + 'scope,
- {
- let n_threads = self.registry.num_threads();
- let job_refs = (0..n_threads).map(|_| unsafe {
- self.job_completed_latch.increment();
- ArcJob::as_job_ref(&job)
- });
-
- self.registry.inject_broadcast(job_refs);
- }
-
- /// Executes `func` as a job, either aborting or executing as
- /// appropriate.
- fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
- where
- FUNC: FnOnce() -> R,
- {
- let result = unsafe { Self::execute_job_closure(self, func) };
- self.job_completed_latch.wait(owner);
- self.maybe_propagate_panic();
- result.unwrap() // only None if `op` panicked, and that would have been propagated
- }
-
- /// Executes `func` as a job, either aborting or executing as
- /// appropriate.
- unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
- where
- FUNC: FnOnce(),
- {
- let _: Option<()> = Self::execute_job_closure(this, func);
- }
-
- /// Executes `func` as a job in scope. Adjusts the "job completed"
- /// counters and also catches any panic and stores it into
- /// `scope`.
- unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
- where
- FUNC: FnOnce() -> R,
- {
- let result = match unwind::halt_unwinding(func) {
- Ok(r) => Some(r),
- Err(err) => {
- (*this).job_panicked(err);
- None
- }
- };
- Latch::set(&(*this).job_completed_latch);
- result
- }
-
- fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
- // capture the first error we see, free the rest
- if self.panic.load(Ordering::Relaxed).is_null() {
- let nil = ptr::null_mut();
- let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
- let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
- if self
- .panic
- .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
- .is_ok()
- {
- // ownership now transferred into self.panic
- } else {
- // another panic raced in ahead of us, so drop ours
- let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
- }
- }
- }
-
- fn maybe_propagate_panic(&self) {
- // propagate panic, if any occurred; at this point, all
- // outstanding jobs have completed, so we can use a relaxed
- // ordering:
- let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
- if !panic.is_null() {
- let value = unsafe { Box::from_raw(panic) };
- unwind::resume_unwinding(*value);
- }
- }
-}
-
-impl<'scope> fmt::Debug for Scope<'scope> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Scope")
- .field("pool_id", &self.base.registry.id())
- .field("panic", &self.base.panic)
- .field("job_completed_latch", &self.base.job_completed_latch)
- .finish()
- }
-}
-
-impl<'scope> fmt::Debug for ScopeFifo<'scope> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("ScopeFifo")
- .field("num_fifos", &self.fifos.len())
- .field("pool_id", &self.base.registry.id())
- .field("panic", &self.base.panic)
- .field("job_completed_latch", &self.base.job_completed_latch)
- .finish()
- }
-}
-
-/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
-///
-/// Unsafe code is still required to dereference the pointer, but that's fine in
-/// scope jobs that are guaranteed to execute before the scope ends.
-struct ScopePtr<T>(*const T);
-
-// SAFETY: !Send for raw pointers is not for safety, just as a lint
-unsafe impl<T: Sync> Send for ScopePtr<T> {}
-
-// SAFETY: !Sync for raw pointers is not for safety, just as a lint
-unsafe impl<T: Sync> Sync for ScopePtr<T> {}
-
-impl<T> ScopePtr<T> {
- // Helper to avoid disjoint captures of `scope_ptr.0`
- unsafe fn as_ref(&self) -> &T {
- &*self.0
- }
-}
diff --git a/vendor/rayon-core/src/scope/test.rs b/vendor/rayon-core/src/scope/test.rs
deleted file mode 100644
index ad8c4af..0000000
--- a/vendor/rayon-core/src/scope/test.rs
+++ /dev/null
@@ -1,619 +0,0 @@
-use crate::unwind;
-use crate::ThreadPoolBuilder;
-use crate::{scope, scope_fifo, Scope, ScopeFifo};
-use rand::{Rng, SeedableRng};
-use rand_xorshift::XorShiftRng;
-use std::cmp;
-use std::iter::once;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Barrier, Mutex};
-use std::vec;
-
-#[test]
-fn scope_empty() {
- scope(|_| {});
-}
-
-#[test]
-fn scope_result() {
- let x = scope(|_| 22);
- assert_eq!(x, 22);
-}
-
-#[test]
-fn scope_two() {
- let counter = &AtomicUsize::new(0);
- scope(|s| {
- s.spawn(move |_| {
- counter.fetch_add(1, Ordering::SeqCst);
- });
- s.spawn(move |_| {
- counter.fetch_add(10, Ordering::SeqCst);
- });
- });
-
- let v = counter.load(Ordering::SeqCst);
- assert_eq!(v, 11);
-}
-
-#[test]
-fn scope_divide_and_conquer() {
- let counter_p = &AtomicUsize::new(0);
- scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024)));
-
- let counter_s = &AtomicUsize::new(0);
- divide_and_conquer_seq(counter_s, 1024);
-
- let p = counter_p.load(Ordering::SeqCst);
- let s = counter_s.load(Ordering::SeqCst);
- assert_eq!(p, s);
-}
-
-fn divide_and_conquer<'scope>(scope: &Scope<'scope>, counter: &'scope AtomicUsize, size: usize) {
- if size > 1 {
- scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2));
- scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2));
- } else {
- // count the leaves
- counter.fetch_add(1, Ordering::SeqCst);
- }
-}
-
-fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) {
- if size > 1 {
- divide_and_conquer_seq(counter, size / 2);
- divide_and_conquer_seq(counter, size / 2);
- } else {
- // count the leaves
- counter.fetch_add(1, Ordering::SeqCst);
- }
-}
-
-struct Tree<T: Send> {
- value: T,
- children: Vec<Tree<T>>,
-}
-
-impl<T: Send> Tree<T> {
- fn iter(&self) -> vec::IntoIter<&T> {
- once(&self.value)
- .chain(self.children.iter().flat_map(Tree::iter))
- .collect::<Vec<_>>() // seems like it shouldn't be needed... but prevents overflow
- .into_iter()
- }
-
- fn update<OP>(&mut self, op: OP)
- where
- OP: Fn(&mut T) + Sync,
- T: Send,
- {
- scope(|s| self.update_in_scope(&op, s));
- }
-
- fn update_in_scope<'scope, OP>(&'scope mut self, op: &'scope OP, scope: &Scope<'scope>)
- where
- OP: Fn(&mut T) + Sync,
- {
- let Tree {
- ref mut value,
- ref mut children,
- } = *self;
- scope.spawn(move |scope| {
- for child in children {
- scope.spawn(move |scope| child.update_in_scope(op, scope));
- }
- });
-
- op(value);
- }
-}
-
-fn random_tree(depth: usize) -> Tree<u32> {
- assert!(depth > 0);
- let mut seed = <XorShiftRng as SeedableRng>::Seed::default();
- (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i);
- let mut rng = XorShiftRng::from_seed(seed);
- random_tree1(depth, &mut rng)
-}
-
-fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> {
- let children = if depth == 0 {
- vec![]
- } else {
- (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level
- .map(|_| random_tree1(depth - 1, rng))
- .collect()
- };
-
- Tree {
- value: rng.gen_range(0..1_000_000),
- children,
- }
-}
-
-#[test]
-fn update_tree() {
- let mut tree: Tree<u32> = random_tree(10);
- let values: Vec<u32> = tree.iter().cloned().collect();
- tree.update(|v| *v += 1);
- let new_values: Vec<u32> = tree.iter().cloned().collect();
- assert_eq!(values.len(), new_values.len());
- for (&i, &j) in values.iter().zip(&new_values) {
- assert_eq!(i + 1, j);
- }
-}
-
-/// Check that if you have a chain of scoped tasks where T0 spawns T1
-/// spawns T2 and so forth down to Tn, the stack space should not grow
-/// linearly with N. We test this by some unsafe hackery and
-/// permitting an approx 10% change with a 10x input change.
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn linear_stack_growth() {
- let builder = ThreadPoolBuilder::new().num_threads(1);
- let pool = builder.build().unwrap();
- pool.install(|| {
- let mut max_diff = Mutex::new(0);
- let bottom_of_stack = 0;
- scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 5));
- let diff_when_5 = *max_diff.get_mut().unwrap() as f64;
-
- scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 500));
- let diff_when_500 = *max_diff.get_mut().unwrap() as f64;
-
- let ratio = diff_when_5 / diff_when_500;
- assert!(
- ratio > 0.9 && ratio < 1.1,
- "stack usage ratio out of bounds: {}",
- ratio
- );
- });
-}
-
-fn the_final_countdown<'scope>(
- s: &Scope<'scope>,
- bottom_of_stack: &'scope i32,
- max: &'scope Mutex<usize>,
- n: usize,
-) {
- let top_of_stack = 0;
- let p = bottom_of_stack as *const i32 as usize;
- let q = &top_of_stack as *const i32 as usize;
- let diff = if p > q { p - q } else { q - p };
-
- let mut data = max.lock().unwrap();
- *data = cmp::max(diff, *data);
-
- if n > 0 {
- s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1));
- }
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_scope() {
- scope(|_| panic!("Hello, world!"));
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_spawn() {
- scope(|s| s.spawn(|_| panic!("Hello, world!")));
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_nested_spawn() {
- scope(|s| s.spawn(|s| s.spawn(|s| s.spawn(|_| panic!("Hello, world!")))));
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_nested_scope_spawn() {
- scope(|s| s.spawn(|_| scope(|s| s.spawn(|_| panic!("Hello, world!")))));
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn panic_propagate_still_execute_1() {
- let mut x = false;
- match unwind::halt_unwinding(|| {
- scope(|s| {
- s.spawn(|_| panic!("Hello, world!")); // job A
- s.spawn(|_| x = true); // job B, should still execute even though A panics
- });
- }) {
- Ok(_) => panic!("failed to propagate panic"),
- Err(_) => assert!(x, "job b failed to execute"),
- }
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn panic_propagate_still_execute_2() {
- let mut x = false;
- match unwind::halt_unwinding(|| {
- scope(|s| {
- s.spawn(|_| x = true); // job B, should still execute even though A panics
- s.spawn(|_| panic!("Hello, world!")); // job A
- });
- }) {
- Ok(_) => panic!("failed to propagate panic"),
- Err(_) => assert!(x, "job b failed to execute"),
- }
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn panic_propagate_still_execute_3() {
- let mut x = false;
- match unwind::halt_unwinding(|| {
- scope(|s| {
- s.spawn(|_| x = true); // spawned job should still execute despite later panic
- panic!("Hello, world!");
- });
- }) {
- Ok(_) => panic!("failed to propagate panic"),
- Err(_) => assert!(x, "panic after spawn, spawn failed to execute"),
- }
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn panic_propagate_still_execute_4() {
- let mut x = false;
- match unwind::halt_unwinding(|| {
- scope(|s| {
- s.spawn(|_| panic!("Hello, world!"));
- x = true;
- });
- }) {
- Ok(_) => panic!("failed to propagate panic"),
- Err(_) => assert!(x, "panic in spawn tainted scope"),
- }
-}
-
-macro_rules! test_order {
- ($scope:ident => $spawn:ident) => {{
- let builder = ThreadPoolBuilder::new().num_threads(1);
- let pool = builder.build().unwrap();
- pool.install(|| {
- let vec = Mutex::new(vec![]);
- $scope(|scope| {
- let vec = &vec;
- for i in 0..10 {
- scope.$spawn(move |scope| {
- for j in 0..10 {
- scope.$spawn(move |_| {
- vec.lock().unwrap().push(i * 10 + j);
- });
- }
- });
- }
- });
- vec.into_inner().unwrap()
- })
- }};
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn lifo_order() {
- // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
- let vec = test_order!(scope => spawn);
- let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn fifo_order() {
- // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
- let vec = test_order!(scope_fifo => spawn_fifo);
- let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
- assert_eq!(vec, expected);
-}
-
-macro_rules! test_nested_order {
- ($outer_scope:ident => $outer_spawn:ident,
- $inner_scope:ident => $inner_spawn:ident) => {{
- let builder = ThreadPoolBuilder::new().num_threads(1);
- let pool = builder.build().unwrap();
- pool.install(|| {
- let vec = Mutex::new(vec![]);
- $outer_scope(|scope| {
- let vec = &vec;
- for i in 0..10 {
- scope.$outer_spawn(move |_| {
- $inner_scope(|scope| {
- for j in 0..10 {
- scope.$inner_spawn(move |_| {
- vec.lock().unwrap().push(i * 10 + j);
- });
- }
- });
- });
- }
- });
- vec.into_inner().unwrap()
- })
- }};
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn nested_lifo_order() {
- // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
- let vec = test_nested_order!(scope => spawn, scope => spawn);
- let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn nested_fifo_order() {
- // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
- let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
- let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn nested_lifo_fifo_order() {
- // LIFO on the outside, FIFO on the inside
- let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo);
- let expected: Vec<i32> = (0..10)
- .rev()
- .flat_map(|i| (0..10).map(move |j| i * 10 + j))
- .collect();
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn nested_fifo_lifo_order() {
- // FIFO on the outside, LIFO on the inside
- let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn);
- let expected: Vec<i32> = (0..10)
- .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
- .collect();
- assert_eq!(vec, expected);
-}
-
-macro_rules! spawn_push {
- ($scope:ident . $spawn:ident, $vec:ident, $i:expr) => {{
- $scope.$spawn(move |_| $vec.lock().unwrap().push($i));
- }};
-}
-
-/// Test spawns pushing a series of numbers, interleaved
-/// such that negative values are using an inner scope.
-macro_rules! test_mixed_order {
- ($outer_scope:ident => $outer_spawn:ident,
- $inner_scope:ident => $inner_spawn:ident) => {{
- let builder = ThreadPoolBuilder::new().num_threads(1);
- let pool = builder.build().unwrap();
- pool.install(|| {
- let vec = Mutex::new(vec![]);
- $outer_scope(|outer_scope| {
- let vec = &vec;
- spawn_push!(outer_scope.$outer_spawn, vec, 0);
- $inner_scope(|inner_scope| {
- spawn_push!(inner_scope.$inner_spawn, vec, -1);
- spawn_push!(outer_scope.$outer_spawn, vec, 1);
- spawn_push!(inner_scope.$inner_spawn, vec, -2);
- spawn_push!(outer_scope.$outer_spawn, vec, 2);
- spawn_push!(inner_scope.$inner_spawn, vec, -3);
- });
- spawn_push!(outer_scope.$outer_spawn, vec, 3);
- });
- vec.into_inner().unwrap()
- })
- }};
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn mixed_lifo_order() {
- // NB: the end of the inner scope makes us execute some of the outer scope
- // before they've all been spawned, so they're not perfectly LIFO.
- let vec = test_mixed_order!(scope => spawn, scope => spawn);
- let expected = vec![-3, 2, -2, 1, -1, 3, 0];
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn mixed_fifo_order() {
- let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
- let expected = vec![-1, 0, -2, 1, -3, 2, 3];
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn mixed_lifo_fifo_order() {
- // NB: the end of the inner scope makes us execute some of the outer scope
- // before they've all been spawned, so they're not perfectly LIFO.
- let vec = test_mixed_order!(scope => spawn, scope_fifo => spawn_fifo);
- let expected = vec![-1, 2, -2, 1, -3, 3, 0];
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn mixed_fifo_lifo_order() {
- let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
- let expected = vec![-3, 0, -2, 1, -1, 2, 3];
- assert_eq!(vec, expected);
-}
-
-#[test]
-fn static_scope() {
- static COUNTER: AtomicUsize = AtomicUsize::new(0);
-
- let mut range = 0..100;
- let sum = range.clone().sum();
- let iter = &mut range;
-
- COUNTER.store(0, Ordering::Relaxed);
- scope(|s: &Scope<'static>| {
- // While we're allowed the locally borrowed iterator,
- // the spawns must be static.
- for i in iter {
- s.spawn(move |_| {
- COUNTER.fetch_add(i, Ordering::Relaxed);
- });
- }
- });
-
- assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
-}
-
-#[test]
-fn static_scope_fifo() {
- static COUNTER: AtomicUsize = AtomicUsize::new(0);
-
- let mut range = 0..100;
- let sum = range.clone().sum();
- let iter = &mut range;
-
- COUNTER.store(0, Ordering::Relaxed);
- scope_fifo(|s: &ScopeFifo<'static>| {
- // While we're allowed the locally borrowed iterator,
- // the spawns must be static.
- for i in iter {
- s.spawn_fifo(move |_| {
- COUNTER.fetch_add(i, Ordering::Relaxed);
- });
- }
- });
-
- assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
-}
-
-#[test]
-fn mixed_lifetime_scope() {
- fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
- scope(move |s: &Scope<'counter>| {
- // We can borrow 'slice here, but the spawns can only borrow 'counter.
- for &c in counters {
- s.spawn(move |_| {
- c.fetch_add(1, Ordering::Relaxed);
- });
- }
- });
- }
-
- let counter = AtomicUsize::new(0);
- increment(&[&counter; 100]);
- assert_eq!(counter.into_inner(), 100);
-}
-
-#[test]
-fn mixed_lifetime_scope_fifo() {
- fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
- scope_fifo(move |s: &ScopeFifo<'counter>| {
- // We can borrow 'slice here, but the spawns can only borrow 'counter.
- for &c in counters {
- s.spawn_fifo(move |_| {
- c.fetch_add(1, Ordering::Relaxed);
- });
- }
- });
- }
-
- let counter = AtomicUsize::new(0);
- increment(&[&counter; 100]);
- assert_eq!(counter.into_inner(), 100);
-}
-
-#[test]
-fn scope_spawn_broadcast() {
- let sum = AtomicUsize::new(0);
- let n = scope(|s| {
- s.spawn_broadcast(|_, ctx| {
- sum.fetch_add(ctx.index(), Ordering::Relaxed);
- });
- crate::current_num_threads()
- });
- assert_eq!(sum.into_inner(), n * (n - 1) / 2);
-}
-
-#[test]
-fn scope_fifo_spawn_broadcast() {
- let sum = AtomicUsize::new(0);
- let n = scope_fifo(|s| {
- s.spawn_broadcast(|_, ctx| {
- sum.fetch_add(ctx.index(), Ordering::Relaxed);
- });
- crate::current_num_threads()
- });
- assert_eq!(sum.into_inner(), n * (n - 1) / 2);
-}
-
-#[test]
-fn scope_spawn_broadcast_nested() {
- let sum = AtomicUsize::new(0);
- let n = scope(|s| {
- s.spawn_broadcast(|s, _| {
- s.spawn_broadcast(|_, ctx| {
- sum.fetch_add(ctx.index(), Ordering::Relaxed);
- });
- });
- crate::current_num_threads()
- });
- assert_eq!(sum.into_inner(), n * n * (n - 1) / 2);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn scope_spawn_broadcast_barrier() {
- let barrier = Barrier::new(8);
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- pool.in_place_scope(|s| {
- s.spawn_broadcast(|_, _| {
- barrier.wait();
- });
- barrier.wait();
- });
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn scope_spawn_broadcast_panic_one() {
- let count = AtomicUsize::new(0);
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- let result = crate::unwind::halt_unwinding(|| {
- pool.scope(|s| {
- s.spawn_broadcast(|_, ctx| {
- count.fetch_add(1, Ordering::Relaxed);
- if ctx.index() == 3 {
- panic!("Hello, world!");
- }
- });
- });
- });
- assert_eq!(count.into_inner(), 7);
- assert!(result.is_err(), "broadcast panic should propagate!");
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn scope_spawn_broadcast_panic_many() {
- let count = AtomicUsize::new(0);
- let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
- let result = crate::unwind::halt_unwinding(|| {
- pool.scope(|s| {
- s.spawn_broadcast(|_, ctx| {
- count.fetch_add(1, Ordering::Relaxed);
- if ctx.index() % 2 == 0 {
- panic!("Hello, world!");
- }
- });
- });
- });
- assert_eq!(count.into_inner(), 7);
- assert!(result.is_err(), "broadcast panic should propagate!");
-}
diff --git a/vendor/rayon-core/src/sleep/README.md b/vendor/rayon-core/src/sleep/README.md
deleted file mode 100644
index 55426c8..0000000
--- a/vendor/rayon-core/src/sleep/README.md
+++ /dev/null
@@ -1,219 +0,0 @@
-# Introduction: the sleep module
-
-The code in this module governs when worker threads should go to
-sleep. The system used in this code was introduced in [Rayon RFC #5].
-There is also a [video walkthrough] available. Both of those may be
-valuable resources to understanding the code, though naturally they
-will also grow stale over time. The comments in this file are
-extracted from the RFC and meant to be kept up to date.
-
-[Rayon RFC #5]: https://github.com/rayon-rs/rfcs/pull/5
-[video walkthrough]: https://youtu.be/HvmQsE5M4cY
-
-# The `Sleep` struct
-
-The `Sleep` struct is embedded into each registry. It performs several functions:
-
-* It tracks when workers are awake or asleep.
-* It decides how long a worker should look for work before it goes to sleep,
- via a callback that is invoked periodically from the worker's search loop.
-* It is notified when latches are set, jobs are published, or other
- events occur, and it will go and wake the appropriate threads if
- they are sleeping.
-
-# Thread states
-
-There are three main thread states:
-
-* An **active** thread is one that is actively executing a job.
-* An **idle** thread is one that is searching for work to do. It will be
- trying to steal work or pop work from the global injector queue.
-* A **sleeping** thread is one that is blocked on a condition variable,
- waiting to be awoken.
-
-We sometimes refer to the final two states collectively as **inactive**.
-Threads begin as idle but transition to idle and finally sleeping when
-they're unable to find work to do.
-
-## Sleepy threads
-
-There is one other special state worth mentioning. During the idle state,
-threads can get **sleepy**. A sleepy thread is still idle, in that it is still
-searching for work, but it is *about* to go to sleep after it does one more
-search (or some other number, potentially). When a thread enters the sleepy
-state, it signals (via the **jobs event counter**, described below) that it is
-about to go to sleep. If new work is published, this will lead to the counter
-being adjusted. When the thread actually goes to sleep, it will (hopefully, but
-not guaranteed) see that the counter has changed and elect not to sleep, but
-instead to search again. See the section on the **jobs event counter** for more
-details.
-
-# The counters
-
-One of the key structs in the sleep module is `AtomicCounters`, found in
-`counters.rs`. It packs three counters into one atomically managed value:
-
-* Two **thread counters**, which track the number of threads in a particular state.
-* The **jobs event counter**, which is used to signal when new work is available.
- It (sort of) tracks the number of jobs posted, but not quite, and it can rollover.
-
-## Thread counters
-
-There are two thread counters, one that tracks **inactive** threads and one that
-tracks **sleeping** threads. From this, one can deduce the number of threads
-that are idle by subtracting sleeping threads from inactive threads. We track
-the counters in this way because it permits simpler atomic operations. One can
-increment the number of sleeping threads (and thus decrease the number of idle
-threads) simply by doing one atomic increment, for example. Similarly, one can
-decrease the number of sleeping threads (and increase the number of idle
-threads) through one atomic decrement.
-
-These counters are adjusted as follows:
-
-* When a thread enters the idle state: increment the inactive thread counter.
-* When a thread enters the sleeping state: increment the sleeping thread counter.
-* When a thread awakens a sleeping thread: decrement the sleeping thread counter.
- * Subtle point: the thread that *awakens* the sleeping thread decrements the
- counter, not the thread that is *sleeping*. This is because there is a delay
- between signaling a thread to wake and the thread actually waking:
- decrementing the counter when awakening the thread means that other threads
- that may be posting work will see the up-to-date value that much faster.
-* When a thread finds work, exiting the idle state: decrement the inactive
- thread counter.
-
-## Jobs event counter
-
-The final counter is the **jobs event counter**. The role of this counter is to
-help sleepy threads detect when new work is posted in a lightweight fashion. In
-its simplest form, we would simply have a counter that gets incremented each
-time a new job is posted. This way, when a thread gets sleepy, it could read the
-counter, and then compare to see if the value has changed before it actually
-goes to sleep. But this [turns out to be too expensive] in practice, so we use a
-somewhat more complex scheme.
-
-[turns out to be too expensive]: https://github.com/rayon-rs/rayon/pull/746#issuecomment-624802747
-
-The idea is that the counter toggles between two states, depending on whether
-its value is even or odd (or, equivalently, on the value of its low bit):
-
-* Even -- If the low bit is zero, then it means that there has been no new work
- since the last thread got sleepy.
-* Odd -- If the low bit is one, then it means that new work was posted since
- the last thread got sleepy.
-
-### New work is posted
-
-When new work is posted, we check the value of the counter: if it is even,
-then we increment it by one, so that it becomes odd.
-
-### Worker thread gets sleepy
-
-When a worker thread gets sleepy, it will read the value of the counter. If the
-counter is odd, it will increment the counter so that it is even. Either way, it
-remembers the final value of the counter. The final value will be used later,
-when the thread is going to sleep. If at that time the counter has not changed,
-then we can assume no new jobs have been posted (though note the remote
-possibility of rollover, discussed in detail below).
-
-# Protocol for a worker thread to post work
-
-The full protocol for a thread to post work is as follows
-
-* If the work is posted into the injection queue, then execute a seq-cst fence (see below).
-* Load the counters, incrementing the JEC if it is even so that it is odd.
-* Check if there are idle threads available to handle this new job. If not,
- and there are sleeping threads, then wake one or more threads.
-
-# Protocol for a worker thread to fall asleep
-
-The full protocol for a thread to fall asleep is as follows:
-
-* After completing all its jobs, the worker goes idle and begins to
- search for work. As it searches, it counts "rounds". In each round,
- it searches all other work threads' queues, plus the 'injector queue' for
- work injected from the outside. If work is found in this search, the thread
- becomes active again and hence restarts this protocol from the top.
-* After a certain number of rounds, the thread "gets sleepy" and executes `get_sleepy`
- above, remembering the `final_value` of the JEC. It does one more search for work.
-* If no work is found, the thread atomically:
- * Checks the JEC to see that it has not changed from `final_value`.
- * If it has, then the thread goes back to searching for work. We reset to
- just before we got sleepy, so that we will do one more search
- before attending to sleep again (rather than searching for many rounds).
- * Increments the number of sleeping threads by 1.
-* The thread then executes a seq-cst fence operation (see below).
-* The thread then does one final check for injected jobs (see below). If any
- are available, it returns to the 'pre-sleepy' state as if the JEC had changed.
-* The thread waits to be signaled. Once signaled, it returns to the idle state.
-
-# The jobs event counter and deadlock
-
-As described in the section on the JEC, the main concern around going to sleep
-is avoiding a race condition wherein:
-
-* Thread A looks for work, finds none.
-* Thread B posts work but sees no sleeping threads.
-* Thread A goes to sleep.
-
-The JEC protocol largely prevents this, but due to rollover, this prevention is
-not complete. It is possible -- if unlikely -- that enough activity occurs for
-Thread A to observe the same JEC value that it saw when getting sleepy. If the
-new work being published came from *inside* the thread-pool, then this race
-condition isn't too harmful. It means that we have fewer workers processing the
-work then we should, but we won't deadlock. This seems like an acceptable risk
-given that this is unlikely in practice.
-
-However, if the work was posted as an *external* job, that is a problem. In that
-case, it's possible that all of our workers could go to sleep, and the external
-job would never get processed. To prevent that, the sleeping protocol includes
-one final check to see if the injector queue is empty before fully falling
-asleep. Note that this final check occurs **after** the number of sleeping
-threads has been incremented. We are not concerned therefore with races against
-injections that occur after that increment, only before.
-
-Unfortunately, there is one rather subtle point concerning this final check:
-we wish to avoid the possibility that:
-
-* work is pushed into the injection queue by an outside thread X,
-* the sleepy thread S sees the JEC but it has rolled over and is equal
-* the sleepy thread S reads the injection queue but does not see the work posted by X.
-
-This is possible because the C++ memory model typically offers guarantees of the
-form "if you see the access A, then you must see those other accesses" -- but it
-doesn't guarantee that you will see the access A (i.e., if you think of
-processors with independent caches, you may be operating on very out of date
-cache state).
-
-## Using seq-cst fences to prevent deadlock
-
-To overcome this problem, we have inserted two sequentially consistent fence
-operations into the protocols above:
-
-* One fence occurs after work is posted into the injection queue, but before the
- counters are read (including the number of sleeping threads).
- * Note that no fence is needed for work posted to internal queues, since it is ok
- to overlook work in that case.
-* One fence occurs after the number of sleeping threads is incremented, but
- before the injection queue is read.
-
-### Proof sketch
-
-What follows is a "proof sketch" that the protocol is deadlock free. We model
-two relevant bits of memory, the job injector queue J and the atomic counters C.
-
-Consider the actions of the injecting thread:
-
-* PushJob: Job is injected, which can be modeled as an atomic write to J with release semantics.
-* PushFence: A sequentially consistent fence is executed.
-* ReadSleepers: The counters C are read (they may also be incremented, but we just consider the read that comes first).
-
-Meanwhile, the sleepy thread does the following:
-
-* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C.
-* SleepFence: A sequentially consistent fence is executed.
-* ReadJob: We look to see if the queue is empty, which is a read of J with acquire semantics.
-
-Either PushFence or SleepFence must come first:
-
-* If PushFence comes first, then PushJob must be visible to ReadJob.
-* If SleepFence comes first, then IncSleepers is visible to ReadSleepers. \ No newline at end of file
diff --git a/vendor/rayon-core/src/sleep/counters.rs b/vendor/rayon-core/src/sleep/counters.rs
deleted file mode 100644
index 53d2c55..0000000
--- a/vendor/rayon-core/src/sleep/counters.rs
+++ /dev/null
@@ -1,277 +0,0 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-pub(super) struct AtomicCounters {
- /// Packs together a number of counters. The counters are ordered as
- /// follows, from least to most significant bits (here, we assuming
- /// that [`THREADS_BITS`] is equal to 10):
- ///
- /// * Bits 0..10: Stores the number of **sleeping threads**
- /// * Bits 10..20: Stores the number of **inactive threads**
- /// * Bits 20..: Stores the **job event counter** (JEC)
- ///
- /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note
- /// that the total number of bits (and hence the number of bits used for the
- /// JEC) will depend on whether we are using a 32- or 64-bit architecture.
- value: AtomicUsize,
-}
-
-#[derive(Copy, Clone)]
-pub(super) struct Counters {
- word: usize,
-}
-
-/// A value read from the **Jobs Event Counter**.
-/// See the [`README.md`](README.md) for more
-/// coverage of how the jobs event counter works.
-#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
-pub(super) struct JobsEventCounter(usize);
-
-impl JobsEventCounter {
- pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX);
-
- #[inline]
- pub(super) fn as_usize(self) -> usize {
- self.0
- }
-
- /// The JEC "is sleepy" if the last thread to increment it was in the
- /// process of becoming sleepy. This is indicated by its value being *even*.
- /// When new jobs are posted, they check if the JEC is sleepy, and if so
- /// they incremented it.
- #[inline]
- pub(super) fn is_sleepy(self) -> bool {
- (self.as_usize() & 1) == 0
- }
-
- /// The JEC "is active" if the last thread to increment it was posting new
- /// work. This is indicated by its value being *odd*. When threads get
- /// sleepy, they will check if the JEC is active, and increment it.
- #[inline]
- pub(super) fn is_active(self) -> bool {
- !self.is_sleepy()
- }
-}
-
-/// Number of bits used for the thread counters.
-#[cfg(target_pointer_width = "64")]
-const THREADS_BITS: usize = 16;
-
-#[cfg(target_pointer_width = "32")]
-const THREADS_BITS: usize = 8;
-
-/// Bits to shift to select the sleeping threads
-/// (used with `select_bits`).
-#[allow(clippy::erasing_op)]
-const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
-
-/// Bits to shift to select the inactive threads
-/// (used with `select_bits`).
-#[allow(clippy::identity_op)]
-const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
-
-/// Bits to shift to select the JEC
-/// (use JOBS_BITS).
-const JEC_SHIFT: usize = 2 * THREADS_BITS;
-
-/// Max value for the thread counters.
-pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
-
-/// Constant that can be added to add one sleeping thread.
-const ONE_SLEEPING: usize = 1;
-
-/// Constant that can be added to add one inactive thread.
-/// An inactive thread is either idle, sleepy, or sleeping.
-const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;
-
-/// Constant that can be added to add one to the JEC.
-const ONE_JEC: usize = 1 << JEC_SHIFT;
-
-impl AtomicCounters {
- #[inline]
- pub(super) fn new() -> AtomicCounters {
- AtomicCounters {
- value: AtomicUsize::new(0),
- }
- }
-
- /// Load and return the current value of the various counters.
- /// This value can then be given to other method which will
- /// attempt to update the counters via compare-and-swap.
- #[inline]
- pub(super) fn load(&self, ordering: Ordering) -> Counters {
- Counters::new(self.value.load(ordering))
- }
-
- #[inline]
- fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
- self.value
- .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
- .is_ok()
- }
-
- /// Adds an inactive thread. This cannot fail.
- ///
- /// This should be invoked when a thread enters its idle loop looking
- /// for work. It is decremented when work is found. Note that it is
- /// not decremented if the thread transitions from idle to sleepy or sleeping;
- /// so the number of inactive threads is always greater-than-or-equal
- /// to the number of sleeping threads.
- #[inline]
- pub(super) fn add_inactive_thread(&self) {
- self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
- }
-
- /// Increments the jobs event counter if `increment_when`, when applied to
- /// the current value, is true. Used to toggle the JEC from even (sleepy) to
- /// odd (active) or vice versa. Returns the final value of the counters, for
- /// which `increment_when` is guaranteed to return false.
- pub(super) fn increment_jobs_event_counter_if(
- &self,
- increment_when: impl Fn(JobsEventCounter) -> bool,
- ) -> Counters {
- loop {
- let old_value = self.load(Ordering::SeqCst);
- if increment_when(old_value.jobs_counter()) {
- let new_value = old_value.increment_jobs_counter();
- if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
- return new_value;
- }
- } else {
- return old_value;
- }
- }
- }
-
- /// Subtracts an inactive thread. This cannot fail. It is invoked
- /// when a thread finds work and hence becomes active. It returns the
- /// number of sleeping threads to wake up (if any).
- ///
- /// See `add_inactive_thread`.
- #[inline]
- pub(super) fn sub_inactive_thread(&self) -> usize {
- let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
- debug_assert!(
- old_value.inactive_threads() > 0,
- "sub_inactive_thread: old_value {:?} has no inactive threads",
- old_value,
- );
- debug_assert!(
- old_value.sleeping_threads() <= old_value.inactive_threads(),
- "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
- old_value,
- old_value.sleeping_threads(),
- old_value.inactive_threads(),
- );
-
- // Current heuristic: whenever an inactive thread goes away, if
- // there are any sleeping threads, wake 'em up.
- let sleeping_threads = old_value.sleeping_threads();
- std::cmp::min(sleeping_threads, 2)
- }
-
- /// Subtracts a sleeping thread. This cannot fail, but it is only
- /// safe to do if you you know the number of sleeping threads is
- /// non-zero (i.e., because you have just awoken a sleeping
- /// thread).
- #[inline]
- pub(super) fn sub_sleeping_thread(&self) {
- let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
- debug_assert!(
- old_value.sleeping_threads() > 0,
- "sub_sleeping_thread: old_value {:?} had no sleeping threads",
- old_value,
- );
- debug_assert!(
- old_value.sleeping_threads() <= old_value.inactive_threads(),
- "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
- old_value,
- old_value.sleeping_threads(),
- old_value.inactive_threads(),
- );
- }
-
- #[inline]
- pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
- debug_assert!(
- old_value.inactive_threads() > 0,
- "try_add_sleeping_thread: old_value {:?} has no inactive threads",
- old_value,
- );
- debug_assert!(
- old_value.sleeping_threads() < THREADS_MAX,
- "try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
- old_value,
- );
-
- let mut new_value = old_value;
- new_value.word += ONE_SLEEPING;
-
- self.try_exchange(old_value, new_value, Ordering::SeqCst)
- }
-}
-
-#[inline]
-fn select_thread(word: usize, shift: usize) -> usize {
- (word >> shift) & THREADS_MAX
-}
-
-#[inline]
-fn select_jec(word: usize) -> usize {
- word >> JEC_SHIFT
-}
-
-impl Counters {
- #[inline]
- fn new(word: usize) -> Counters {
- Counters { word }
- }
-
- #[inline]
- fn increment_jobs_counter(self) -> Counters {
- // We can freely add to JEC because it occupies the most significant bits.
- // Thus it doesn't overflow into the other counters, just wraps itself.
- Counters {
- word: self.word.wrapping_add(ONE_JEC),
- }
- }
-
- #[inline]
- pub(super) fn jobs_counter(self) -> JobsEventCounter {
- JobsEventCounter(select_jec(self.word))
- }
-
- /// The number of threads that are not actively
- /// executing work. They may be idle, sleepy, or asleep.
- #[inline]
- pub(super) fn inactive_threads(self) -> usize {
- select_thread(self.word, INACTIVE_SHIFT)
- }
-
- #[inline]
- pub(super) fn awake_but_idle_threads(self) -> usize {
- debug_assert!(
- self.sleeping_threads() <= self.inactive_threads(),
- "sleeping threads: {} > raw idle threads {}",
- self.sleeping_threads(),
- self.inactive_threads()
- );
- self.inactive_threads() - self.sleeping_threads()
- }
-
- #[inline]
- pub(super) fn sleeping_threads(self) -> usize {
- select_thread(self.word, SLEEPING_SHIFT)
- }
-}
-
-impl std::fmt::Debug for Counters {
- fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let word = format!("{:016x}", self.word);
- fmt.debug_struct("Counters")
- .field("word", &word)
- .field("jobs", &self.jobs_counter().0)
- .field("inactive", &self.inactive_threads())
- .field("sleeping", &self.sleeping_threads())
- .finish()
- }
-}
diff --git a/vendor/rayon-core/src/sleep/mod.rs b/vendor/rayon-core/src/sleep/mod.rs
deleted file mode 100644
index 03d1077..0000000
--- a/vendor/rayon-core/src/sleep/mod.rs
+++ /dev/null
@@ -1,325 +0,0 @@
-//! Code that decides when workers should go to sleep. See README.md
-//! for an overview.
-
-use crate::latch::CoreLatch;
-use crossbeam_utils::CachePadded;
-use std::sync::atomic::Ordering;
-use std::sync::{Condvar, Mutex};
-use std::thread;
-use std::usize;
-
-mod counters;
-pub(crate) use self::counters::THREADS_MAX;
-use self::counters::{AtomicCounters, JobsEventCounter};
-
-/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
-/// of workers. It has callbacks that are invoked periodically at significant events,
-/// such as when workers are looping and looking for work, when latches are set, or when
-/// jobs are published, and it either blocks threads or wakes them in response to these
-/// events. See the [`README.md`] in this module for more details.
-///
-/// [`README.md`] README.md
-pub(super) struct Sleep {
- /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
- /// them block.
- worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
-
- counters: AtomicCounters,
-}
-
-/// An instance of this struct is created when a thread becomes idle.
-/// It is consumed when the thread finds work, and passed by `&mut`
-/// reference for operations that preserve the idle state. (In other
-/// words, producing one of these structs is evidence the thread is
-/// idle.) It tracks state such as how long the thread has been idle.
-pub(super) struct IdleState {
- /// What is worker index of the idle thread?
- worker_index: usize,
-
- /// How many rounds have we been circling without sleeping?
- rounds: u32,
-
- /// Once we become sleepy, what was the sleepy counter value?
- /// Set to `INVALID_SLEEPY_COUNTER` otherwise.
- jobs_counter: JobsEventCounter,
-}
-
-/// The "sleep state" for an individual worker.
-#[derive(Default)]
-struct WorkerSleepState {
- /// Set to true when the worker goes to sleep; set to false when
- /// the worker is notified or when it wakes.
- is_blocked: Mutex<bool>,
-
- condvar: Condvar,
-}
-
-const ROUNDS_UNTIL_SLEEPY: u32 = 32;
-const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
-
-impl Sleep {
- pub(super) fn new(n_threads: usize) -> Sleep {
- assert!(n_threads <= THREADS_MAX);
- Sleep {
- worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
- counters: AtomicCounters::new(),
- }
- }
-
- #[inline]
- pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
- self.counters.add_inactive_thread();
-
- IdleState {
- worker_index,
- rounds: 0,
- jobs_counter: JobsEventCounter::DUMMY,
- }
- }
-
- #[inline]
- pub(super) fn work_found(&self) {
- // If we were the last idle thread and other threads are still sleeping,
- // then we should wake up another thread.
- let threads_to_wake = self.counters.sub_inactive_thread();
- self.wake_any_threads(threads_to_wake as u32);
- }
-
- #[inline]
- pub(super) fn no_work_found(
- &self,
- idle_state: &mut IdleState,
- latch: &CoreLatch,
- has_injected_jobs: impl FnOnce() -> bool,
- ) {
- if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
- thread::yield_now();
- idle_state.rounds += 1;
- } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
- idle_state.jobs_counter = self.announce_sleepy();
- idle_state.rounds += 1;
- thread::yield_now();
- } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
- idle_state.rounds += 1;
- thread::yield_now();
- } else {
- debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
- self.sleep(idle_state, latch, has_injected_jobs);
- }
- }
-
- #[cold]
- fn announce_sleepy(&self) -> JobsEventCounter {
- self.counters
- .increment_jobs_event_counter_if(JobsEventCounter::is_active)
- .jobs_counter()
- }
-
- #[cold]
- fn sleep(
- &self,
- idle_state: &mut IdleState,
- latch: &CoreLatch,
- has_injected_jobs: impl FnOnce() -> bool,
- ) {
- let worker_index = idle_state.worker_index;
-
- if !latch.get_sleepy() {
- return;
- }
-
- let sleep_state = &self.worker_sleep_states[worker_index];
- let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
- debug_assert!(!*is_blocked);
-
- // Our latch was signalled. We should wake back up fully as we
- // will have some stuff to do.
- if !latch.fall_asleep() {
- idle_state.wake_fully();
- return;
- }
-
- loop {
- let counters = self.counters.load(Ordering::SeqCst);
-
- // Check if the JEC has changed since we got sleepy.
- debug_assert!(idle_state.jobs_counter.is_sleepy());
- if counters.jobs_counter() != idle_state.jobs_counter {
- // JEC has changed, so a new job was posted, but for some reason
- // we didn't see it. We should return to just before the SLEEPY
- // state so we can do another search and (if we fail to find
- // work) go back to sleep.
- idle_state.wake_partly();
- latch.wake_up();
- return;
- }
-
- // Otherwise, let's move from IDLE to SLEEPING.
- if self.counters.try_add_sleeping_thread(counters) {
- break;
- }
- }
-
- // Successfully registered as asleep.
-
- // We have one last check for injected jobs to do. This protects against
- // deadlock in the very unlikely event that
- //
- // - an external job is being injected while we are sleepy
- // - that job triggers the rollover over the JEC such that we don't see it
- // - we are the last active worker thread
- std::sync::atomic::fence(Ordering::SeqCst);
- if has_injected_jobs() {
- // If we see an externally injected job, then we have to 'wake
- // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
- // the one that wakes us.)
- self.counters.sub_sleeping_thread();
- } else {
- // If we don't see an injected job (the normal case), then flag
- // ourselves as asleep and wait till we are notified.
- //
- // (Note that `is_blocked` is held under a mutex and the mutex was
- // acquired *before* we incremented the "sleepy counter". This means
- // that whomever is coming to wake us will have to wait until we
- // release the mutex in the call to `wait`, so they will see this
- // boolean as true.)
- *is_blocked = true;
- while *is_blocked {
- is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
- }
- }
-
- // Update other state:
- idle_state.wake_fully();
- latch.wake_up();
- }
-
- /// Notify the given thread that it should wake up (if it is
- /// sleeping). When this method is invoked, we typically know the
- /// thread is asleep, though in rare cases it could have been
- /// awoken by (e.g.) new work having been posted.
- pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
- self.wake_specific_thread(target_worker_index);
- }
-
- /// Signals that `num_jobs` new jobs were injected into the thread
- /// pool from outside. This function will ensure that there are
- /// threads available to process them, waking threads from sleep
- /// if necessary.
- ///
- /// # Parameters
- ///
- /// - `num_jobs` -- lower bound on number of jobs available for stealing.
- /// We'll try to get at least one thread per job.
- #[inline]
- pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
- // This fence is needed to guarantee that threads
- // as they are about to fall asleep, observe any
- // new jobs that may have been injected.
- std::sync::atomic::fence(Ordering::SeqCst);
-
- self.new_jobs(num_jobs, queue_was_empty)
- }
-
- /// Signals that `num_jobs` new jobs were pushed onto a thread's
- /// local deque. This function will try to ensure that there are
- /// threads available to process them, waking threads from sleep
- /// if necessary. However, this is not guaranteed: under certain
- /// race conditions, the function may fail to wake any new
- /// threads; in that case the existing thread should eventually
- /// pop the job.
- ///
- /// # Parameters
- ///
- /// - `num_jobs` -- lower bound on number of jobs available for stealing.
- /// We'll try to get at least one thread per job.
- #[inline]
- pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
- self.new_jobs(num_jobs, queue_was_empty)
- }
-
- /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
- #[inline]
- fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
- // Read the counters and -- if sleepy workers have announced themselves
- // -- announce that there is now work available. The final value of `counters`
- // with which we exit the loop thus corresponds to a state when
- let counters = self
- .counters
- .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
- let num_awake_but_idle = counters.awake_but_idle_threads();
- let num_sleepers = counters.sleeping_threads();
-
- if num_sleepers == 0 {
- // nobody to wake
- return;
- }
-
- // Promote from u16 to u32 so we can interoperate with
- // num_jobs more easily.
- let num_awake_but_idle = num_awake_but_idle as u32;
- let num_sleepers = num_sleepers as u32;
-
- // If the queue is non-empty, then we always wake up a worker
- // -- clearly the existing idle jobs aren't enough. Otherwise,
- // check to see if we have enough idle workers.
- if !queue_was_empty {
- let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
- self.wake_any_threads(num_to_wake);
- } else if num_awake_but_idle < num_jobs {
- let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
- self.wake_any_threads(num_to_wake);
- }
- }
-
- #[cold]
- fn wake_any_threads(&self, mut num_to_wake: u32) {
- if num_to_wake > 0 {
- for i in 0..self.worker_sleep_states.len() {
- if self.wake_specific_thread(i) {
- num_to_wake -= 1;
- if num_to_wake == 0 {
- return;
- }
- }
- }
- }
- }
-
- fn wake_specific_thread(&self, index: usize) -> bool {
- let sleep_state = &self.worker_sleep_states[index];
-
- let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
- if *is_blocked {
- *is_blocked = false;
- sleep_state.condvar.notify_one();
-
- // When the thread went to sleep, it will have incremented
- // this value. When we wake it, its our job to decrement
- // it. We could have the thread do it, but that would
- // introduce a delay between when the thread was
- // *notified* and when this counter was decremented. That
- // might mislead people with new work into thinking that
- // there are sleeping threads that they should try to
- // wake, when in fact there is nothing left for them to
- // do.
- self.counters.sub_sleeping_thread();
-
- true
- } else {
- false
- }
- }
-}
-
-impl IdleState {
- fn wake_fully(&mut self) {
- self.rounds = 0;
- self.jobs_counter = JobsEventCounter::DUMMY;
- }
-
- fn wake_partly(&mut self) {
- self.rounds = ROUNDS_UNTIL_SLEEPY;
- self.jobs_counter = JobsEventCounter::DUMMY;
- }
-}
diff --git a/vendor/rayon-core/src/spawn/mod.rs b/vendor/rayon-core/src/spawn/mod.rs
deleted file mode 100644
index 1aa9edb..0000000
--- a/vendor/rayon-core/src/spawn/mod.rs
+++ /dev/null
@@ -1,163 +0,0 @@
-use crate::job::*;
-use crate::registry::Registry;
-use crate::unwind;
-use std::mem;
-use std::sync::Arc;
-
-/// Fires off a task into the Rayon threadpool in the "static" or
-/// "global" scope. Just like a standard thread, this task is not
-/// tied to the current stack frame, and hence it cannot hold any
-/// references other than those with `'static` lifetime. If you want
-/// to spawn a task that references stack data, use [the `scope()`
-/// function][scope] to create a scope.
-///
-/// [scope]: fn.scope.html
-///
-/// Since tasks spawned with this function cannot hold references into
-/// the enclosing stack frame, you almost certainly want to use a
-/// `move` closure as their argument (otherwise, the closure will
-/// typically hold references to any variables from the enclosing
-/// function that you happen to use).
-///
-/// This API assumes that the closure is executed purely for its
-/// side-effects (i.e., it might send messages, modify data protected
-/// by a mutex, or some such thing).
-///
-/// There is no guaranteed order of execution for spawns, given that
-/// other threads may steal tasks at any time. However, they are
-/// generally prioritized in a LIFO order on the thread from which
-/// they were spawned. Other threads always steal from the other end of
-/// the deque, like FIFO order. The idea is that "recent" tasks are
-/// most likely to be fresh in the local CPU's cache, while other
-/// threads can steal older "stale" tasks. For an alternate approach,
-/// consider [`spawn_fifo()`] instead.
-///
-/// [`spawn_fifo()`]: fn.spawn_fifo.html
-///
-/// # Panic handling
-///
-/// If this closure should panic, the resulting panic will be
-/// propagated to the panic handler registered in the `ThreadPoolBuilder`,
-/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
-/// details.
-///
-/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
-///
-/// # Examples
-///
-/// This code creates a Rayon task that increments a global counter.
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
-///
-/// static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
-///
-/// rayon::spawn(move || {
-/// GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst);
-/// });
-/// ```
-pub fn spawn<F>(func: F)
-where
- F: FnOnce() + Send + 'static,
-{
- // We assert that current registry has not terminated.
- unsafe { spawn_in(func, &Registry::current()) }
-}
-
-/// Spawns an asynchronous job in `registry.`
-///
-/// Unsafe because `registry` must not yet have terminated.
-pub(super) unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>)
-where
- F: FnOnce() + Send + 'static,
-{
- // We assert that this does not hold any references (we know
- // this because of the `'static` bound in the interface);
- // moreover, we assert that the code below is not supposed to
- // be able to panic, and hence the data won't leak but will be
- // enqueued into some deque for later execution.
- let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
- let job_ref = spawn_job(func, registry);
- registry.inject_or_push(job_ref);
- mem::forget(abort_guard);
-}
-
-unsafe fn spawn_job<F>(func: F, registry: &Arc<Registry>) -> JobRef
-where
- F: FnOnce() + Send + 'static,
-{
- // Ensure that registry cannot terminate until this job has
- // executed. This ref is decremented at the (*) below.
- registry.increment_terminate_count();
-
- HeapJob::new({
- let registry = Arc::clone(registry);
- move || {
- registry.catch_unwind(func);
- registry.terminate(); // (*) permit registry to terminate now
- }
- })
- .into_static_job_ref()
-}
-
-/// Fires off a task into the Rayon threadpool in the "static" or
-/// "global" scope. Just like a standard thread, this task is not
-/// tied to the current stack frame, and hence it cannot hold any
-/// references other than those with `'static` lifetime. If you want
-/// to spawn a task that references stack data, use [the `scope_fifo()`
-/// function](fn.scope_fifo.html) to create a scope.
-///
-/// The behavior is essentially the same as [the `spawn`
-/// function](fn.spawn.html), except that calls from the same thread
-/// will be prioritized in FIFO order. This is similar to the now-
-/// deprecated [`breadth_first`] option, except the effect is isolated
-/// to relative `spawn_fifo` calls, not all threadpool tasks.
-///
-/// For more details on this design, see Rayon [RFC #1].
-///
-/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
-/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
-///
-/// # Panic handling
-///
-/// If this closure should panic, the resulting panic will be
-/// propagated to the panic handler registered in the `ThreadPoolBuilder`,
-/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
-/// details.
-///
-/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
-pub fn spawn_fifo<F>(func: F)
-where
- F: FnOnce() + Send + 'static,
-{
- // We assert that current registry has not terminated.
- unsafe { spawn_fifo_in(func, &Registry::current()) }
-}
-
-/// Spawns an asynchronous FIFO job in `registry.`
-///
-/// Unsafe because `registry` must not yet have terminated.
-pub(super) unsafe fn spawn_fifo_in<F>(func: F, registry: &Arc<Registry>)
-where
- F: FnOnce() + Send + 'static,
-{
- // We assert that this does not hold any references (we know
- // this because of the `'static` bound in the interface);
- // moreover, we assert that the code below is not supposed to
- // be able to panic, and hence the data won't leak but will be
- // enqueued into some deque for later execution.
- let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
- let job_ref = spawn_job(func, registry);
-
- // If we're in the pool, use our thread's private fifo for this thread to execute
- // in a locally-FIFO order. Otherwise, just use the pool's global injector.
- match registry.current_thread() {
- Some(worker) => worker.push_fifo(job_ref),
- None => registry.inject(job_ref),
- }
- mem::forget(abort_guard);
-}
-
-#[cfg(test)]
-mod test;
diff --git a/vendor/rayon-core/src/spawn/test.rs b/vendor/rayon-core/src/spawn/test.rs
deleted file mode 100644
index b7a0535..0000000
--- a/vendor/rayon-core/src/spawn/test.rs
+++ /dev/null
@@ -1,255 +0,0 @@
-use crate::scope;
-use std::any::Any;
-use std::sync::mpsc::channel;
-use std::sync::Mutex;
-
-use super::{spawn, spawn_fifo};
-use crate::ThreadPoolBuilder;
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_then_join_in_worker() {
- let (tx, rx) = channel();
- scope(move |_| {
- spawn(move || tx.send(22).unwrap());
- });
- assert_eq!(22, rx.recv().unwrap());
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_then_join_outside_worker() {
- let (tx, rx) = channel();
- spawn(move || tx.send(22).unwrap());
- assert_eq!(22, rx.recv().unwrap());
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn panic_fwd() {
- let (tx, rx) = channel();
-
- let tx = Mutex::new(tx);
- let panic_handler = move |err: Box<dyn Any + Send>| {
- let tx = tx.lock().unwrap();
- if let Some(&msg) = err.downcast_ref::<&str>() {
- if msg == "Hello, world!" {
- tx.send(1).unwrap();
- } else {
- tx.send(2).unwrap();
- }
- } else {
- tx.send(3).unwrap();
- }
- };
-
- let builder = ThreadPoolBuilder::new().panic_handler(panic_handler);
-
- builder
- .build()
- .unwrap()
- .spawn(move || panic!("Hello, world!"));
-
- assert_eq!(1, rx.recv().unwrap());
-}
-
-/// Test what happens when the thread-pool is dropped but there are
-/// still active asynchronous tasks. We expect the thread-pool to stay
-/// alive and executing until those threads are complete.
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn termination_while_things_are_executing() {
- let (tx0, rx0) = channel();
- let (tx1, rx1) = channel();
-
- // Create a thread-pool and spawn some code in it, but then drop
- // our reference to it.
- {
- let thread_pool = ThreadPoolBuilder::new().build().unwrap();
- thread_pool.spawn(move || {
- let data = rx0.recv().unwrap();
-
- // At this point, we know the "main" reference to the
- // `ThreadPool` has been dropped, but there are still
- // active threads. Launch one more.
- spawn(move || {
- tx1.send(data).unwrap();
- });
- });
- }
-
- tx0.send(22).unwrap();
- let v = rx1.recv().unwrap();
- assert_eq!(v, 22);
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn custom_panic_handler_and_spawn() {
- let (tx, rx) = channel();
-
- // Create a parallel closure that will send panics on the
- // channel; since the closure is potentially executed in parallel
- // with itself, we have to wrap `tx` in a mutex.
- let tx = Mutex::new(tx);
- let panic_handler = move |e: Box<dyn Any + Send>| {
- tx.lock().unwrap().send(e).unwrap();
- };
-
- // Execute an async that will panic.
- let builder = ThreadPoolBuilder::new().panic_handler(panic_handler);
- builder.build().unwrap().spawn(move || {
- panic!("Hello, world!");
- });
-
- // Check that we got back the panic we expected.
- let error = rx.recv().unwrap();
- if let Some(&msg) = error.downcast_ref::<&str>() {
- assert_eq!(msg, "Hello, world!");
- } else {
- panic!("did not receive a string from panic handler");
- }
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn custom_panic_handler_and_nested_spawn() {
- let (tx, rx) = channel();
-
- // Create a parallel closure that will send panics on the
- // channel; since the closure is potentially executed in parallel
- // with itself, we have to wrap `tx` in a mutex.
- let tx = Mutex::new(tx);
- let panic_handler = move |e| {
- tx.lock().unwrap().send(e).unwrap();
- };
-
- // Execute an async that will (eventually) panic.
- const PANICS: usize = 3;
- let builder = ThreadPoolBuilder::new().panic_handler(panic_handler);
- builder.build().unwrap().spawn(move || {
- // launch 3 nested spawn-asyncs; these should be in the same
- // thread-pool and hence inherit the same panic handler
- for _ in 0..PANICS {
- spawn(move || {
- panic!("Hello, world!");
- });
- }
- });
-
- // Check that we get back the panics we expected.
- for _ in 0..PANICS {
- let error = rx.recv().unwrap();
- if let Some(&msg) = error.downcast_ref::<&str>() {
- assert_eq!(msg, "Hello, world!");
- } else {
- panic!("did not receive a string from panic handler");
- }
- }
-}
-
-macro_rules! test_order {
- ($outer_spawn:ident, $inner_spawn:ident) => {{
- let builder = ThreadPoolBuilder::new().num_threads(1);
- let pool = builder.build().unwrap();
- let (tx, rx) = channel();
- pool.install(move || {
- for i in 0..10 {
- let tx = tx.clone();
- $outer_spawn(move || {
- for j in 0..10 {
- let tx = tx.clone();
- $inner_spawn(move || {
- tx.send(i * 10 + j).unwrap();
- });
- }
- });
- }
- });
- rx.iter().collect::<Vec<i32>>()
- }};
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn lifo_order() {
- // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order.
- let vec = test_order!(spawn, spawn);
- let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn fifo_order() {
- // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
- let vec = test_order!(spawn_fifo, spawn_fifo);
- let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn lifo_fifo_order() {
- // LIFO on the outside, FIFO on the inside
- let vec = test_order!(spawn, spawn_fifo);
- let expected: Vec<i32> = (0..10)
- .rev()
- .flat_map(|i| (0..10).map(move |j| i * 10 + j))
- .collect();
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn fifo_lifo_order() {
- // FIFO on the outside, LIFO on the inside
- let vec = test_order!(spawn_fifo, spawn);
- let expected: Vec<i32> = (0..10)
- .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
- .collect();
- assert_eq!(vec, expected);
-}
-
-macro_rules! spawn_send {
- ($spawn:ident, $tx:ident, $i:expr) => {{
- let tx = $tx.clone();
- $spawn(move || tx.send($i).unwrap());
- }};
-}
-
-/// Test mixed spawns pushing a series of numbers, interleaved such
-/// such that negative values are using the second kind of spawn.
-macro_rules! test_mixed_order {
- ($pos_spawn:ident, $neg_spawn:ident) => {{
- let builder = ThreadPoolBuilder::new().num_threads(1);
- let pool = builder.build().unwrap();
- let (tx, rx) = channel();
- pool.install(move || {
- spawn_send!($pos_spawn, tx, 0);
- spawn_send!($neg_spawn, tx, -1);
- spawn_send!($pos_spawn, tx, 1);
- spawn_send!($neg_spawn, tx, -2);
- spawn_send!($pos_spawn, tx, 2);
- spawn_send!($neg_spawn, tx, -3);
- spawn_send!($pos_spawn, tx, 3);
- });
- rx.iter().collect::<Vec<i32>>()
- }};
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn mixed_lifo_fifo_order() {
- let vec = test_mixed_order!(spawn, spawn_fifo);
- let expected = vec![3, -1, 2, -2, 1, -3, 0];
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn mixed_fifo_lifo_order() {
- let vec = test_mixed_order!(spawn_fifo, spawn);
- let expected = vec![0, -3, 1, -2, 2, -1, 3];
- assert_eq!(vec, expected);
-}
diff --git a/vendor/rayon-core/src/test.rs b/vendor/rayon-core/src/test.rs
deleted file mode 100644
index 25b8487..0000000
--- a/vendor/rayon-core/src/test.rs
+++ /dev/null
@@ -1,200 +0,0 @@
-#![cfg(test)]
-
-use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Barrier};
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn worker_thread_index() {
- let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
- assert_eq!(pool.current_num_threads(), 22);
- assert_eq!(pool.current_thread_index(), None);
- let index = pool.install(|| pool.current_thread_index().unwrap());
- assert!(index < 22);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn start_callback_called() {
- let n_threads = 16;
- let n_called = Arc::new(AtomicUsize::new(0));
- // Wait for all the threads in the pool plus the one running tests.
- let barrier = Arc::new(Barrier::new(n_threads + 1));
-
- let b = Arc::clone(&barrier);
- let nc = Arc::clone(&n_called);
- let start_handler = move |_| {
- nc.fetch_add(1, Ordering::SeqCst);
- b.wait();
- };
-
- let conf = ThreadPoolBuilder::new()
- .num_threads(n_threads)
- .start_handler(start_handler);
- let _ = conf.build().unwrap();
-
- // Wait for all the threads to have been scheduled to run.
- barrier.wait();
-
- // The handler must have been called on every started thread.
- assert_eq!(n_called.load(Ordering::SeqCst), n_threads);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn exit_callback_called() {
- let n_threads = 16;
- let n_called = Arc::new(AtomicUsize::new(0));
- // Wait for all the threads in the pool plus the one running tests.
- let barrier = Arc::new(Barrier::new(n_threads + 1));
-
- let b = Arc::clone(&barrier);
- let nc = Arc::clone(&n_called);
- let exit_handler = move |_| {
- nc.fetch_add(1, Ordering::SeqCst);
- b.wait();
- };
-
- let conf = ThreadPoolBuilder::new()
- .num_threads(n_threads)
- .exit_handler(exit_handler);
- {
- let _ = conf.build().unwrap();
- // Drop the pool so it stops the running threads.
- }
-
- // Wait for all the threads to have been scheduled to run.
- barrier.wait();
-
- // The handler must have been called on every exiting thread.
- assert_eq!(n_called.load(Ordering::SeqCst), n_threads);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn handler_panics_handled_correctly() {
- let n_threads = 16;
- let n_called = Arc::new(AtomicUsize::new(0));
- // Wait for all the threads in the pool plus the one running tests.
- let start_barrier = Arc::new(Barrier::new(n_threads + 1));
- let exit_barrier = Arc::new(Barrier::new(n_threads + 1));
-
- let start_handler = move |_| {
- panic!("ensure panic handler is called when starting");
- };
- let exit_handler = move |_| {
- panic!("ensure panic handler is called when exiting");
- };
-
- let sb = Arc::clone(&start_barrier);
- let eb = Arc::clone(&exit_barrier);
- let nc = Arc::clone(&n_called);
- let panic_handler = move |_| {
- let val = nc.fetch_add(1, Ordering::SeqCst);
- if val < n_threads {
- sb.wait();
- } else {
- eb.wait();
- }
- };
-
- let conf = ThreadPoolBuilder::new()
- .num_threads(n_threads)
- .start_handler(start_handler)
- .exit_handler(exit_handler)
- .panic_handler(panic_handler);
- {
- let _ = conf.build().unwrap();
-
- // Wait for all the threads to start, panic in the start handler,
- // and been taken care of by the panic handler.
- start_barrier.wait();
-
- // Drop the pool so it stops the running threads.
- }
-
- // Wait for all the threads to exit, panic in the exit handler,
- // and been taken care of by the panic handler.
- exit_barrier.wait();
-
- // The panic handler must have been called twice on every thread.
- assert_eq!(n_called.load(Ordering::SeqCst), 2 * n_threads);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn check_config_build() {
- let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
- assert_eq!(pool.current_num_threads(), 22);
-}
-
-/// Helper used by check_error_send_sync to ensure ThreadPoolBuildError is Send + Sync
-fn _send_sync<T: Send + Sync>() {}
-
-#[test]
-fn check_error_send_sync() {
- _send_sync::<ThreadPoolBuildError>();
-}
-
-#[allow(deprecated)]
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn configuration() {
- let start_handler = move |_| {};
- let exit_handler = move |_| {};
- let panic_handler = move |_| {};
- let thread_name = move |i| format!("thread_name_{}", i);
-
- // Ensure we can call all public methods on Configuration
- crate::Configuration::new()
- .thread_name(thread_name)
- .num_threads(5)
- .panic_handler(panic_handler)
- .stack_size(4e6 as usize)
- .breadth_first()
- .start_handler(start_handler)
- .exit_handler(exit_handler)
- .build()
- .unwrap();
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn default_pool() {
- ThreadPoolBuilder::default().build().unwrap();
-}
-
-/// Test that custom spawned threads get their `WorkerThread` cleared once
-/// the pool is done with them, allowing them to be used with rayon again
-/// later. e.g. WebAssembly want to have their own pool of available threads.
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
- let n_threads = 5;
- let mut handles = vec![];
- let pool = ThreadPoolBuilder::new()
- .num_threads(n_threads)
- .spawn_handler(|thread| {
- let handle = std::thread::spawn(move || {
- thread.run();
-
- // Afterward, the current thread shouldn't be set anymore.
- assert_eq!(crate::current_thread_index(), None);
- });
- handles.push(handle);
- Ok(())
- })
- .build()?;
- assert_eq!(handles.len(), n_threads);
-
- pool.install(|| assert!(crate::current_thread_index().is_some()));
- drop(pool);
-
- // Wait for all threads to make their assertions and exit
- for handle in handles {
- handle.join().unwrap();
- }
-
- Ok(())
-}
diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs
deleted file mode 100644
index c37826e..0000000
--- a/vendor/rayon-core/src/thread_pool/mod.rs
+++ /dev/null
@@ -1,471 +0,0 @@
-//! Contains support for user-managed thread pools, represented by the
-//! the [`ThreadPool`] type (see that struct for details).
-//!
-//! [`ThreadPool`]: struct.ThreadPool.html
-
-use crate::broadcast::{self, BroadcastContext};
-use crate::join;
-use crate::registry::{Registry, ThreadSpawn, WorkerThread};
-use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
-use crate::spawn;
-use crate::{scope, Scope};
-use crate::{scope_fifo, ScopeFifo};
-use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
-use std::error::Error;
-use std::fmt;
-use std::sync::Arc;
-
-mod test;
-
-/// Represents a user created [thread-pool].
-///
-/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
-/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
-/// execute functions explicitly within this [`ThreadPool`] using
-/// [`ThreadPool::install()`]. By contrast, top level rayon functions
-/// (like `join()`) will execute implicitly within the current thread-pool.
-///
-///
-/// ## Creating a ThreadPool
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
-/// ```
-///
-/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
-/// threads. In addition, any other rayon operations called inside of `install()` will also
-/// execute in the context of the `ThreadPool`.
-///
-/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate,
-/// they will complete executing any remaining work that you have spawned, and automatically
-/// terminate.
-///
-///
-/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
-/// [`ThreadPool`]: struct.ThreadPool.html
-/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
-/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
-/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
-/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
-pub struct ThreadPool {
- registry: Arc<Registry>,
-}
-
-impl ThreadPool {
- #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
- #[allow(deprecated)]
- /// Deprecated in favor of `ThreadPoolBuilder::build`.
- pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
- Self::build(configuration.into_builder()).map_err(Box::from)
- }
-
- pub(super) fn build<S>(
- builder: ThreadPoolBuilder<S>,
- ) -> Result<ThreadPool, ThreadPoolBuildError>
- where
- S: ThreadSpawn,
- {
- let registry = Registry::new(builder)?;
- Ok(ThreadPool { registry })
- }
-
- /// Executes `op` within the threadpool. Any attempts to use
- /// `join`, `scope`, or parallel iterators will then operate
- /// within that threadpool.
- ///
- /// # Warning: thread-local data
- ///
- /// Because `op` is executing within the Rayon thread-pool,
- /// thread-local data from the current thread will not be
- /// accessible.
- ///
- /// # Panics
- ///
- /// If `op` should panic, that panic will be propagated.
- ///
- /// ## Using `install()`
- ///
- /// ```rust
- /// # use rayon_core as rayon;
- /// fn main() {
- /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
- /// let n = pool.install(|| fib(20));
- /// println!("{}", n);
- /// }
- ///
- /// fn fib(n: usize) -> usize {
- /// if n == 0 || n == 1 {
- /// return n;
- /// }
- /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
- /// return a + b;
- /// }
- /// ```
- pub fn install<OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce() -> R + Send,
- R: Send,
- {
- self.registry.in_worker(|_, _| op())
- }
-
- /// Executes `op` within every thread in the threadpool. Any attempts to use
- /// `join`, `scope`, or parallel iterators will then operate within that
- /// threadpool.
- ///
- /// Broadcasts are executed on each thread after they have exhausted their
- /// local work queue, before they attempt work-stealing from other threads.
- /// The goal of that strategy is to run everywhere in a timely manner
- /// *without* being too disruptive to current work. There may be alternative
- /// broadcast styles added in the future for more or less aggressive
- /// injection, if the need arises.
- ///
- /// # Warning: thread-local data
- ///
- /// Because `op` is executing within the Rayon thread-pool,
- /// thread-local data from the current thread will not be
- /// accessible.
- ///
- /// # Panics
- ///
- /// If `op` should panic on one or more threads, exactly one panic
- /// will be propagated, only after all threads have completed
- /// (or panicked) their own `op`.
- ///
- /// # Examples
- ///
- /// ```
- /// # use rayon_core as rayon;
- /// use std::sync::atomic::{AtomicUsize, Ordering};
- ///
- /// fn main() {
- /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
- ///
- /// // The argument gives context, including the index of each thread.
- /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
- /// assert_eq!(v, &[0, 1, 4, 9, 16]);
- ///
- /// // The closure can reference the local stack
- /// let count = AtomicUsize::new(0);
- /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
- /// assert_eq!(count.into_inner(), 5);
- /// }
- /// ```
- pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
- where
- OP: Fn(BroadcastContext<'_>) -> R + Sync,
- R: Send,
- {
- // We assert that `self.registry` has not terminated.
- unsafe { broadcast::broadcast_in(op, &self.registry) }
- }
-
- /// Returns the (current) number of threads in the thread pool.
- ///
- /// # Future compatibility note
- ///
- /// Note that unless this thread-pool was created with a
- /// [`ThreadPoolBuilder`] that specifies the number of threads,
- /// then this number may vary over time in future versions (see [the
- /// `num_threads()` method for details][snt]).
- ///
- /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
- /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
- #[inline]
- pub fn current_num_threads(&self) -> usize {
- self.registry.num_threads()
- }
-
- /// If called from a Rayon worker thread in this thread-pool,
- /// returns the index of that thread; if not called from a Rayon
- /// thread, or called from a Rayon thread that belongs to a
- /// different thread-pool, returns `None`.
- ///
- /// The index for a given thread will not change over the thread's
- /// lifetime. However, multiple threads may share the same index if
- /// they are in distinct thread-pools.
- ///
- /// # Future compatibility note
- ///
- /// Currently, every thread-pool (including the global
- /// thread-pool) has a fixed number of threads, but this may
- /// change in future Rayon versions (see [the `num_threads()` method
- /// for details][snt]). In that case, the index for a
- /// thread would not change during its lifetime, but thread
- /// indices may wind up being reused if threads are terminated and
- /// restarted.
- ///
- /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
- #[inline]
- pub fn current_thread_index(&self) -> Option<usize> {
- let curr = self.registry.current_thread()?;
- Some(curr.index())
- }
-
- /// Returns true if the current worker thread currently has "local
- /// tasks" pending. This can be useful as part of a heuristic for
- /// deciding whether to spawn a new task or execute code on the
- /// current thread, particularly in breadth-first
- /// schedulers. However, keep in mind that this is an inherently
- /// racy check, as other worker threads may be actively "stealing"
- /// tasks from our local deque.
- ///
- /// **Background:** Rayon's uses a [work-stealing] scheduler. The
- /// key idea is that each thread has its own [deque] of
- /// tasks. Whenever a new task is spawned -- whether through
- /// `join()`, `Scope::spawn()`, or some other means -- that new
- /// task is pushed onto the thread's *local* deque. Worker threads
- /// have a preference for executing their own tasks; if however
- /// they run out of tasks, they will go try to "steal" tasks from
- /// other threads. This function therefore has an inherent race
- /// with other active worker threads, which may be removing items
- /// from the local deque.
- ///
- /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
- /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
- #[inline]
- pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
- let curr = self.registry.current_thread()?;
- Some(!curr.local_deque_is_empty())
- }
-
- /// Execute `oper_a` and `oper_b` in the thread-pool and return
- /// the results. Equivalent to `self.install(|| join(oper_a,
- /// oper_b))`.
- pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
- where
- A: FnOnce() -> RA + Send,
- B: FnOnce() -> RB + Send,
- RA: Send,
- RB: Send,
- {
- self.install(|| join(oper_a, oper_b))
- }
-
- /// Creates a scope that executes within this thread-pool.
- /// Equivalent to `self.install(|| scope(...))`.
- ///
- /// See also: [the `scope()` function][scope].
- ///
- /// [scope]: fn.scope.html
- pub fn scope<'scope, OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce(&Scope<'scope>) -> R + Send,
- R: Send,
- {
- self.install(|| scope(op))
- }
-
- /// Creates a scope that executes within this thread-pool.
- /// Spawns from the same thread are prioritized in relative FIFO order.
- /// Equivalent to `self.install(|| scope_fifo(...))`.
- ///
- /// See also: [the `scope_fifo()` function][scope_fifo].
- ///
- /// [scope_fifo]: fn.scope_fifo.html
- pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
- R: Send,
- {
- self.install(|| scope_fifo(op))
- }
-
- /// Creates a scope that spawns work into this thread-pool.
- ///
- /// See also: [the `in_place_scope()` function][in_place_scope].
- ///
- /// [in_place_scope]: fn.in_place_scope.html
- pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce(&Scope<'scope>) -> R,
- {
- do_in_place_scope(Some(&self.registry), op)
- }
-
- /// Creates a scope that spawns work into this thread-pool in FIFO order.
- ///
- /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
- ///
- /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
- pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
- where
- OP: FnOnce(&ScopeFifo<'scope>) -> R,
- {
- do_in_place_scope_fifo(Some(&self.registry), op)
- }
-
- /// Spawns an asynchronous task in this thread-pool. This task will
- /// run in the implicit, global scope, which means that it may outlast
- /// the current stack frame -- therefore, it cannot capture any references
- /// onto the stack (you will likely need a `move` closure).
- ///
- /// See also: [the `spawn()` function defined on scopes][spawn].
- ///
- /// [spawn]: struct.Scope.html#method.spawn
- pub fn spawn<OP>(&self, op: OP)
- where
- OP: FnOnce() + Send + 'static,
- {
- // We assert that `self.registry` has not terminated.
- unsafe { spawn::spawn_in(op, &self.registry) }
- }
-
- /// Spawns an asynchronous task in this thread-pool. This task will
- /// run in the implicit, global scope, which means that it may outlast
- /// the current stack frame -- therefore, it cannot capture any references
- /// onto the stack (you will likely need a `move` closure).
- ///
- /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
- ///
- /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
- pub fn spawn_fifo<OP>(&self, op: OP)
- where
- OP: FnOnce() + Send + 'static,
- {
- // We assert that `self.registry` has not terminated.
- unsafe { spawn::spawn_fifo_in(op, &self.registry) }
- }
-
- /// Spawns an asynchronous task on every thread in this thread-pool. This task
- /// will run in the implicit, global scope, which means that it may outlast the
- /// current stack frame -- therefore, it cannot capture any references onto the
- /// stack (you will likely need a `move` closure).
- pub fn spawn_broadcast<OP>(&self, op: OP)
- where
- OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
- {
- // We assert that `self.registry` has not terminated.
- unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
- }
-
- /// Cooperatively yields execution to Rayon.
- ///
- /// This is similar to the general [`yield_now()`], but only if the current
- /// thread is part of *this* thread pool.
- ///
- /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
- /// nothing was available, or `None` if the current thread is not part this pool.
- pub fn yield_now(&self) -> Option<Yield> {
- let curr = self.registry.current_thread()?;
- Some(curr.yield_now())
- }
-
- /// Cooperatively yields execution to local Rayon work.
- ///
- /// This is similar to the general [`yield_local()`], but only if the current
- /// thread is part of *this* thread pool.
- ///
- /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
- /// nothing was available, or `None` if the current thread is not part this pool.
- pub fn yield_local(&self) -> Option<Yield> {
- let curr = self.registry.current_thread()?;
- Some(curr.yield_local())
- }
-}
-
-impl Drop for ThreadPool {
- fn drop(&mut self) {
- self.registry.terminate();
- }
-}
-
-impl fmt::Debug for ThreadPool {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("ThreadPool")
- .field("num_threads", &self.current_num_threads())
- .field("id", &self.registry.id())
- .finish()
- }
-}
-
-/// If called from a Rayon worker thread, returns the index of that
-/// thread within its current pool; if not called from a Rayon thread,
-/// returns `None`.
-///
-/// The index for a given thread will not change over the thread's
-/// lifetime. However, multiple threads may share the same index if
-/// they are in distinct thread-pools.
-///
-/// See also: [the `ThreadPool::current_thread_index()` method].
-///
-/// [m]: struct.ThreadPool.html#method.current_thread_index
-///
-/// # Future compatibility note
-///
-/// Currently, every thread-pool (including the global
-/// thread-pool) has a fixed number of threads, but this may
-/// change in future Rayon versions (see [the `num_threads()` method
-/// for details][snt]). In that case, the index for a
-/// thread would not change during its lifetime, but thread
-/// indices may wind up being reused if threads are terminated and
-/// restarted.
-///
-/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
-#[inline]
-pub fn current_thread_index() -> Option<usize> {
- unsafe {
- let curr = WorkerThread::current().as_ref()?;
- Some(curr.index())
- }
-}
-
-/// If called from a Rayon worker thread, indicates whether that
-/// thread's local deque still has pending tasks. Otherwise, returns
-/// `None`. For more information, see [the
-/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
-///
-/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
-#[inline]
-pub fn current_thread_has_pending_tasks() -> Option<bool> {
- unsafe {
- let curr = WorkerThread::current().as_ref()?;
- Some(!curr.local_deque_is_empty())
- }
-}
-
-/// Cooperatively yields execution to Rayon.
-///
-/// If the current thread is part of a rayon thread pool, this looks for a
-/// single unit of pending work in the pool, then executes it. Completion of
-/// that work might include nested work or further work stealing.
-///
-/// This is similar to [`std::thread::yield_now()`], but does not literally make
-/// that call. If you are implementing a polling loop, you may want to also
-/// yield to the OS scheduler yourself if no Rayon work was found.
-///
-/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
-/// nothing was available, or `None` if this thread is not part of any pool at all.
-pub fn yield_now() -> Option<Yield> {
- unsafe {
- let thread = WorkerThread::current().as_ref()?;
- Some(thread.yield_now())
- }
-}
-
-/// Cooperatively yields execution to local Rayon work.
-///
-/// If the current thread is part of a rayon thread pool, this looks for a
-/// single unit of pending work in this thread's queue, then executes it.
-/// Completion of that work might include nested work or further work stealing.
-///
-/// This is similar to [`yield_now()`], but does not steal from other threads.
-///
-/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
-/// nothing was available, or `None` if this thread is not part of any pool at all.
-pub fn yield_local() -> Option<Yield> {
- unsafe {
- let thread = WorkerThread::current().as_ref()?;
- Some(thread.yield_local())
- }
-}
-
-/// Result of [`yield_now()`] or [`yield_local()`].
-#[derive(Clone, Copy, Debug, PartialEq, Eq)]
-pub enum Yield {
- /// Work was found and executed.
- Executed,
- /// No available work was found.
- Idle,
-}
diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs
deleted file mode 100644
index 88b3628..0000000
--- a/vendor/rayon-core/src/thread_pool/test.rs
+++ /dev/null
@@ -1,418 +0,0 @@
-#![cfg(test)]
-
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::channel;
-use std::sync::{Arc, Mutex};
-
-use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate() {
- let thread_pool = ThreadPoolBuilder::new().build().unwrap();
- thread_pool.install(|| {
- panic!("Hello, world!");
- });
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn workers_stop() {
- let registry;
-
- {
- // once we exit this block, thread-pool will be dropped
- let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
- registry = thread_pool.install(|| {
- // do some work on these threads
- join_a_lot(22);
-
- Arc::clone(&thread_pool.registry)
- });
- assert_eq!(registry.num_threads(), 22);
- }
-
- // once thread-pool is dropped, registry should terminate, which
- // should lead to worker threads stopping
- registry.wait_until_stopped();
-}
-
-fn join_a_lot(n: usize) {
- if n > 0 {
- join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
- }
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn sleeper_stop() {
- use std::{thread, time};
-
- let registry;
-
- {
- // once we exit this block, thread-pool will be dropped
- let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
- registry = Arc::clone(&thread_pool.registry);
-
- // Give time for at least some of the thread pool to fall asleep.
- thread::sleep(time::Duration::from_secs(1));
- }
-
- // once thread-pool is dropped, registry should terminate, which
- // should lead to worker threads stopping
- registry.wait_until_stopped();
-}
-
-/// Creates a start/exit handler that increments an atomic counter.
-fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
- let count = Arc::new(AtomicUsize::new(0));
- (Arc::clone(&count), move |_| {
- count.fetch_add(1, Ordering::SeqCst);
- })
-}
-
-/// Wait until a counter is no longer shared, then return its value.
-fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
- use std::{thread, time};
-
- for _ in 0..60 {
- counter = match Arc::try_unwrap(counter) {
- Ok(counter) => return counter.into_inner(),
- Err(counter) => {
- thread::sleep(time::Duration::from_secs(1));
- counter
- }
- };
- }
-
- // That's too long!
- panic!("Counter is still shared!");
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn failed_thread_stack() {
- // Note: we first tried to force failure with a `usize::MAX` stack, but
- // macOS and Windows weren't fazed, or at least didn't fail the way we want.
- // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a
- // 2GB stack, so it might not fail until the second thread.
- let stack_size = ::std::isize::MAX as usize;
-
- let (start_count, start_handler) = count_handler();
- let (exit_count, exit_handler) = count_handler();
- let builder = ThreadPoolBuilder::new()
- .num_threads(10)
- .stack_size(stack_size)
- .start_handler(start_handler)
- .exit_handler(exit_handler);
-
- let pool = builder.build();
- assert!(pool.is_err(), "thread stack should have failed!");
-
- // With such a huge stack, 64-bit will probably fail on the first thread;
- // 32-bit might manage the first 2GB, but certainly fail the second.
- let start_count = wait_for_counter(start_count);
- assert!(start_count <= 1);
- assert_eq!(start_count, wait_for_counter(exit_count));
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn panic_thread_name() {
- let (start_count, start_handler) = count_handler();
- let (exit_count, exit_handler) = count_handler();
- let builder = ThreadPoolBuilder::new()
- .num_threads(10)
- .start_handler(start_handler)
- .exit_handler(exit_handler)
- .thread_name(|i| {
- if i >= 5 {
- panic!();
- }
- format!("panic_thread_name#{}", i)
- });
-
- let pool = crate::unwind::halt_unwinding(|| builder.build());
- assert!(pool.is_err(), "thread-name panic should propagate!");
-
- // Assuming they're created in order, threads 0 through 4 should have
- // been started already, and then terminated by the panic.
- assert_eq!(5, wait_for_counter(start_count));
- assert_eq!(5, wait_for_counter(exit_count));
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn self_install() {
- let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
-
- // If the inner `install` blocks, then nothing will actually run it!
- assert!(pool.install(|| pool.install(|| true)));
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn mutual_install() {
- let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
- let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
-
- let ok = pool1.install(|| {
- // This creates a dependency from `pool1` -> `pool2`
- pool2.install(|| {
- // This creates a dependency from `pool2` -> `pool1`
- pool1.install(|| {
- // If they blocked on inter-pool installs, there would be no
- // threads left to run this!
- true
- })
- })
- });
- assert!(ok);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn mutual_install_sleepy() {
- use std::{thread, time};
-
- let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
- let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
-
- let ok = pool1.install(|| {
- // This creates a dependency from `pool1` -> `pool2`
- pool2.install(|| {
- // Give `pool1` time to fall asleep.
- thread::sleep(time::Duration::from_secs(1));
-
- // This creates a dependency from `pool2` -> `pool1`
- pool1.install(|| {
- // Give `pool2` time to fall asleep.
- thread::sleep(time::Duration::from_secs(1));
-
- // If they blocked on inter-pool installs, there would be no
- // threads left to run this!
- true
- })
- })
- });
- assert!(ok);
-}
-
-#[test]
-#[allow(deprecated)]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn check_thread_pool_new() {
- let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
- assert_eq!(pool.current_num_threads(), 22);
-}
-
-macro_rules! test_scope_order {
- ($scope:ident => $spawn:ident) => {{
- let builder = ThreadPoolBuilder::new().num_threads(1);
- let pool = builder.build().unwrap();
- pool.install(|| {
- let vec = Mutex::new(vec![]);
- pool.$scope(|scope| {
- let vec = &vec;
- for i in 0..10 {
- scope.$spawn(move |_| {
- vec.lock().unwrap().push(i);
- });
- }
- });
- vec.into_inner().unwrap()
- })
- }};
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn scope_lifo_order() {
- let vec = test_scope_order!(scope => spawn);
- let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn scope_fifo_order() {
- let vec = test_scope_order!(scope_fifo => spawn_fifo);
- let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
- assert_eq!(vec, expected);
-}
-
-macro_rules! test_spawn_order {
- ($spawn:ident) => {{
- let builder = ThreadPoolBuilder::new().num_threads(1);
- let pool = &builder.build().unwrap();
- let (tx, rx) = channel();
- pool.install(move || {
- for i in 0..10 {
- let tx = tx.clone();
- pool.$spawn(move || {
- tx.send(i).unwrap();
- });
- }
- });
- rx.iter().collect::<Vec<i32>>()
- }};
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_lifo_order() {
- let vec = test_spawn_order!(spawn);
- let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn spawn_fifo_order() {
- let vec = test_spawn_order!(spawn_fifo);
- let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
- assert_eq!(vec, expected);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn nested_scopes() {
- // Create matching scopes for every thread pool.
- fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
- where
- OP: FnOnce(&[&Scope<'scope>]) + Send,
- {
- if let Some((pool, tail)) = pools.split_first() {
- pool.scope(move |s| {
- // This move reduces the reference lifetimes by variance to match s,
- // but the actual scopes are still tied to the invariant 'scope.
- let mut scopes = scopes;
- scopes.push(s);
- nest(tail, scopes, op)
- })
- } else {
- (op)(&scopes)
- }
- }
-
- let pools: Vec<_> = (0..10)
- .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
- .collect();
-
- let counter = AtomicUsize::new(0);
- nest(&pools, vec![], |scopes| {
- for &s in scopes {
- s.spawn(|_| {
- // Our 'scope lets us borrow the counter in every pool.
- counter.fetch_add(1, Ordering::Relaxed);
- });
- }
- });
- assert_eq!(counter.into_inner(), pools.len());
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn nested_fifo_scopes() {
- // Create matching fifo scopes for every thread pool.
- fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
- where
- OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
- {
- if let Some((pool, tail)) = pools.split_first() {
- pool.scope_fifo(move |s| {
- // This move reduces the reference lifetimes by variance to match s,
- // but the actual scopes are still tied to the invariant 'scope.
- let mut scopes = scopes;
- scopes.push(s);
- nest(tail, scopes, op)
- })
- } else {
- (op)(&scopes)
- }
- }
-
- let pools: Vec<_> = (0..10)
- .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
- .collect();
-
- let counter = AtomicUsize::new(0);
- nest(&pools, vec![], |scopes| {
- for &s in scopes {
- s.spawn_fifo(|_| {
- // Our 'scope lets us borrow the counter in every pool.
- counter.fetch_add(1, Ordering::Relaxed);
- });
- }
- });
- assert_eq!(counter.into_inner(), pools.len());
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn in_place_scope_no_deadlock() {
- let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
- let (tx, rx) = channel();
- let rx_ref = &rx;
- pool.in_place_scope(move |s| {
- // With regular scopes this closure would never run because this scope op
- // itself would block the only worker thread.
- s.spawn(move |_| {
- tx.send(()).unwrap();
- });
- rx_ref.recv().unwrap();
- });
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn in_place_scope_fifo_no_deadlock() {
- let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
- let (tx, rx) = channel();
- let rx_ref = &rx;
- pool.in_place_scope_fifo(move |s| {
- // With regular scopes this closure would never run because this scope op
- // itself would block the only worker thread.
- s.spawn_fifo(move |_| {
- tx.send(()).unwrap();
- });
- rx_ref.recv().unwrap();
- });
-}
-
-#[test]
-fn yield_now_to_spawn() {
- let (tx, rx) = channel();
-
- // Queue a regular spawn.
- crate::spawn(move || tx.send(22).unwrap());
-
- // The single-threaded fallback mode (for wasm etc.) won't
- // get a chance to run the spawn if we never yield to it.
- crate::registry::in_worker(move |_, _| {
- crate::yield_now();
- });
-
- // The spawn **must** have started by now, but we still might have to wait
- // for it to finish if a different thread stole it first.
- assert_eq!(22, rx.recv().unwrap());
-}
-
-#[test]
-fn yield_local_to_spawn() {
- let (tx, rx) = channel();
-
- // Queue a regular spawn.
- crate::spawn(move || tx.send(22).unwrap());
-
- // The single-threaded fallback mode (for wasm etc.) won't
- // get a chance to run the spawn if we never yield to it.
- crate::registry::in_worker(move |_, _| {
- crate::yield_local();
- });
-
- // The spawn **must** have started by now, but we still might have to wait
- // for it to finish if a different thread stole it first.
- assert_eq!(22, rx.recv().unwrap());
-}
diff --git a/vendor/rayon-core/src/unwind.rs b/vendor/rayon-core/src/unwind.rs
deleted file mode 100644
index 9671fa5..0000000
--- a/vendor/rayon-core/src/unwind.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-//! Package up unwind recovery. Note that if you are in some sensitive
-//! place, you can use the `AbortIfPanic` helper to protect against
-//! accidental panics in the rayon code itself.
-
-use std::any::Any;
-use std::panic::{self, AssertUnwindSafe};
-use std::thread;
-
-/// Executes `f` and captures any panic, translating that panic into a
-/// `Err` result. The assumption is that any panic will be propagated
-/// later with `resume_unwinding`, and hence `f` can be treated as
-/// exception safe.
-pub(super) fn halt_unwinding<F, R>(func: F) -> thread::Result<R>
-where
- F: FnOnce() -> R,
-{
- panic::catch_unwind(AssertUnwindSafe(func))
-}
-
-pub(super) fn resume_unwinding(payload: Box<dyn Any + Send>) -> ! {
- panic::resume_unwind(payload)
-}
-
-pub(super) struct AbortIfPanic;
-
-impl Drop for AbortIfPanic {
- fn drop(&mut self) {
- eprintln!("Rayon: detected unexpected panic; aborting");
- ::std::process::abort();
- }
-}