aboutsummaryrefslogtreecommitdiff
path: root/vendor/rayon-core/src/scope
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/rayon-core/src/scope')
-rw-r--r--vendor/rayon-core/src/scope/mod.rs769
-rw-r--r--vendor/rayon-core/src/scope/test.rs619
2 files changed, 1388 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs
new file mode 100644
index 0000000..b7163d1
--- /dev/null
+++ b/vendor/rayon-core/src/scope/mod.rs
@@ -0,0 +1,769 @@
+//! Methods for custom fork-join scopes, created by the [`scope()`]
+//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
+//!
+//! [`scope()`]: fn.scope.html
+//! [`in_place_scope()`]: fn.in_place_scope.html
+//! [`join()`]: ../join/join.fn.html
+
+use crate::broadcast::BroadcastContext;
+use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
+use crate::latch::{CountLatch, Latch};
+use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
+use crate::unwind;
+use std::any::Any;
+use std::fmt;
+use std::marker::PhantomData;
+use std::mem::ManuallyDrop;
+use std::ptr;
+use std::sync::atomic::{AtomicPtr, Ordering};
+use std::sync::Arc;
+
+#[cfg(test)]
+mod test;
+
+/// Represents a fork-join scope which can be used to spawn any number of tasks.
+/// See [`scope()`] for more information.
+///
+///[`scope()`]: fn.scope.html
+pub struct Scope<'scope> {
+ base: ScopeBase<'scope>,
+}
+
+/// Represents a fork-join scope which can be used to spawn any number of tasks.
+/// Those spawned from the same thread are prioritized in relative FIFO order.
+/// See [`scope_fifo()`] for more information.
+///
+///[`scope_fifo()`]: fn.scope_fifo.html
+pub struct ScopeFifo<'scope> {
+ base: ScopeBase<'scope>,
+ fifos: Vec<JobFifo>,
+}
+
+struct ScopeBase<'scope> {
+ /// thread registry where `scope()` was executed or where `in_place_scope()`
+ /// should spawn jobs.
+ registry: Arc<Registry>,
+
+ /// if some job panicked, the error is stored here; it will be
+ /// propagated to the one who created the scope
+ panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
+
+ /// latch to track job counts
+ job_completed_latch: CountLatch,
+
+ /// You can think of a scope as containing a list of closures to execute,
+ /// all of which outlive `'scope`. They're not actually required to be
+ /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
+ /// the closures are only *moved* across threads to be executed.
+ marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
+}
+
+/// Creates a "fork-join" scope `s` and invokes the closure with a
+/// reference to `s`. This closure can then spawn asynchronous tasks
+/// into `s`. Those tasks may run asynchronously with respect to the
+/// closure; they may themselves spawn additional tasks into `s`. When
+/// the closure returns, it will block until all tasks that have been
+/// spawned into `s` complete.
+///
+/// `scope()` is a more flexible building block compared to `join()`,
+/// since a loop can be used to spawn any number of tasks without
+/// recursing. However, that flexibility comes at a performance price:
+/// tasks spawned using `scope()` must be allocated onto the heap,
+/// whereas `join()` can make exclusive use of the stack. **Prefer
+/// `join()` (or, even better, parallel iterators) where possible.**
+///
+/// # Example
+///
+/// The Rayon `join()` function launches two closures and waits for them
+/// to stop. One could implement `join()` using a scope like so, although
+/// it would be less efficient than the real implementation:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
+/// where A: FnOnce() -> RA + Send,
+/// B: FnOnce() -> RB + Send,
+/// RA: Send,
+/// RB: Send,
+/// {
+/// let mut result_a: Option<RA> = None;
+/// let mut result_b: Option<RB> = None;
+/// rayon::scope(|s| {
+/// s.spawn(|_| result_a = Some(oper_a()));
+/// s.spawn(|_| result_b = Some(oper_b()));
+/// });
+/// (result_a.unwrap(), result_b.unwrap())
+/// }
+/// ```
+///
+/// # A note on threading
+///
+/// The closure given to `scope()` executes in the Rayon thread-pool,
+/// as do those given to `spawn()`. This means that you can't access
+/// thread-local variables (well, you can, but they may have
+/// unexpected values).
+///
+/// # Task execution
+///
+/// Task execution potentially starts as soon as `spawn()` is called.
+/// The task will end sometime before `scope()` returns. Note that the
+/// *closure* given to scope may return much earlier. In general
+/// the lifetime of a scope created like `scope(body)` goes something like this:
+///
+/// - Scope begins when `scope(body)` is called
+/// - Scope body `body()` is invoked
+/// - Scope tasks may be spawned
+/// - Scope body returns
+/// - Scope tasks execute, possibly spawning more tasks
+/// - Once all tasks are done, scope ends and `scope()` returns
+///
+/// To see how and when tasks are joined, consider this example:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// // point start
+/// rayon::scope(|s| {
+/// s.spawn(|s| { // task s.1
+/// s.spawn(|s| { // task s.1.1
+/// rayon::scope(|t| {
+/// t.spawn(|_| ()); // task t.1
+/// t.spawn(|_| ()); // task t.2
+/// });
+/// });
+/// });
+/// s.spawn(|s| { // task s.2
+/// });
+/// // point mid
+/// });
+/// // point end
+/// ```
+///
+/// The various tasks that are run will execute roughly like so:
+///
+/// ```notrust
+/// | (start)
+/// |
+/// | (scope `s` created)
+/// +-----------------------------------------------+ (task s.2)
+/// +-------+ (task s.1) |
+/// | | |
+/// | +---+ (task s.1.1) |
+/// | | | |
+/// | | | (scope `t` created) |
+/// | | +----------------+ (task t.2) |
+/// | | +---+ (task t.1) | |
+/// | (mid) | | | | |
+/// : | + <-+------------+ (scope `t` ends) |
+/// : | | |
+/// |<------+---+-----------------------------------+ (scope `s` ends)
+/// |
+/// | (end)
+/// ```
+///
+/// The point here is that everything spawned into scope `s` will
+/// terminate (at latest) at the same point -- right before the
+/// original call to `rayon::scope` returns. This includes new
+/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
+/// scope is created (such as `t`), the things spawned into that scope
+/// will be joined before that scope returns, which in turn occurs
+/// before the creating task (task `s.1.1` in this case) finishes.
+///
+/// There is no guaranteed order of execution for spawns in a scope,
+/// given that other threads may steal tasks at any time. However, they
+/// are generally prioritized in a LIFO order on the thread from which
+/// they were spawned. So in this example, absent any stealing, we can
+/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
+/// threads always steal from the other end of the deque, like FIFO
+/// order. The idea is that "recent" tasks are most likely to be fresh
+/// in the local CPU's cache, while other threads can steal older
+/// "stale" tasks. For an alternate approach, consider
+/// [`scope_fifo()`] instead.
+///
+/// [`scope_fifo()`]: fn.scope_fifo.html
+///
+/// # Accessing stack data
+///
+/// In general, spawned tasks may access stack data in place that
+/// outlives the scope itself. Other data must be fully owned by the
+/// spawned task.
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// rayon::scope(|s| {
+/// let bad: Vec<i32> = vec![4, 5, 6];
+/// s.spawn(|_| {
+/// // We can access `ok` because outlives the scope `s`.
+/// println!("ok: {:?}", ok);
+///
+/// // If we just try to use `bad` here, the closure will borrow `bad`
+/// // (because we are just printing it out, and that only requires a
+/// // borrow), which will result in a compilation error. Read on
+/// // for options.
+/// // println!("bad: {:?}", bad);
+/// });
+/// });
+/// ```
+///
+/// As the comments example above suggest, to reference `bad` we must
+/// take ownership of it. One way to do this is to detach the closure
+/// from the surrounding stack frame, using the `move` keyword. This
+/// will cause it to take ownership of *all* the variables it touches,
+/// in this case including both `ok` *and* `bad`:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// rayon::scope(|s| {
+/// let bad: Vec<i32> = vec![4, 5, 6];
+/// s.spawn(move |_| {
+/// println!("ok: {:?}", ok);
+/// println!("bad: {:?}", bad);
+/// });
+///
+/// // That closure is fine, but now we can't use `ok` anywhere else,
+/// // since it is owned by the previous task:
+/// // s.spawn(|_| println!("ok: {:?}", ok));
+/// });
+/// ```
+///
+/// While this works, it could be a problem if we want to use `ok` elsewhere.
+/// There are two choices. We can keep the closure as a `move` closure, but
+/// instead of referencing the variable `ok`, we create a shadowed variable that
+/// is a borrow of `ok` and capture *that*:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// rayon::scope(|s| {
+/// let bad: Vec<i32> = vec![4, 5, 6];
+/// let ok: &Vec<i32> = &ok; // shadow the original `ok`
+/// s.spawn(move |_| {
+/// println!("ok: {:?}", ok); // captures the shadowed version
+/// println!("bad: {:?}", bad);
+/// });
+///
+/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
+/// // can be shared freely. Note that we need a `move` closure here though,
+/// // because otherwise we'd be trying to borrow the shadowed `ok`,
+/// // and that doesn't outlive `scope`.
+/// s.spawn(move |_| println!("ok: {:?}", ok));
+/// });
+/// ```
+///
+/// Another option is not to use the `move` keyword but instead to take ownership
+/// of individual variables:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// rayon::scope(|s| {
+/// let bad: Vec<i32> = vec![4, 5, 6];
+/// s.spawn(|_| {
+/// // Transfer ownership of `bad` into a local variable (also named `bad`).
+/// // This will force the closure to take ownership of `bad` from the environment.
+/// let bad = bad;
+/// println!("ok: {:?}", ok); // `ok` is only borrowed.
+/// println!("bad: {:?}", bad); // refers to our local variable, above.
+/// });
+///
+/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
+/// });
+/// ```
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `scope()` or in
+/// any of the spawned jobs, that panic will be propagated and the
+/// call to `scope()` will panic. If multiple panics occurs, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn()`, it will
+/// execute, even if the spawning task should later panic. `scope()`
+/// returns once all spawned jobs have completed, and any panics are
+/// propagated at that point.
+pub fn scope<'scope, OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&Scope<'scope>) -> R + Send,
+ R: Send,
+{
+ in_worker(|owner_thread, _| {
+ let scope = Scope::<'scope>::new(Some(owner_thread), None);
+ scope.base.complete(Some(owner_thread), || op(&scope))
+ })
+}
+
+/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
+/// closure with a reference to `s`. This closure can then spawn
+/// asynchronous tasks into `s`. Those tasks may run asynchronously with
+/// respect to the closure; they may themselves spawn additional tasks
+/// into `s`. When the closure returns, it will block until all tasks
+/// that have been spawned into `s` complete.
+///
+/// # Task execution
+///
+/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
+/// difference in the order of execution. Consider a similar example:
+///
+/// [`scope()`]: fn.scope.html
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// // point start
+/// rayon::scope_fifo(|s| {
+/// s.spawn_fifo(|s| { // task s.1
+/// s.spawn_fifo(|s| { // task s.1.1
+/// rayon::scope_fifo(|t| {
+/// t.spawn_fifo(|_| ()); // task t.1
+/// t.spawn_fifo(|_| ()); // task t.2
+/// });
+/// });
+/// });
+/// s.spawn_fifo(|s| { // task s.2
+/// });
+/// // point mid
+/// });
+/// // point end
+/// ```
+///
+/// The various tasks that are run will execute roughly like so:
+///
+/// ```notrust
+/// | (start)
+/// |
+/// | (FIFO scope `s` created)
+/// +--------------------+ (task s.1)
+/// +-------+ (task s.2) |
+/// | | +---+ (task s.1.1)
+/// | | | |
+/// | | | | (FIFO scope `t` created)
+/// | | | +----------------+ (task t.1)
+/// | | | +---+ (task t.2) |
+/// | (mid) | | | | |
+/// : | | + <-+------------+ (scope `t` ends)
+/// : | | |
+/// |<------+------------+---+ (scope `s` ends)
+/// |
+/// | (end)
+/// ```
+///
+/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
+/// the thread from which they were spawned, as opposed to `scope()`'s
+/// LIFO. So in this example, we can expect `s.1` to execute before
+/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
+/// FIFO order, as usual. Overall, this has roughly the same order as
+/// the now-deprecated [`breadth_first`] option, except the effect is
+/// isolated to a particular scope. If spawns are intermingled from any
+/// combination of `scope()` and `scope_fifo()`, or from different
+/// threads, their order is only specified with respect to spawns in the
+/// same scope and thread.
+///
+/// For more details on this design, see Rayon [RFC #1].
+///
+/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
+/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `scope_fifo()` or
+/// in any of the spawned jobs, that panic will be propagated and the
+/// call to `scope_fifo()` will panic. If multiple panics occurs, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
+/// will execute, even if the spawning task should later panic.
+/// `scope_fifo()` returns once all spawned jobs have completed, and any
+/// panics are propagated at that point.
+pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
+ R: Send,
+{
+ in_worker(|owner_thread, _| {
+ let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
+ scope.base.complete(Some(owner_thread), || op(&scope))
+ })
+}
+
+/// Creates a "fork-join" scope `s` and invokes the closure with a
+/// reference to `s`. This closure can then spawn asynchronous tasks
+/// into `s`. Those tasks may run asynchronously with respect to the
+/// closure; they may themselves spawn additional tasks into `s`. When
+/// the closure returns, it will block until all tasks that have been
+/// spawned into `s` complete.
+///
+/// This is just like `scope()` except the closure runs on the same thread
+/// that calls `in_place_scope()`. Only work that it spawns runs in the
+/// thread pool.
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `in_place_scope()` or in
+/// any of the spawned jobs, that panic will be propagated and the
+/// call to `in_place_scope()` will panic. If multiple panics occurs, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn()`, it will
+/// execute, even if the spawning task should later panic. `in_place_scope()`
+/// returns once all spawned jobs have completed, and any panics are
+/// propagated at that point.
+pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&Scope<'scope>) -> R,
+{
+ do_in_place_scope(None, op)
+}
+
+pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
+where
+ OP: FnOnce(&Scope<'scope>) -> R,
+{
+ let thread = unsafe { WorkerThread::current().as_ref() };
+ let scope = Scope::<'scope>::new(thread, registry);
+ scope.base.complete(thread, || op(&scope))
+}
+
+/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
+/// closure with a reference to `s`. This closure can then spawn
+/// asynchronous tasks into `s`. Those tasks may run asynchronously with
+/// respect to the closure; they may themselves spawn additional tasks
+/// into `s`. When the closure returns, it will block until all tasks
+/// that have been spawned into `s` complete.
+///
+/// This is just like `scope_fifo()` except the closure runs on the same thread
+/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
+/// thread pool.
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
+/// any of the spawned jobs, that panic will be propagated and the
+/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
+/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
+/// returns once all spawned jobs have completed, and any panics are
+/// propagated at that point.
+pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R,
+{
+ do_in_place_scope_fifo(None, op)
+}
+
+pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
+where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R,
+{
+ let thread = unsafe { WorkerThread::current().as_ref() };
+ let scope = ScopeFifo::<'scope>::new(thread, registry);
+ scope.base.complete(thread, || op(&scope))
+}
+
+impl<'scope> Scope<'scope> {
+ fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
+ let base = ScopeBase::new(owner, registry);
+ Scope { base }
+ }
+
+ /// Spawns a job into the fork-join scope `self`. This job will
+ /// execute sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its
+ /// own reference to the scope `self` as argument. This can be
+ /// used to inject new jobs into `self`.
+ ///
+ /// # Returns
+ ///
+ /// Nothing. The spawned closures cannot pass back values to the
+ /// caller directly, though they can write to local variables on
+ /// the stack (if those variables outlive the scope) or
+ /// communicate through shared channels.
+ ///
+ /// (The intention is to eventually integrate with Rust futures to
+ /// support spawns of functions that compute a value.)
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// let mut value_a = None;
+ /// let mut value_b = None;
+ /// let mut value_c = None;
+ /// rayon::scope(|s| {
+ /// s.spawn(|s1| {
+ /// // ^ this is the same scope as `s`; this handle `s1`
+ /// // is intended for use by the spawned task,
+ /// // since scope handles cannot cross thread boundaries.
+ ///
+ /// value_a = Some(22);
+ ///
+ /// // the scope `s` will not end until all these tasks are done
+ /// s1.spawn(|_| {
+ /// value_b = Some(44);
+ /// });
+ /// });
+ ///
+ /// s.spawn(|_| {
+ /// value_c = Some(66);
+ /// });
+ /// });
+ /// assert_eq!(value_a, Some(22));
+ /// assert_eq!(value_b, Some(44));
+ /// assert_eq!(value_c, Some(66));
+ /// ```
+ ///
+ /// # See also
+ ///
+ /// The [`scope` function] has more extensive documentation about
+ /// task spawning.
+ ///
+ /// [`scope` function]: fn.scope.html
+ pub fn spawn<BODY>(&self, body: BODY)
+ where
+ BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = HeapJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
+ });
+ let job_ref = self.base.heap_job_ref(job);
+
+ // Since `Scope` implements `Sync`, we can't be sure that we're still in a
+ // thread of this pool, so we can't just push to the local worker thread.
+ // Also, this might be an in-place scope.
+ self.base.registry.inject_or_push(job_ref);
+ }
+
+ /// Spawns a job into every thread of the fork-join scope `self`. This job will
+ /// execute on each thread sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its own reference
+ /// to the scope `self` as argument, as well as a `BroadcastContext`.
+ pub fn spawn_broadcast<BODY>(&self, body: BODY)
+ where
+ BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = ArcJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ let body = &body;
+ let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
+ ScopeBase::execute_job(&scope.base, func)
+ });
+ self.base.inject_broadcast(job)
+ }
+}
+
+impl<'scope> ScopeFifo<'scope> {
+ fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
+ let base = ScopeBase::new(owner, registry);
+ let num_threads = base.registry.num_threads();
+ let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
+ ScopeFifo { base, fifos }
+ }
+
+ /// Spawns a job into the fork-join scope `self`. This job will
+ /// execute sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its
+ /// own reference to the scope `self` as argument. This can be
+ /// used to inject new jobs into `self`.
+ ///
+ /// # See also
+ ///
+ /// This method is akin to [`Scope::spawn()`], but with a FIFO
+ /// priority. The [`scope_fifo` function] has more details about
+ /// this distinction.
+ ///
+ /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
+ /// [`scope_fifo` function]: fn.scope_fifo.html
+ pub fn spawn_fifo<BODY>(&self, body: BODY)
+ where
+ BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = HeapJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
+ });
+ let job_ref = self.base.heap_job_ref(job);
+
+ // If we're in the pool, use our scope's private fifo for this thread to execute
+ // in a locally-FIFO order. Otherwise, just use the pool's global injector.
+ match self.base.registry.current_thread() {
+ Some(worker) => {
+ let fifo = &self.fifos[worker.index()];
+ // SAFETY: this job will execute before the scope ends.
+ unsafe { worker.push(fifo.push(job_ref)) };
+ }
+ None => self.base.registry.inject(job_ref),
+ }
+ }
+
+ /// Spawns a job into every thread of the fork-join scope `self`. This job will
+ /// execute on each thread sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its own reference
+ /// to the scope `self` as argument, as well as a `BroadcastContext`.
+ pub fn spawn_broadcast<BODY>(&self, body: BODY)
+ where
+ BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = ArcJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ let body = &body;
+ let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
+ ScopeBase::execute_job(&scope.base, func)
+ });
+ self.base.inject_broadcast(job)
+ }
+}
+
+impl<'scope> ScopeBase<'scope> {
+ /// Creates the base of a new scope for the given registry
+ fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
+ let registry = registry.unwrap_or_else(|| match owner {
+ Some(owner) => owner.registry(),
+ None => global_registry(),
+ });
+
+ ScopeBase {
+ registry: Arc::clone(registry),
+ panic: AtomicPtr::new(ptr::null_mut()),
+ job_completed_latch: CountLatch::new(owner),
+ marker: PhantomData,
+ }
+ }
+
+ fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
+ where
+ FUNC: FnOnce() + Send + 'scope,
+ {
+ unsafe {
+ self.job_completed_latch.increment();
+ job.into_job_ref()
+ }
+ }
+
+ fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
+ where
+ FUNC: Fn() + Send + Sync + 'scope,
+ {
+ let n_threads = self.registry.num_threads();
+ let job_refs = (0..n_threads).map(|_| unsafe {
+ self.job_completed_latch.increment();
+ ArcJob::as_job_ref(&job)
+ });
+
+ self.registry.inject_broadcast(job_refs);
+ }
+
+ /// Executes `func` as a job, either aborting or executing as
+ /// appropriate.
+ fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
+ where
+ FUNC: FnOnce() -> R,
+ {
+ let result = unsafe { Self::execute_job_closure(self, func) };
+ self.job_completed_latch.wait(owner);
+ self.maybe_propagate_panic();
+ result.unwrap() // only None if `op` panicked, and that would have been propagated
+ }
+
+ /// Executes `func` as a job, either aborting or executing as
+ /// appropriate.
+ unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
+ where
+ FUNC: FnOnce(),
+ {
+ let _: Option<()> = Self::execute_job_closure(this, func);
+ }
+
+ /// Executes `func` as a job in scope. Adjusts the "job completed"
+ /// counters and also catches any panic and stores it into
+ /// `scope`.
+ unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
+ where
+ FUNC: FnOnce() -> R,
+ {
+ let result = match unwind::halt_unwinding(func) {
+ Ok(r) => Some(r),
+ Err(err) => {
+ (*this).job_panicked(err);
+ None
+ }
+ };
+ Latch::set(&(*this).job_completed_latch);
+ result
+ }
+
+ fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
+ // capture the first error we see, free the rest
+ if self.panic.load(Ordering::Relaxed).is_null() {
+ let nil = ptr::null_mut();
+ let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
+ let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
+ if self
+ .panic
+ .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ // ownership now transferred into self.panic
+ } else {
+ // another panic raced in ahead of us, so drop ours
+ let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
+ }
+ }
+ }
+
+ fn maybe_propagate_panic(&self) {
+ // propagate panic, if any occurred; at this point, all
+ // outstanding jobs have completed, so we can use a relaxed
+ // ordering:
+ let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
+ if !panic.is_null() {
+ let value = unsafe { Box::from_raw(panic) };
+ unwind::resume_unwinding(*value);
+ }
+ }
+}
+
+impl<'scope> fmt::Debug for Scope<'scope> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Scope")
+ .field("pool_id", &self.base.registry.id())
+ .field("panic", &self.base.panic)
+ .field("job_completed_latch", &self.base.job_completed_latch)
+ .finish()
+ }
+}
+
+impl<'scope> fmt::Debug for ScopeFifo<'scope> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("ScopeFifo")
+ .field("num_fifos", &self.fifos.len())
+ .field("pool_id", &self.base.registry.id())
+ .field("panic", &self.base.panic)
+ .field("job_completed_latch", &self.base.job_completed_latch)
+ .finish()
+ }
+}
+
+/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
+///
+/// Unsafe code is still required to dereference the pointer, but that's fine in
+/// scope jobs that are guaranteed to execute before the scope ends.
+struct ScopePtr<T>(*const T);
+
+// SAFETY: !Send for raw pointers is not for safety, just as a lint
+unsafe impl<T: Sync> Send for ScopePtr<T> {}
+
+// SAFETY: !Sync for raw pointers is not for safety, just as a lint
+unsafe impl<T: Sync> Sync for ScopePtr<T> {}
+
+impl<T> ScopePtr<T> {
+ // Helper to avoid disjoint captures of `scope_ptr.0`
+ unsafe fn as_ref(&self) -> &T {
+ &*self.0
+ }
+}
diff --git a/vendor/rayon-core/src/scope/test.rs b/vendor/rayon-core/src/scope/test.rs
new file mode 100644
index 0000000..ad8c4af
--- /dev/null
+++ b/vendor/rayon-core/src/scope/test.rs
@@ -0,0 +1,619 @@
+use crate::unwind;
+use crate::ThreadPoolBuilder;
+use crate::{scope, scope_fifo, Scope, ScopeFifo};
+use rand::{Rng, SeedableRng};
+use rand_xorshift::XorShiftRng;
+use std::cmp;
+use std::iter::once;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Barrier, Mutex};
+use std::vec;
+
+#[test]
+fn scope_empty() {
+ scope(|_| {});
+}
+
+#[test]
+fn scope_result() {
+ let x = scope(|_| 22);
+ assert_eq!(x, 22);
+}
+
+#[test]
+fn scope_two() {
+ let counter = &AtomicUsize::new(0);
+ scope(|s| {
+ s.spawn(move |_| {
+ counter.fetch_add(1, Ordering::SeqCst);
+ });
+ s.spawn(move |_| {
+ counter.fetch_add(10, Ordering::SeqCst);
+ });
+ });
+
+ let v = counter.load(Ordering::SeqCst);
+ assert_eq!(v, 11);
+}
+
+#[test]
+fn scope_divide_and_conquer() {
+ let counter_p = &AtomicUsize::new(0);
+ scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024)));
+
+ let counter_s = &AtomicUsize::new(0);
+ divide_and_conquer_seq(counter_s, 1024);
+
+ let p = counter_p.load(Ordering::SeqCst);
+ let s = counter_s.load(Ordering::SeqCst);
+ assert_eq!(p, s);
+}
+
+fn divide_and_conquer<'scope>(scope: &Scope<'scope>, counter: &'scope AtomicUsize, size: usize) {
+ if size > 1 {
+ scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2));
+ scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2));
+ } else {
+ // count the leaves
+ counter.fetch_add(1, Ordering::SeqCst);
+ }
+}
+
+fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) {
+ if size > 1 {
+ divide_and_conquer_seq(counter, size / 2);
+ divide_and_conquer_seq(counter, size / 2);
+ } else {
+ // count the leaves
+ counter.fetch_add(1, Ordering::SeqCst);
+ }
+}
+
+struct Tree<T: Send> {
+ value: T,
+ children: Vec<Tree<T>>,
+}
+
+impl<T: Send> Tree<T> {
+ fn iter(&self) -> vec::IntoIter<&T> {
+ once(&self.value)
+ .chain(self.children.iter().flat_map(Tree::iter))
+ .collect::<Vec<_>>() // seems like it shouldn't be needed... but prevents overflow
+ .into_iter()
+ }
+
+ fn update<OP>(&mut self, op: OP)
+ where
+ OP: Fn(&mut T) + Sync,
+ T: Send,
+ {
+ scope(|s| self.update_in_scope(&op, s));
+ }
+
+ fn update_in_scope<'scope, OP>(&'scope mut self, op: &'scope OP, scope: &Scope<'scope>)
+ where
+ OP: Fn(&mut T) + Sync,
+ {
+ let Tree {
+ ref mut value,
+ ref mut children,
+ } = *self;
+ scope.spawn(move |scope| {
+ for child in children {
+ scope.spawn(move |scope| child.update_in_scope(op, scope));
+ }
+ });
+
+ op(value);
+ }
+}
+
+fn random_tree(depth: usize) -> Tree<u32> {
+ assert!(depth > 0);
+ let mut seed = <XorShiftRng as SeedableRng>::Seed::default();
+ (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i);
+ let mut rng = XorShiftRng::from_seed(seed);
+ random_tree1(depth, &mut rng)
+}
+
+fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> {
+ let children = if depth == 0 {
+ vec![]
+ } else {
+ (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level
+ .map(|_| random_tree1(depth - 1, rng))
+ .collect()
+ };
+
+ Tree {
+ value: rng.gen_range(0..1_000_000),
+ children,
+ }
+}
+
+#[test]
+fn update_tree() {
+ let mut tree: Tree<u32> = random_tree(10);
+ let values: Vec<u32> = tree.iter().cloned().collect();
+ tree.update(|v| *v += 1);
+ let new_values: Vec<u32> = tree.iter().cloned().collect();
+ assert_eq!(values.len(), new_values.len());
+ for (&i, &j) in values.iter().zip(&new_values) {
+ assert_eq!(i + 1, j);
+ }
+}
+
+/// Check that if you have a chain of scoped tasks where T0 spawns T1
+/// spawns T2 and so forth down to Tn, the stack space should not grow
+/// linearly with N. We test this by some unsafe hackery and
+/// permitting an approx 10% change with a 10x input change.
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn linear_stack_growth() {
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let mut max_diff = Mutex::new(0);
+ let bottom_of_stack = 0;
+ scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 5));
+ let diff_when_5 = *max_diff.get_mut().unwrap() as f64;
+
+ scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 500));
+ let diff_when_500 = *max_diff.get_mut().unwrap() as f64;
+
+ let ratio = diff_when_5 / diff_when_500;
+ assert!(
+ ratio > 0.9 && ratio < 1.1,
+ "stack usage ratio out of bounds: {}",
+ ratio
+ );
+ });
+}
+
+fn the_final_countdown<'scope>(
+ s: &Scope<'scope>,
+ bottom_of_stack: &'scope i32,
+ max: &'scope Mutex<usize>,
+ n: usize,
+) {
+ let top_of_stack = 0;
+ let p = bottom_of_stack as *const i32 as usize;
+ let q = &top_of_stack as *const i32 as usize;
+ let diff = if p > q { p - q } else { q - p };
+
+ let mut data = max.lock().unwrap();
+ *data = cmp::max(diff, *data);
+
+ if n > 0 {
+ s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1));
+ }
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_scope() {
+ scope(|_| panic!("Hello, world!"));
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_spawn() {
+ scope(|s| s.spawn(|_| panic!("Hello, world!")));
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_nested_spawn() {
+ scope(|s| s.spawn(|s| s.spawn(|s| s.spawn(|_| panic!("Hello, world!")))));
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_nested_scope_spawn() {
+ scope(|s| s.spawn(|_| scope(|s| s.spawn(|_| panic!("Hello, world!")))));
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn panic_propagate_still_execute_1() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| {
+ scope(|s| {
+ s.spawn(|_| panic!("Hello, world!")); // job A
+ s.spawn(|_| x = true); // job B, should still execute even though A panics
+ });
+ }) {
+ Ok(_) => panic!("failed to propagate panic"),
+ Err(_) => assert!(x, "job b failed to execute"),
+ }
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn panic_propagate_still_execute_2() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| {
+ scope(|s| {
+ s.spawn(|_| x = true); // job B, should still execute even though A panics
+ s.spawn(|_| panic!("Hello, world!")); // job A
+ });
+ }) {
+ Ok(_) => panic!("failed to propagate panic"),
+ Err(_) => assert!(x, "job b failed to execute"),
+ }
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn panic_propagate_still_execute_3() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| {
+ scope(|s| {
+ s.spawn(|_| x = true); // spawned job should still execute despite later panic
+ panic!("Hello, world!");
+ });
+ }) {
+ Ok(_) => panic!("failed to propagate panic"),
+ Err(_) => assert!(x, "panic after spawn, spawn failed to execute"),
+ }
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn panic_propagate_still_execute_4() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| {
+ scope(|s| {
+ s.spawn(|_| panic!("Hello, world!"));
+ x = true;
+ });
+ }) {
+ Ok(_) => panic!("failed to propagate panic"),
+ Err(_) => assert!(x, "panic in spawn tainted scope"),
+ }
+}
+
+macro_rules! test_order {
+ ($scope:ident => $spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let vec = Mutex::new(vec![]);
+ $scope(|scope| {
+ let vec = &vec;
+ for i in 0..10 {
+ scope.$spawn(move |scope| {
+ for j in 0..10 {
+ scope.$spawn(move |_| {
+ vec.lock().unwrap().push(i * 10 + j);
+ });
+ }
+ });
+ }
+ });
+ vec.into_inner().unwrap()
+ })
+ }};
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn lifo_order() {
+ // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
+ let vec = test_order!(scope => spawn);
+ let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn fifo_order() {
+ // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
+ let vec = test_order!(scope_fifo => spawn_fifo);
+ let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+macro_rules! test_nested_order {
+ ($outer_scope:ident => $outer_spawn:ident,
+ $inner_scope:ident => $inner_spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let vec = Mutex::new(vec![]);
+ $outer_scope(|scope| {
+ let vec = &vec;
+ for i in 0..10 {
+ scope.$outer_spawn(move |_| {
+ $inner_scope(|scope| {
+ for j in 0..10 {
+ scope.$inner_spawn(move |_| {
+ vec.lock().unwrap().push(i * 10 + j);
+ });
+ }
+ });
+ });
+ }
+ });
+ vec.into_inner().unwrap()
+ })
+ }};
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn nested_lifo_order() {
+ // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
+ let vec = test_nested_order!(scope => spawn, scope => spawn);
+ let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn nested_fifo_order() {
+ // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
+ let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
+ let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn nested_lifo_fifo_order() {
+ // LIFO on the outside, FIFO on the inside
+ let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo);
+ let expected: Vec<i32> = (0..10)
+ .rev()
+ .flat_map(|i| (0..10).map(move |j| i * 10 + j))
+ .collect();
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn nested_fifo_lifo_order() {
+ // FIFO on the outside, LIFO on the inside
+ let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn);
+ let expected: Vec<i32> = (0..10)
+ .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
+ .collect();
+ assert_eq!(vec, expected);
+}
+
+macro_rules! spawn_push {
+ ($scope:ident . $spawn:ident, $vec:ident, $i:expr) => {{
+ $scope.$spawn(move |_| $vec.lock().unwrap().push($i));
+ }};
+}
+
+/// Test spawns pushing a series of numbers, interleaved
+/// such that negative values are using an inner scope.
+macro_rules! test_mixed_order {
+ ($outer_scope:ident => $outer_spawn:ident,
+ $inner_scope:ident => $inner_spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let vec = Mutex::new(vec![]);
+ $outer_scope(|outer_scope| {
+ let vec = &vec;
+ spawn_push!(outer_scope.$outer_spawn, vec, 0);
+ $inner_scope(|inner_scope| {
+ spawn_push!(inner_scope.$inner_spawn, vec, -1);
+ spawn_push!(outer_scope.$outer_spawn, vec, 1);
+ spawn_push!(inner_scope.$inner_spawn, vec, -2);
+ spawn_push!(outer_scope.$outer_spawn, vec, 2);
+ spawn_push!(inner_scope.$inner_spawn, vec, -3);
+ });
+ spawn_push!(outer_scope.$outer_spawn, vec, 3);
+ });
+ vec.into_inner().unwrap()
+ })
+ }};
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn mixed_lifo_order() {
+ // NB: the end of the inner scope makes us execute some of the outer scope
+ // before they've all been spawned, so they're not perfectly LIFO.
+ let vec = test_mixed_order!(scope => spawn, scope => spawn);
+ let expected = vec![-3, 2, -2, 1, -1, 3, 0];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn mixed_fifo_order() {
+ let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
+ let expected = vec![-1, 0, -2, 1, -3, 2, 3];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn mixed_lifo_fifo_order() {
+ // NB: the end of the inner scope makes us execute some of the outer scope
+ // before they've all been spawned, so they're not perfectly LIFO.
+ let vec = test_mixed_order!(scope => spawn, scope_fifo => spawn_fifo);
+ let expected = vec![-1, 2, -2, 1, -3, 3, 0];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn mixed_fifo_lifo_order() {
+ let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
+ let expected = vec![-3, 0, -2, 1, -1, 2, 3];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn static_scope() {
+ static COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+ let mut range = 0..100;
+ let sum = range.clone().sum();
+ let iter = &mut range;
+
+ COUNTER.store(0, Ordering::Relaxed);
+ scope(|s: &Scope<'static>| {
+ // While we're allowed the locally borrowed iterator,
+ // the spawns must be static.
+ for i in iter {
+ s.spawn(move |_| {
+ COUNTER.fetch_add(i, Ordering::Relaxed);
+ });
+ }
+ });
+
+ assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
+}
+
+#[test]
+fn static_scope_fifo() {
+ static COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+ let mut range = 0..100;
+ let sum = range.clone().sum();
+ let iter = &mut range;
+
+ COUNTER.store(0, Ordering::Relaxed);
+ scope_fifo(|s: &ScopeFifo<'static>| {
+ // While we're allowed the locally borrowed iterator,
+ // the spawns must be static.
+ for i in iter {
+ s.spawn_fifo(move |_| {
+ COUNTER.fetch_add(i, Ordering::Relaxed);
+ });
+ }
+ });
+
+ assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
+}
+
+#[test]
+fn mixed_lifetime_scope() {
+ fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
+ scope(move |s: &Scope<'counter>| {
+ // We can borrow 'slice here, but the spawns can only borrow 'counter.
+ for &c in counters {
+ s.spawn(move |_| {
+ c.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ }
+
+ let counter = AtomicUsize::new(0);
+ increment(&[&counter; 100]);
+ assert_eq!(counter.into_inner(), 100);
+}
+
+#[test]
+fn mixed_lifetime_scope_fifo() {
+ fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
+ scope_fifo(move |s: &ScopeFifo<'counter>| {
+ // We can borrow 'slice here, but the spawns can only borrow 'counter.
+ for &c in counters {
+ s.spawn_fifo(move |_| {
+ c.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ }
+
+ let counter = AtomicUsize::new(0);
+ increment(&[&counter; 100]);
+ assert_eq!(counter.into_inner(), 100);
+}
+
+#[test]
+fn scope_spawn_broadcast() {
+ let sum = AtomicUsize::new(0);
+ let n = scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_fifo_spawn_broadcast() {
+ let sum = AtomicUsize::new(0);
+ let n = scope_fifo(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_spawn_broadcast_nested() {
+ let sum = AtomicUsize::new(0);
+ let n = scope(|s| {
+ s.spawn_broadcast(|s, _| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * n * (n - 1) / 2);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_barrier() {
+ let barrier = Barrier::new(8);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.in_place_scope(|s| {
+ s.spawn_broadcast(|_, _| {
+ barrier.wait();
+ });
+ barrier.wait();
+ });
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_panic_one() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ });
+ });
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_panic_many() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ });
+ });
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}