Diffstat (limited to 'vendor/rayon-core/src/thread_pool')
-rw-r--r-- vendor/rayon-core/src/thread_pool/mod.rs  | 471
-rw-r--r-- vendor/rayon-core/src/thread_pool/test.rs | 418
2 files changed, 889 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs
new file mode 100644
index 0000000..c37826e
--- /dev/null
+++ b/vendor/rayon-core/src/thread_pool/mod.rs
@@ -0,0 +1,471 @@
+//! Contains support for user-managed thread pools, represented by the
+//! [`ThreadPool`] type (see that struct for details).
+//!
+//! [`ThreadPool`]: struct.ThreadPool.html
+
+use crate::broadcast::{self, BroadcastContext};
+use crate::join;
+use crate::registry::{Registry, ThreadSpawn, WorkerThread};
+use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
+use crate::spawn;
+use crate::{scope, Scope};
+use crate::{scope_fifo, ScopeFifo};
+use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
+use std::error::Error;
+use std::fmt;
+use std::sync::Arc;
+
+mod test;
+
+/// Represents a user-created [thread-pool].
+///
+/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
+/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
+/// execute functions explicitly within this [`ThreadPool`] using
+/// [`ThreadPool::install()`]. By contrast, top-level rayon functions
+/// (like `join()`) will execute implicitly within the current thread-pool.
+///
+/// ## Creating a ThreadPool
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
+/// ```
+///
+/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
+/// threads. In addition, any other rayon operations called inside of `install()` will also
+/// execute in the context of the `ThreadPool`.
+///
+/// When the `ThreadPool` is dropped, that signals the threads it manages to
+/// terminate: they will finish executing any remaining work that you have
+/// spawned and then shut down automatically.
+///
+/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
+/// [`ThreadPool`]: struct.ThreadPool.html
+/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
+/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
+/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
+/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
+pub struct ThreadPool {
+ registry: Arc<Registry>,
+}
+
+impl ThreadPool {
+ #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
+ #[allow(deprecated)]
+ /// Deprecated in favor of `ThreadPoolBuilder::build`.
+ pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
+ Self::build(configuration.into_builder()).map_err(Box::from)
+ }
+
+ pub(super) fn build<S>(
+ builder: ThreadPoolBuilder<S>,
+ ) -> Result<ThreadPool, ThreadPoolBuildError>
+ where
+ S: ThreadSpawn,
+ {
+ let registry = Registry::new(builder)?;
+ Ok(ThreadPool { registry })
+ }
+
+ /// Executes `op` within the threadpool. Any attempts to use
+ /// `join`, `scope`, or parallel iterators will then operate
+ /// within that threadpool.
+ ///
+ /// # Warning: thread-local data
+ ///
+ /// Because `op` is executing within the Rayon thread-pool,
+ /// thread-local data from the current thread will not be
+ /// accessible.
+ ///
+ /// # Panics
+ ///
+ /// If `op` should panic, that panic will be propagated.
+ ///
+ /// ## Using `install()`
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// fn main() {
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
+ /// let n = pool.install(|| fib(20));
+ /// println!("{}", n);
+ /// }
+ ///
+ /// fn fib(n: usize) -> usize {
+ /// if n == 0 || n == 1 {
+ /// return n;
+ /// }
+ /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
+ /// return a + b;
+ /// }
+ /// ```
+ pub fn install<OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce() -> R + Send,
+ R: Send,
+ {
+ self.registry.in_worker(|_, _| op())
+ }
+
+ /// Executes `op` within every thread in the threadpool. Any attempts to use
+ /// `join`, `scope`, or parallel iterators will then operate within that
+ /// threadpool.
+ ///
+ /// Broadcasts are executed on each thread after they have exhausted their
+ /// local work queue, before they attempt work-stealing from other threads.
+ /// The goal of that strategy is to run everywhere in a timely manner
+ /// *without* being too disruptive to current work. There may be alternative
+ /// broadcast styles added in the future for more or less aggressive
+ /// injection, if the need arises.
+ ///
+ /// # Warning: thread-local data
+ ///
+ /// Because `op` is executing within the Rayon thread-pool,
+ /// thread-local data from the current thread will not be
+ /// accessible.
+ ///
+ /// # Panics
+ ///
+ /// If `op` should panic on one or more threads, exactly one panic
+ /// will be propagated, only after all threads have completed
+ /// (or panicked) their own `op`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// use std::sync::atomic::{AtomicUsize, Ordering};
+ ///
+ /// fn main() {
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
+ ///
+ /// // The argument gives context, including the index of each thread.
+ /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
+ /// assert_eq!(v, &[0, 1, 4, 9, 16]);
+ ///
+ /// // The closure can reference the local stack
+ /// let count = AtomicUsize::new(0);
+ /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
+ /// assert_eq!(count.into_inner(), 5);
+ /// }
+ /// ```
+ pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
+ where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { broadcast::broadcast_in(op, &self.registry) }
+ }
+
+ /// Returns the (current) number of threads in the thread pool.
+ ///
+ /// # Future compatibility note
+ ///
+ /// Note that unless this thread-pool was created with a
+ /// [`ThreadPoolBuilder`] that specifies the number of threads,
+ /// then this number may vary over time in future versions (see [the
+ /// `num_threads()` method for details][snt]).
+ ///
+ /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
+ /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
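+ ///
+ /// # Examples
+ ///
+ /// A trivial sketch, using an explicitly sized pool (the size here is
+ /// arbitrary, for illustration):
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ /// assert_eq!(pool.current_num_threads(), 3);
+ /// ```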
+ #[inline]
+ pub fn current_num_threads(&self) -> usize {
+ self.registry.num_threads()
+ }
+
+ /// If called from a Rayon worker thread in this thread-pool,
+ /// returns the index of that thread; if not called from a Rayon
+ /// thread, or called from a Rayon thread that belongs to a
+ /// different thread-pool, returns `None`.
+ ///
+ /// The index for a given thread will not change over the thread's
+ /// lifetime. However, multiple threads may share the same index if
+ /// they are in distinct thread-pools.
+ ///
+ /// # Future compatibility note
+ ///
+ /// Currently, every thread-pool (including the global
+ /// thread-pool) has a fixed number of threads, but this may
+ /// change in future Rayon versions (see [the `num_threads()` method
+ /// for details][snt]). In that case, the index for a
+ /// thread would not change during its lifetime, but thread
+ /// indices may wind up being reused if threads are terminated and
+ /// restarted.
+ ///
+ /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
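+ ///
+ /// # Examples
+ ///
+ /// A small sketch of the `Some`/`None` behavior (pool size arbitrary):
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
+ /// // Not called from a worker thread of this pool:
+ /// assert_eq!(pool.current_thread_index(), None);
+ /// // Called from inside the pool via `install`:
+ /// let idx = pool.install(|| pool.current_thread_index());
+ /// assert!(idx.unwrap() < 2);
+ /// ```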
+ #[inline]
+ pub fn current_thread_index(&self) -> Option<usize> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.index())
+ }
+
+ /// Returns `Some(true)` if the current worker thread has "local
+ /// tasks" pending, `Some(false)` if its local deque is empty, and
+ /// `None` if the current thread does not belong to this pool. This
+ /// can be useful as part of a heuristic for deciding whether to
+ /// spawn a new task or execute code on the current thread,
+ /// particularly in breadth-first schedulers. However, keep in mind
+ /// that this is an inherently racy check, as other worker threads
+ /// may be actively "stealing" tasks from our local deque.
+ ///
+ /// **Background:** Rayon uses a [work-stealing] scheduler. The
+ /// key idea is that each thread has its own [deque] of
+ /// tasks. Whenever a new task is spawned -- whether through
+ /// `join()`, `Scope::spawn()`, or some other means -- that new
+ /// task is pushed onto the thread's *local* deque. Worker threads
+ /// prefer to execute their own tasks; if they run out, however,
+ /// they will try to "steal" tasks from other threads. This
+ /// function therefore has an inherent race with other active
+ /// worker threads, which may be removing items from the local
+ /// deque.
+ ///
+ /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
+ /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
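+ ///
+ /// # Examples
+ ///
+ /// A small sketch of the `None` case; from outside the pool there is
+ /// no local deque to inspect:
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
+ /// assert_eq!(pool.current_thread_has_pending_tasks(), None);
+ /// ```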
+ #[inline]
+ pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
+ let curr = self.registry.current_thread()?;
+ Some(!curr.local_deque_is_empty())
+ }
+
+ /// Execute `oper_a` and `oper_b` in the thread-pool and return
+ /// the results. Equivalent to `self.install(|| join(oper_a,
+ /// oper_b))`.
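+ ///
+ /// # Examples
+ ///
+ /// A minimal illustrative sketch (the closures and values are
+ /// arbitrary, chosen only for the example):
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
+ /// // Both closures run in the pool; `join` returns both results.
+ /// let (a, b) = pool.join(|| 1 + 1, || 2 + 2);
+ /// assert_eq!((a, b), (2, 4));
+ /// ```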
+ pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
+ where
+ A: FnOnce() -> RA + Send,
+ B: FnOnce() -> RB + Send,
+ RA: Send,
+ RB: Send,
+ {
+ self.install(|| join(oper_a, oper_b))
+ }
+
+ /// Creates a scope that executes within this thread-pool.
+ /// Equivalent to `self.install(|| scope(...))`.
+ ///
+ /// See also: [the `scope()` function][scope].
+ ///
+ /// [scope]: fn.scope.html
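+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch of spawning borrowed work into the pool's scope
+ /// (the counter and iteration count are arbitrary, for illustration):
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// use std::sync::atomic::{AtomicUsize, Ordering};
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
+ /// let count = AtomicUsize::new(0);
+ /// pool.scope(|s| {
+ ///     for _ in 0..4 {
+ ///         // The scope lets these tasks borrow `count` from the stack.
+ ///         s.spawn(|_| { count.fetch_add(1, Ordering::Relaxed); });
+ ///     }
+ /// });
+ /// assert_eq!(count.into_inner(), 4);
+ /// ```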
+ pub fn scope<'scope, OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&Scope<'scope>) -> R + Send,
+ R: Send,
+ {
+ self.install(|| scope(op))
+ }
+
+ /// Creates a scope that executes within this thread-pool.
+ /// Spawns from the same thread are prioritized in relative FIFO order.
+ /// Equivalent to `self.install(|| scope_fifo(...))`.
+ ///
+ /// See also: [the `scope_fifo()` function][scope_fifo].
+ ///
+ /// [scope_fifo]: fn.scope_fifo.html
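+ ///
+ /// # Examples
+ ///
+ /// A small sketch of FIFO ordering, assuming a single-threaded pool so
+ /// that the completion order is deterministic (mirroring this crate's
+ /// `scope_fifo_order` test):
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// use std::sync::Mutex;
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ /// let vec = Mutex::new(vec![]);
+ /// pool.scope_fifo(|s| {
+ ///     for i in 0..3 {
+ ///         let vec = &vec;
+ ///         s.spawn_fifo(move |_| vec.lock().unwrap().push(i));
+ ///     }
+ /// });
+ /// // One worker, FIFO spawns: tasks run in submission order.
+ /// assert_eq!(vec.into_inner().unwrap(), vec![0, 1, 2]);
+ /// ```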
+ pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
+ R: Send,
+ {
+ self.install(|| scope_fifo(op))
+ }
+
+ /// Creates a scope that spawns work into this thread-pool.
+ ///
+ /// See also: [the `in_place_scope()` function][in_place_scope].
+ ///
+ /// [in_place_scope]: fn.in_place_scope.html
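+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch (mirroring this crate's tests): the scope closure
+ /// runs on the calling thread, so it can block on work spawned into
+ /// the pool without occupying a worker:
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ /// let (tx, rx) = std::sync::mpsc::channel();
+ /// pool.in_place_scope(|s| {
+ ///     // This task runs on the pool's one worker thread...
+ ///     s.spawn(move |_| tx.send(22).unwrap());
+ ///     // ...while the calling thread waits here without deadlock.
+ ///     assert_eq!(rx.recv().unwrap(), 22);
+ /// });
+ /// ```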
+ pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&Scope<'scope>) -> R,
+ {
+ do_in_place_scope(Some(&self.registry), op)
+ }
+
+ /// Creates a scope that spawns work into this thread-pool in FIFO order.
+ ///
+ /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
+ ///
+ /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
+ pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R,
+ {
+ do_in_place_scope_fifo(Some(&self.registry), op)
+ }
+
+ /// Spawns an asynchronous task in this thread-pool. This task will
+ /// run in the implicit, global scope, which means that it may outlast
+ /// the current stack frame -- therefore, it cannot capture any references
+ /// onto the stack (you will likely need a `move` closure).
+ ///
+ /// See also: [the `spawn()` function defined on scopes][spawn].
+ ///
+ /// [spawn]: struct.Scope.html#method.spawn
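+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch; the channel is used only to observe completion,
+ /// and the value sent is arbitrary:
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ /// let (tx, rx) = std::sync::mpsc::channel();
+ /// // `move` is needed: the task may outlive this stack frame.
+ /// pool.spawn(move || tx.send(22).unwrap());
+ /// assert_eq!(rx.recv().unwrap(), 22);
+ /// ```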
+ pub fn spawn<OP>(&self, op: OP)
+ where
+ OP: FnOnce() + Send + 'static,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { spawn::spawn_in(op, &self.registry) }
+ }
+
+ /// Spawns an asynchronous task in this thread-pool. This task will
+ /// run in the implicit, global scope, which means that it may outlast
+ /// the current stack frame -- therefore, it cannot capture any references
+ /// onto the stack (you will likely need a `move` closure).
+ ///
+ /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
+ ///
+ /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
+ pub fn spawn_fifo<OP>(&self, op: OP)
+ where
+ OP: FnOnce() + Send + 'static,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { spawn::spawn_fifo_in(op, &self.registry) }
+ }
+
+ /// Spawns an asynchronous task on every thread in this thread-pool. This task
+ /// will run in the implicit, global scope, which means that it may outlast the
+ /// current stack frame -- therefore, it cannot capture any references onto the
+ /// stack (you will likely need a `move` closure).
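+ ///
+ /// # Examples
+ ///
+ /// An illustrative sketch; the barrier is only there so the example
+ /// can deterministically observe that every thread ran the task:
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// use std::sync::atomic::{AtomicUsize, Ordering};
+ /// use std::sync::{Arc, Barrier};
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
+ /// let count = Arc::new(AtomicUsize::new(0));
+ /// let barrier = Arc::new(Barrier::new(5)); // 4 workers + this thread
+ /// let (c, b) = (Arc::clone(&count), Arc::clone(&barrier));
+ /// pool.spawn_broadcast(move |_| {
+ ///     c.fetch_add(1, Ordering::Relaxed);
+ ///     b.wait();
+ /// });
+ /// barrier.wait(); // returns once all four workers have run the task
+ /// assert_eq!(count.load(Ordering::Relaxed), 4);
+ /// ```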
+ pub fn spawn_broadcast<OP>(&self, op: OP)
+ where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
+ }
+
+ /// Cooperatively yields execution to Rayon.
+ ///
+ /// This is similar to the general [`yield_now()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part of this pool.
+ pub fn yield_now(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_now())
+ }
+
+ /// Cooperatively yields execution to local Rayon work.
+ ///
+ /// This is similar to the general [`yield_local()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part of this pool.
+ pub fn yield_local(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_local())
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ self.registry.terminate();
+ }
+}
+
+impl fmt::Debug for ThreadPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("ThreadPool")
+ .field("num_threads", &self.current_num_threads())
+ .field("id", &self.registry.id())
+ .finish()
+ }
+}
+
+/// If called from a Rayon worker thread, returns the index of that
+/// thread within its current pool; if not called from a Rayon thread,
+/// returns `None`.
+///
+/// The index for a given thread will not change over the thread's
+/// lifetime. However, multiple threads may share the same index if
+/// they are in distinct thread-pools.
+///
+/// See also: [the `ThreadPool::current_thread_index()` method][m].
+///
+/// [m]: struct.ThreadPool.html#method.current_thread_index
+///
+/// # Future compatibility note
+///
+/// Currently, every thread-pool (including the global
+/// thread-pool) has a fixed number of threads, but this may
+/// change in future Rayon versions (see [the `num_threads()` method
+/// for details][snt]). In that case, the index for a
+/// thread would not change during its lifetime, but thread
+/// indices may wind up being reused if threads are terminated and
+/// restarted.
+///
+/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
+#[inline]
+pub fn current_thread_index() -> Option<usize> {
+ unsafe {
+ let curr = WorkerThread::current().as_ref()?;
+ Some(curr.index())
+ }
+}
+
+/// If called from a Rayon worker thread, indicates whether that
+/// thread's local deque still has pending tasks. Otherwise, returns
+/// `None`. For more information, see [the
+/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
+///
+/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
+#[inline]
+pub fn current_thread_has_pending_tasks() -> Option<bool> {
+ unsafe {
+ let curr = WorkerThread::current().as_ref()?;
+ Some(!curr.local_deque_is_empty())
+ }
+}
+
+/// Cooperatively yields execution to Rayon.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in the pool, then executes it. Completion of
+/// that work might include nested work or further work stealing.
+///
+/// This is similar to [`std::thread::yield_now()`], but does not literally make
+/// that call. If you are implementing a polling loop, you may want to also
+/// yield to the OS scheduler yourself if no Rayon work was found.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
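+///
+/// # Examples
+///
+/// A polling-loop sketch of the pattern described above (`done` is a
+/// hypothetical readiness flag, for illustration only):
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// use std::sync::atomic::{AtomicBool, Ordering};
+/// let done = AtomicBool::new(true); // would be set by other work
+/// while !done.load(Ordering::Acquire) {
+///     match rayon::yield_now() {
+///         Some(rayon::Yield::Executed) => {} // made progress on pool work
+///         _ => std::thread::yield_now(),     // idle or not in a pool: yield to the OS
+///     }
+/// }
+/// ```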
+pub fn yield_now() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_now())
+ }
+}
+
+/// Cooperatively yields execution to local Rayon work.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in this thread's queue, then executes it.
+/// Completion of that work might include nested work or further work stealing.
+///
+/// This is similar to [`yield_now()`], but does not steal from other threads.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_local() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_local())
+ }
+}
+
+/// Result of [`yield_now()`] or [`yield_local()`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Yield {
+ /// Work was found and executed.
+ Executed,
+ /// No available work was found.
+ Idle,
+}
diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs
new file mode 100644
index 0000000..88b3628
--- /dev/null
+++ b/vendor/rayon-core/src/thread_pool/test.rs
@@ -0,0 +1,418 @@
+#![cfg(test)]
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::channel;
+use std::sync::{Arc, Mutex};
+
+use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate() {
+ let thread_pool = ThreadPoolBuilder::new().build().unwrap();
+ thread_pool.install(|| {
+ panic!("Hello, world!");
+ });
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn workers_stop() {
+ let registry;
+
+ {
+ // once we exit this block, thread-pool will be dropped
+ let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
+ registry = thread_pool.install(|| {
+ // do some work on these threads
+ join_a_lot(22);
+
+ Arc::clone(&thread_pool.registry)
+ });
+ assert_eq!(registry.num_threads(), 22);
+ }
+
+ // once thread-pool is dropped, registry should terminate, which
+ // should lead to worker threads stopping
+ registry.wait_until_stopped();
+}
+
+fn join_a_lot(n: usize) {
+ if n > 0 {
+ join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
+ }
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn sleeper_stop() {
+ use std::{thread, time};
+
+ let registry;
+
+ {
+ // once we exit this block, thread-pool will be dropped
+ let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
+ registry = Arc::clone(&thread_pool.registry);
+
+ // Give time for at least some of the thread pool to fall asleep.
+ thread::sleep(time::Duration::from_secs(1));
+ }
+
+ // once thread-pool is dropped, registry should terminate, which
+ // should lead to worker threads stopping
+ registry.wait_until_stopped();
+}
+
+/// Creates a start/exit handler that increments an atomic counter.
+fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
+ let count = Arc::new(AtomicUsize::new(0));
+ (Arc::clone(&count), move |_| {
+ count.fetch_add(1, Ordering::SeqCst);
+ })
+}
+
+/// Wait until a counter is no longer shared, then return its value.
+fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
+ use std::{thread, time};
+
+ for _ in 0..60 {
+ counter = match Arc::try_unwrap(counter) {
+ Ok(counter) => return counter.into_inner(),
+ Err(counter) => {
+ thread::sleep(time::Duration::from_secs(1));
+ counter
+ }
+ };
+ }
+
+ // That's too long!
+ panic!("Counter is still shared!");
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn failed_thread_stack() {
+ // Note: we first tried to force failure with a `usize::MAX` stack, but
+ // macOS and Windows weren't fazed, or at least didn't fail the way we want.
+ // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a
+ // 2GB stack, so it might not fail until the second thread.
+ let stack_size = ::std::isize::MAX as usize;
+
+ let (start_count, start_handler) = count_handler();
+ let (exit_count, exit_handler) = count_handler();
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(10)
+ .stack_size(stack_size)
+ .start_handler(start_handler)
+ .exit_handler(exit_handler);
+
+ let pool = builder.build();
+ assert!(pool.is_err(), "thread stack should have failed!");
+
+ // With such a huge stack, 64-bit will probably fail on the first thread;
+ // 32-bit might manage the first 2GB, but certainly fail the second.
+ let start_count = wait_for_counter(start_count);
+ assert!(start_count <= 1);
+ assert_eq!(start_count, wait_for_counter(exit_count));
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn panic_thread_name() {
+ let (start_count, start_handler) = count_handler();
+ let (exit_count, exit_handler) = count_handler();
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(10)
+ .start_handler(start_handler)
+ .exit_handler(exit_handler)
+ .thread_name(|i| {
+ if i >= 5 {
+ panic!();
+ }
+ format!("panic_thread_name#{}", i)
+ });
+
+ let pool = crate::unwind::halt_unwinding(|| builder.build());
+ assert!(pool.is_err(), "thread-name panic should propagate!");
+
+ // Assuming they're created in order, threads 0 through 4 should have
+ // been started already, and then terminated by the panic.
+ assert_eq!(5, wait_for_counter(start_count));
+ assert_eq!(5, wait_for_counter(exit_count));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn self_install() {
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+ // If the inner `install` blocks, then nothing will actually run it!
+ assert!(pool.install(|| pool.install(|| true)));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn mutual_install() {
+ let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+ let ok = pool1.install(|| {
+ // This creates a dependency from `pool1` -> `pool2`
+ pool2.install(|| {
+ // This creates a dependency from `pool2` -> `pool1`
+ pool1.install(|| {
+ // If they blocked on inter-pool installs, there would be no
+ // threads left to run this!
+ true
+ })
+ })
+ });
+ assert!(ok);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn mutual_install_sleepy() {
+ use std::{thread, time};
+
+ let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+ let ok = pool1.install(|| {
+ // This creates a dependency from `pool1` -> `pool2`
+ pool2.install(|| {
+ // Give `pool1` time to fall asleep.
+ thread::sleep(time::Duration::from_secs(1));
+
+ // This creates a dependency from `pool2` -> `pool1`
+ pool1.install(|| {
+ // Give `pool2` time to fall asleep.
+ thread::sleep(time::Duration::from_secs(1));
+
+ // If they blocked on inter-pool installs, there would be no
+ // threads left to run this!
+ true
+ })
+ })
+ });
+ assert!(ok);
+}
+
+#[test]
+#[allow(deprecated)]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn check_thread_pool_new() {
+ let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
+ assert_eq!(pool.current_num_threads(), 22);
+}
+
+macro_rules! test_scope_order {
+ ($scope:ident => $spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let vec = Mutex::new(vec![]);
+ pool.$scope(|scope| {
+ let vec = &vec;
+ for i in 0..10 {
+ scope.$spawn(move |_| {
+ vec.lock().unwrap().push(i);
+ });
+ }
+ });
+ vec.into_inner().unwrap()
+ })
+ }};
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_lifo_order() {
+ let vec = test_scope_order!(scope => spawn);
+ let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_fifo_order() {
+ let vec = test_scope_order!(scope_fifo => spawn_fifo);
+ let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+macro_rules! test_spawn_order {
+ ($spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = &builder.build().unwrap();
+ let (tx, rx) = channel();
+ pool.install(move || {
+ for i in 0..10 {
+ let tx = tx.clone();
+ pool.$spawn(move || {
+ tx.send(i).unwrap();
+ });
+ }
+ });
+ rx.iter().collect::<Vec<i32>>()
+ }};
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_lifo_order() {
+ let vec = test_spawn_order!(spawn);
+ let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_fifo_order() {
+ let vec = test_spawn_order!(spawn_fifo);
+ let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn nested_scopes() {
+ // Create matching scopes for every thread pool.
+ fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
+ where
+ OP: FnOnce(&[&Scope<'scope>]) + Send,
+ {
+ if let Some((pool, tail)) = pools.split_first() {
+ pool.scope(move |s| {
+ // This move reduces the reference lifetimes by variance to match s,
+ // but the actual scopes are still tied to the invariant 'scope.
+ let mut scopes = scopes;
+ scopes.push(s);
+ nest(tail, scopes, op)
+ })
+ } else {
+ (op)(&scopes)
+ }
+ }
+
+ let pools: Vec<_> = (0..10)
+ .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
+ .collect();
+
+ let counter = AtomicUsize::new(0);
+ nest(&pools, vec![], |scopes| {
+ for &s in scopes {
+ s.spawn(|_| {
+ // Our 'scope lets us borrow the counter in every pool.
+ counter.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ assert_eq!(counter.into_inner(), pools.len());
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn nested_fifo_scopes() {
+ // Create matching fifo scopes for every thread pool.
+ fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
+ where
+ OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
+ {
+ if let Some((pool, tail)) = pools.split_first() {
+ pool.scope_fifo(move |s| {
+ // This move reduces the reference lifetimes by variance to match s,
+ // but the actual scopes are still tied to the invariant 'scope.
+ let mut scopes = scopes;
+ scopes.push(s);
+ nest(tail, scopes, op)
+ })
+ } else {
+ (op)(&scopes)
+ }
+ }
+
+ let pools: Vec<_> = (0..10)
+ .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
+ .collect();
+
+ let counter = AtomicUsize::new(0);
+ nest(&pools, vec![], |scopes| {
+ for &s in scopes {
+ s.spawn_fifo(|_| {
+ // Our 'scope lets us borrow the counter in every pool.
+ counter.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ assert_eq!(counter.into_inner(), pools.len());
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn in_place_scope_no_deadlock() {
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let (tx, rx) = channel();
+ let rx_ref = &rx;
+ pool.in_place_scope(move |s| {
+ // With regular scopes this closure would never run because this scope op
+ // itself would block the only worker thread.
+ s.spawn(move |_| {
+ tx.send(()).unwrap();
+ });
+ rx_ref.recv().unwrap();
+ });
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn in_place_scope_fifo_no_deadlock() {
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let (tx, rx) = channel();
+ let rx_ref = &rx;
+ pool.in_place_scope_fifo(move |s| {
+ // With regular scopes this closure would never run because this scope op
+ // itself would block the only worker thread.
+ s.spawn_fifo(move |_| {
+ tx.send(()).unwrap();
+ });
+ rx_ref.recv().unwrap();
+ });
+}
+
+#[test]
+fn yield_now_to_spawn() {
+ let (tx, rx) = channel();
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_now();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
+
+#[test]
+fn yield_local_to_spawn() {
+ let (tx, rx) = channel();
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_local();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}