diff options
Diffstat (limited to 'vendor/rayon-core/src/thread_pool')
-rw-r--r-- | vendor/rayon-core/src/thread_pool/mod.rs | 471 | ||||
-rw-r--r-- | vendor/rayon-core/src/thread_pool/test.rs | 418 |
2 files changed, 0 insertions, 889 deletions
diff --git a/vendor/rayon-core/src/thread_pool/mod.rs b/vendor/rayon-core/src/thread_pool/mod.rs deleted file mode 100644 index c37826e..0000000 --- a/vendor/rayon-core/src/thread_pool/mod.rs +++ /dev/null @@ -1,471 +0,0 @@ -//! Contains support for user-managed thread pools, represented by the -//! the [`ThreadPool`] type (see that struct for details). -//! -//! [`ThreadPool`]: struct.ThreadPool.html - -use crate::broadcast::{self, BroadcastContext}; -use crate::join; -use crate::registry::{Registry, ThreadSpawn, WorkerThread}; -use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; -use crate::spawn; -use crate::{scope, Scope}; -use crate::{scope_fifo, ScopeFifo}; -use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; -use std::error::Error; -use std::fmt; -use std::sync::Arc; - -mod test; - -/// Represents a user created [thread-pool]. -/// -/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads -/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then -/// execute functions explicitly within this [`ThreadPool`] using -/// [`ThreadPool::install()`]. By contrast, top level rayon functions -/// (like `join()`) will execute implicitly within the current thread-pool. -/// -/// -/// ## Creating a ThreadPool -/// -/// ```rust -/// # use rayon_core as rayon; -/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); -/// ``` -/// -/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s -/// threads. In addition, any other rayon operations called inside of `install()` will also -/// execute in the context of the `ThreadPool`. -/// -/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate, -/// they will complete executing any remaining work that you have spawned, and automatically -/// terminate. -/// -/// -/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool -/// [`ThreadPool`]: struct.ThreadPool.html -/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new -/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html -/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build -/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install -pub struct ThreadPool { - registry: Arc<Registry>, -} - -impl ThreadPool { - #[deprecated(note = "Use `ThreadPoolBuilder::build`")] - #[allow(deprecated)] - /// Deprecated in favor of `ThreadPoolBuilder::build`. - pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> { - Self::build(configuration.into_builder()).map_err(Box::from) - } - - pub(super) fn build<S>( - builder: ThreadPoolBuilder<S>, - ) -> Result<ThreadPool, ThreadPoolBuildError> - where - S: ThreadSpawn, - { - let registry = Registry::new(builder)?; - Ok(ThreadPool { registry }) - } - - /// Executes `op` within the threadpool. Any attempts to use - /// `join`, `scope`, or parallel iterators will then operate - /// within that threadpool. - /// - /// # Warning: thread-local data - /// - /// Because `op` is executing within the Rayon thread-pool, - /// thread-local data from the current thread will not be - /// accessible. - /// - /// # Panics - /// - /// If `op` should panic, that panic will be propagated. - /// - /// ## Using `install()` - /// - /// ```rust - /// # use rayon_core as rayon; - /// fn main() { - /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); - /// let n = pool.install(|| fib(20)); - /// println!("{}", n); - /// } - /// - /// fn fib(n: usize) -> usize { - /// if n == 0 || n == 1 { - /// return n; - /// } - /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool` - /// return a + b; - /// } - /// ``` - pub fn install<OP, R>(&self, op: OP) -> R - where - OP: FnOnce() -> R + Send, - R: Send, - { - self.registry.in_worker(|_, _| op()) - } - - /// Executes `op` within every thread in the threadpool. Any attempts to use - /// `join`, `scope`, or parallel iterators will then operate within that - /// threadpool. - /// - /// Broadcasts are executed on each thread after they have exhausted their - /// local work queue, before they attempt work-stealing from other threads. - /// The goal of that strategy is to run everywhere in a timely manner - /// *without* being too disruptive to current work. There may be alternative - /// broadcast styles added in the future for more or less aggressive - /// injection, if the need arises. - /// - /// # Warning: thread-local data - /// - /// Because `op` is executing within the Rayon thread-pool, - /// thread-local data from the current thread will not be - /// accessible. - /// - /// # Panics - /// - /// If `op` should panic on one or more threads, exactly one panic - /// will be propagated, only after all threads have completed - /// (or panicked) their own `op`. - /// - /// # Examples - /// - /// ``` - /// # use rayon_core as rayon; - /// use std::sync::atomic::{AtomicUsize, Ordering}; - /// - /// fn main() { - /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap(); - /// - /// // The argument gives context, including the index of each thread. - /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index()); - /// assert_eq!(v, &[0, 1, 4, 9, 16]); - /// - /// // The closure can reference the local stack - /// let count = AtomicUsize::new(0); - /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed)); - /// assert_eq!(count.into_inner(), 5); - /// } - /// ``` - pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R> - where - OP: Fn(BroadcastContext<'_>) -> R + Sync, - R: Send, - { - // We assert that `self.registry` has not terminated. - unsafe { broadcast::broadcast_in(op, &self.registry) } - } - - /// Returns the (current) number of threads in the thread pool. - /// - /// # Future compatibility note - /// - /// Note that unless this thread-pool was created with a - /// [`ThreadPoolBuilder`] that specifies the number of threads, - /// then this number may vary over time in future versions (see [the - /// `num_threads()` method for details][snt]). - /// - /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads - /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html - #[inline] - pub fn current_num_threads(&self) -> usize { - self.registry.num_threads() - } - - /// If called from a Rayon worker thread in this thread-pool, - /// returns the index of that thread; if not called from a Rayon - /// thread, or called from a Rayon thread that belongs to a - /// different thread-pool, returns `None`. - /// - /// The index for a given thread will not change over the thread's - /// lifetime. However, multiple threads may share the same index if - /// they are in distinct thread-pools. - /// - /// # Future compatibility note - /// - /// Currently, every thread-pool (including the global - /// thread-pool) has a fixed number of threads, but this may - /// change in future Rayon versions (see [the `num_threads()` method - /// for details][snt]). In that case, the index for a - /// thread would not change during its lifetime, but thread - /// indices may wind up being reused if threads are terminated and - /// restarted. - /// - /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads - #[inline] - pub fn current_thread_index(&self) -> Option<usize> { - let curr = self.registry.current_thread()?; - Some(curr.index()) - } - - /// Returns true if the current worker thread currently has "local - /// tasks" pending. This can be useful as part of a heuristic for - /// deciding whether to spawn a new task or execute code on the - /// current thread, particularly in breadth-first - /// schedulers. However, keep in mind that this is an inherently - /// racy check, as other worker threads may be actively "stealing" - /// tasks from our local deque. - /// - /// **Background:** Rayon's uses a [work-stealing] scheduler. The - /// key idea is that each thread has its own [deque] of - /// tasks. Whenever a new task is spawned -- whether through - /// `join()`, `Scope::spawn()`, or some other means -- that new - /// task is pushed onto the thread's *local* deque. Worker threads - /// have a preference for executing their own tasks; if however - /// they run out of tasks, they will go try to "steal" tasks from - /// other threads. This function therefore has an inherent race - /// with other active worker threads, which may be removing items - /// from the local deque. - /// - /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing - /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue - #[inline] - pub fn current_thread_has_pending_tasks(&self) -> Option<bool> { - let curr = self.registry.current_thread()?; - Some(!curr.local_deque_is_empty()) - } - - /// Execute `oper_a` and `oper_b` in the thread-pool and return - /// the results. Equivalent to `self.install(|| join(oper_a, - /// oper_b))`. - pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB) - where - A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, - RA: Send, - RB: Send, - { - self.install(|| join(oper_a, oper_b)) - } - - /// Creates a scope that executes within this thread-pool. - /// Equivalent to `self.install(|| scope(...))`. - /// - /// See also: [the `scope()` function][scope]. - /// - /// [scope]: fn.scope.html - pub fn scope<'scope, OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&Scope<'scope>) -> R + Send, - R: Send, - { - self.install(|| scope(op)) - } - - /// Creates a scope that executes within this thread-pool. - /// Spawns from the same thread are prioritized in relative FIFO order. - /// Equivalent to `self.install(|| scope_fifo(...))`. - /// - /// See also: [the `scope_fifo()` function][scope_fifo]. - /// - /// [scope_fifo]: fn.scope_fifo.html - pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, - R: Send, - { - self.install(|| scope_fifo(op)) - } - - /// Creates a scope that spawns work into this thread-pool. - /// - /// See also: [the `in_place_scope()` function][in_place_scope]. - /// - /// [in_place_scope]: fn.in_place_scope.html - pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&Scope<'scope>) -> R, - { - do_in_place_scope(Some(&self.registry), op) - } - - /// Creates a scope that spawns work into this thread-pool in FIFO order. - /// - /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo]. - /// - /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html - pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R - where - OP: FnOnce(&ScopeFifo<'scope>) -> R, - { - do_in_place_scope_fifo(Some(&self.registry), op) - } - - /// Spawns an asynchronous task in this thread-pool. This task will - /// run in the implicit, global scope, which means that it may outlast - /// the current stack frame -- therefore, it cannot capture any references - /// onto the stack (you will likely need a `move` closure). - /// - /// See also: [the `spawn()` function defined on scopes][spawn]. - /// - /// [spawn]: struct.Scope.html#method.spawn - pub fn spawn<OP>(&self, op: OP) - where - OP: FnOnce() + Send + 'static, - { - // We assert that `self.registry` has not terminated. - unsafe { spawn::spawn_in(op, &self.registry) } - } - - /// Spawns an asynchronous task in this thread-pool. This task will - /// run in the implicit, global scope, which means that it may outlast - /// the current stack frame -- therefore, it cannot capture any references - /// onto the stack (you will likely need a `move` closure). - /// - /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo]. - /// - /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo - pub fn spawn_fifo<OP>(&self, op: OP) - where - OP: FnOnce() + Send + 'static, - { - // We assert that `self.registry` has not terminated. - unsafe { spawn::spawn_fifo_in(op, &self.registry) } - } - - /// Spawns an asynchronous task on every thread in this thread-pool. This task - /// will run in the implicit, global scope, which means that it may outlast the - /// current stack frame -- therefore, it cannot capture any references onto the - /// stack (you will likely need a `move` closure). - pub fn spawn_broadcast<OP>(&self, op: OP) - where - OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, - { - // We assert that `self.registry` has not terminated. - unsafe { broadcast::spawn_broadcast_in(op, &self.registry) } - } - - /// Cooperatively yields execution to Rayon. - /// - /// This is similar to the general [`yield_now()`], but only if the current - /// thread is part of *this* thread pool. - /// - /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if - /// nothing was available, or `None` if the current thread is not part this pool. - pub fn yield_now(&self) -> Option<Yield> { - let curr = self.registry.current_thread()?; - Some(curr.yield_now()) - } - - /// Cooperatively yields execution to local Rayon work. - /// - /// This is similar to the general [`yield_local()`], but only if the current - /// thread is part of *this* thread pool. - /// - /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if - /// nothing was available, or `None` if the current thread is not part this pool. - pub fn yield_local(&self) -> Option<Yield> { - let curr = self.registry.current_thread()?; - Some(curr.yield_local()) - } -} - -impl Drop for ThreadPool { - fn drop(&mut self) { - self.registry.terminate(); - } -} - -impl fmt::Debug for ThreadPool { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("ThreadPool") - .field("num_threads", &self.current_num_threads()) - .field("id", &self.registry.id()) - .finish() - } -} - -/// If called from a Rayon worker thread, returns the index of that -/// thread within its current pool; if not called from a Rayon thread, -/// returns `None`. -/// -/// The index for a given thread will not change over the thread's -/// lifetime. However, multiple threads may share the same index if -/// they are in distinct thread-pools. -/// -/// See also: [the `ThreadPool::current_thread_index()` method]. -/// -/// [m]: struct.ThreadPool.html#method.current_thread_index -/// -/// # Future compatibility note -/// -/// Currently, every thread-pool (including the global -/// thread-pool) has a fixed number of threads, but this may -/// change in future Rayon versions (see [the `num_threads()` method -/// for details][snt]). In that case, the index for a -/// thread would not change during its lifetime, but thread -/// indices may wind up being reused if threads are terminated and -/// restarted. -/// -/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads -#[inline] -pub fn current_thread_index() -> Option<usize> { - unsafe { - let curr = WorkerThread::current().as_ref()?; - Some(curr.index()) - } -} - -/// If called from a Rayon worker thread, indicates whether that -/// thread's local deque still has pending tasks. Otherwise, returns -/// `None`. For more information, see [the -/// `ThreadPool::current_thread_has_pending_tasks()` method][m]. -/// -/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks -#[inline] -pub fn current_thread_has_pending_tasks() -> Option<bool> { - unsafe { - let curr = WorkerThread::current().as_ref()?; - Some(!curr.local_deque_is_empty()) - } -} - -/// Cooperatively yields execution to Rayon. -/// -/// If the current thread is part of a rayon thread pool, this looks for a -/// single unit of pending work in the pool, then executes it. Completion of -/// that work might include nested work or further work stealing. -/// -/// This is similar to [`std::thread::yield_now()`], but does not literally make -/// that call. If you are implementing a polling loop, you may want to also -/// yield to the OS scheduler yourself if no Rayon work was found. -/// -/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if -/// nothing was available, or `None` if this thread is not part of any pool at all. -pub fn yield_now() -> Option<Yield> { - unsafe { - let thread = WorkerThread::current().as_ref()?; - Some(thread.yield_now()) - } -} - -/// Cooperatively yields execution to local Rayon work. -/// -/// If the current thread is part of a rayon thread pool, this looks for a -/// single unit of pending work in this thread's queue, then executes it. -/// Completion of that work might include nested work or further work stealing. -/// -/// This is similar to [`yield_now()`], but does not steal from other threads. -/// -/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if -/// nothing was available, or `None` if this thread is not part of any pool at all. -pub fn yield_local() -> Option<Yield> { - unsafe { - let thread = WorkerThread::current().as_ref()?; - Some(thread.yield_local()) - } -} - -/// Result of [`yield_now()`] or [`yield_local()`]. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Yield { - /// Work was found and executed. - Executed, - /// No available work was found. - Idle, -} diff --git a/vendor/rayon-core/src/thread_pool/test.rs b/vendor/rayon-core/src/thread_pool/test.rs deleted file mode 100644 index 88b3628..0000000 --- a/vendor/rayon-core/src/thread_pool/test.rs +++ /dev/null @@ -1,418 +0,0 @@ -#![cfg(test)] - -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex}; - -use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; - -#[test] -#[should_panic(expected = "Hello, world!")] -fn panic_propagate() { - let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - thread_pool.install(|| { - panic!("Hello, world!"); - }); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn workers_stop() { - let registry; - - { - // once we exit this block, thread-pool will be dropped - let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); - registry = thread_pool.install(|| { - // do some work on these threads - join_a_lot(22); - - Arc::clone(&thread_pool.registry) - }); - assert_eq!(registry.num_threads(), 22); - } - - // once thread-pool is dropped, registry should terminate, which - // should lead to worker threads stopping - registry.wait_until_stopped(); -} - -fn join_a_lot(n: usize) { - if n > 0 { - join(|| join_a_lot(n - 1), || join_a_lot(n - 1)); - } -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn sleeper_stop() { - use std::{thread, time}; - - let registry; - - { - // once we exit this block, thread-pool will be dropped - let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); - registry = Arc::clone(&thread_pool.registry); - - // Give time for at least some of the thread pool to fall asleep. - thread::sleep(time::Duration::from_secs(1)); - } - - // once thread-pool is dropped, registry should terminate, which - // should lead to worker threads stopping - registry.wait_until_stopped(); -} - -/// Creates a start/exit handler that increments an atomic counter. -fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) { - let count = Arc::new(AtomicUsize::new(0)); - (Arc::clone(&count), move |_| { - count.fetch_add(1, Ordering::SeqCst); - }) -} - -/// Wait until a counter is no longer shared, then return its value. -fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize { - use std::{thread, time}; - - for _ in 0..60 { - counter = match Arc::try_unwrap(counter) { - Ok(counter) => return counter.into_inner(), - Err(counter) => { - thread::sleep(time::Duration::from_secs(1)); - counter - } - }; - } - - // That's too long! - panic!("Counter is still shared!"); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn failed_thread_stack() { - // Note: we first tried to force failure with a `usize::MAX` stack, but - // macOS and Windows weren't fazed, or at least didn't fail the way we want. - // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a - // 2GB stack, so it might not fail until the second thread. - let stack_size = ::std::isize::MAX as usize; - - let (start_count, start_handler) = count_handler(); - let (exit_count, exit_handler) = count_handler(); - let builder = ThreadPoolBuilder::new() - .num_threads(10) - .stack_size(stack_size) - .start_handler(start_handler) - .exit_handler(exit_handler); - - let pool = builder.build(); - assert!(pool.is_err(), "thread stack should have failed!"); - - // With such a huge stack, 64-bit will probably fail on the first thread; - // 32-bit might manage the first 2GB, but certainly fail the second. - let start_count = wait_for_counter(start_count); - assert!(start_count <= 1); - assert_eq!(start_count, wait_for_counter(exit_count)); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore)] -fn panic_thread_name() { - let (start_count, start_handler) = count_handler(); - let (exit_count, exit_handler) = count_handler(); - let builder = ThreadPoolBuilder::new() - .num_threads(10) - .start_handler(start_handler) - .exit_handler(exit_handler) - .thread_name(|i| { - if i >= 5 { - panic!(); - } - format!("panic_thread_name#{}", i) - }); - - let pool = crate::unwind::halt_unwinding(|| builder.build()); - assert!(pool.is_err(), "thread-name panic should propagate!"); - - // Assuming they're created in order, threads 0 through 4 should have - // been started already, and then terminated by the panic. - assert_eq!(5, wait_for_counter(start_count)); - assert_eq!(5, wait_for_counter(exit_count)); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn self_install() { - let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - - // If the inner `install` blocks, then nothing will actually run it! - assert!(pool.install(|| pool.install(|| true))); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mutual_install() { - let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - - let ok = pool1.install(|| { - // This creates a dependency from `pool1` -> `pool2` - pool2.install(|| { - // This creates a dependency from `pool2` -> `pool1` - pool1.install(|| { - // If they blocked on inter-pool installs, there would be no - // threads left to run this! - true - }) - }) - }); - assert!(ok); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn mutual_install_sleepy() { - use std::{thread, time}; - - let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - - let ok = pool1.install(|| { - // This creates a dependency from `pool1` -> `pool2` - pool2.install(|| { - // Give `pool1` time to fall asleep. - thread::sleep(time::Duration::from_secs(1)); - - // This creates a dependency from `pool2` -> `pool1` - pool1.install(|| { - // Give `pool2` time to fall asleep. - thread::sleep(time::Duration::from_secs(1)); - - // If they blocked on inter-pool installs, there would be no - // threads left to run this! - true - }) - }) - }); - assert!(ok); -} - -#[test] -#[allow(deprecated)] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn check_thread_pool_new() { - let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap(); - assert_eq!(pool.current_num_threads(), 22); -} - -macro_rules! test_scope_order { - ($scope:ident => $spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = builder.build().unwrap(); - pool.install(|| { - let vec = Mutex::new(vec![]); - pool.$scope(|scope| { - let vec = &vec; - for i in 0..10 { - scope.$spawn(move |_| { - vec.lock().unwrap().push(i); - }); - } - }); - vec.into_inner().unwrap() - }) - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn scope_lifo_order() { - let vec = test_scope_order!(scope => spawn); - let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn scope_fifo_order() { - let vec = test_scope_order!(scope_fifo => spawn_fifo); - let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order - assert_eq!(vec, expected); -} - -macro_rules! test_spawn_order { - ($spawn:ident) => {{ - let builder = ThreadPoolBuilder::new().num_threads(1); - let pool = &builder.build().unwrap(); - let (tx, rx) = channel(); - pool.install(move || { - for i in 0..10 { - let tx = tx.clone(); - pool.$spawn(move || { - tx.send(i).unwrap(); - }); - } - }); - rx.iter().collect::<Vec<i32>>() - }}; -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_lifo_order() { - let vec = test_spawn_order!(spawn); - let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn spawn_fifo_order() { - let vec = test_spawn_order!(spawn_fifo); - let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order - assert_eq!(vec, expected); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn nested_scopes() { - // Create matching scopes for every thread pool. - fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) - where - OP: FnOnce(&[&Scope<'scope>]) + Send, - { - if let Some((pool, tail)) = pools.split_first() { - pool.scope(move |s| { - // This move reduces the reference lifetimes by variance to match s, - // but the actual scopes are still tied to the invariant 'scope. - let mut scopes = scopes; - scopes.push(s); - nest(tail, scopes, op) - }) - } else { - (op)(&scopes) - } - } - - let pools: Vec<_> = (0..10) - .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) - .collect(); - - let counter = AtomicUsize::new(0); - nest(&pools, vec![], |scopes| { - for &s in scopes { - s.spawn(|_| { - // Our 'scope lets us borrow the counter in every pool. - counter.fetch_add(1, Ordering::Relaxed); - }); - } - }); - assert_eq!(counter.into_inner(), pools.len()); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn nested_fifo_scopes() { - // Create matching fifo scopes for every thread pool. - fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) - where - OP: FnOnce(&[&ScopeFifo<'scope>]) + Send, - { - if let Some((pool, tail)) = pools.split_first() { - pool.scope_fifo(move |s| { - // This move reduces the reference lifetimes by variance to match s, - // but the actual scopes are still tied to the invariant 'scope. - let mut scopes = scopes; - scopes.push(s); - nest(tail, scopes, op) - }) - } else { - (op)(&scopes) - } - } - - let pools: Vec<_> = (0..10) - .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) - .collect(); - - let counter = AtomicUsize::new(0); - nest(&pools, vec![], |scopes| { - for &s in scopes { - s.spawn_fifo(|_| { - // Our 'scope lets us borrow the counter in every pool. - counter.fetch_add(1, Ordering::Relaxed); - }); - } - }); - assert_eq!(counter.into_inner(), pools.len()); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn in_place_scope_no_deadlock() { - let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let (tx, rx) = channel(); - let rx_ref = ℞ - pool.in_place_scope(move |s| { - // With regular scopes this closure would never run because this scope op - // itself would block the only worker thread. - s.spawn(move |_| { - tx.send(()).unwrap(); - }); - rx_ref.recv().unwrap(); - }); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] -fn in_place_scope_fifo_no_deadlock() { - let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - let (tx, rx) = channel(); - let rx_ref = ℞ - pool.in_place_scope_fifo(move |s| { - // With regular scopes this closure would never run because this scope op - // itself would block the only worker thread. - s.spawn_fifo(move |_| { - tx.send(()).unwrap(); - }); - rx_ref.recv().unwrap(); - }); -} - -#[test] -fn yield_now_to_spawn() { - let (tx, rx) = channel(); - - // Queue a regular spawn. - crate::spawn(move || tx.send(22).unwrap()); - - // The single-threaded fallback mode (for wasm etc.) won't - // get a chance to run the spawn if we never yield to it. - crate::registry::in_worker(move |_, _| { - crate::yield_now(); - }); - - // The spawn **must** have started by now, but we still might have to wait - // for it to finish if a different thread stole it first. - assert_eq!(22, rx.recv().unwrap()); -} - -#[test] -fn yield_local_to_spawn() { - let (tx, rx) = channel(); - - // Queue a regular spawn. - crate::spawn(move || tx.send(22).unwrap()); - - // The single-threaded fallback mode (for wasm etc.) won't - // get a chance to run the spawn if we never yield to it. - crate::registry::in_worker(move |_, _| { - crate::yield_local(); - }); - - // The spawn **must** have started by now, but we still might have to wait - // for it to finish if a different thread stole it first. - assert_eq!(22, rx.recv().unwrap()); -} |