diff options
Diffstat (limited to 'vendor/rayon-core/src/scope')
-rw-r--r-- | vendor/rayon-core/src/scope/mod.rs | 769 | ||||
-rw-r--r-- | vendor/rayon-core/src/scope/test.rs | 619 |
2 files changed, 1388 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs new file mode 100644 index 0000000..b7163d1 --- /dev/null +++ b/vendor/rayon-core/src/scope/mod.rs @@ -0,0 +1,769 @@ +//! Methods for custom fork-join scopes, created by the [`scope()`] +//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`]. +//! +//! [`scope()`]: fn.scope.html +//! [`in_place_scope()`]: fn.in_place_scope.html +//! [`join()`]: ../join/join.fn.html + +use crate::broadcast::BroadcastContext; +use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; +use crate::latch::{CountLatch, Latch}; +use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; +use crate::unwind; +use std::any::Any; +use std::fmt; +use std::marker::PhantomData; +use std::mem::ManuallyDrop; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::Arc; + +#[cfg(test)] +mod test; + +/// Represents a fork-join scope which can be used to spawn any number of tasks. +/// See [`scope()`] for more information. +/// +///[`scope()`]: fn.scope.html +pub struct Scope<'scope> { + base: ScopeBase<'scope>, +} + +/// Represents a fork-join scope which can be used to spawn any number of tasks. +/// Those spawned from the same thread are prioritized in relative FIFO order. +/// See [`scope_fifo()`] for more information. +/// +///[`scope_fifo()`]: fn.scope_fifo.html +pub struct ScopeFifo<'scope> { + base: ScopeBase<'scope>, + fifos: Vec<JobFifo>, +} + +struct ScopeBase<'scope> { + /// thread registry where `scope()` was executed or where `in_place_scope()` + /// should spawn jobs. 
+ registry: Arc<Registry>, + + /// if some job panicked, the error is stored here; it will be + /// propagated to the one who created the scope + panic: AtomicPtr<Box<dyn Any + Send + 'static>>, + + /// latch to track job counts + job_completed_latch: CountLatch, + + /// You can think of a scope as containing a list of closures to execute, + /// all of which outlive `'scope`. They're not actually required to be + /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because + /// the closures are only *moved* across threads to be executed. + marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>, +} + +/// Creates a "fork-join" scope `s` and invokes the closure with a +/// reference to `s`. This closure can then spawn asynchronous tasks +/// into `s`. Those tasks may run asynchronously with respect to the +/// closure; they may themselves spawn additional tasks into `s`. When +/// the closure returns, it will block until all tasks that have been +/// spawned into `s` complete. +/// +/// `scope()` is a more flexible building block compared to `join()`, +/// since a loop can be used to spawn any number of tasks without +/// recursing. However, that flexibility comes at a performance price: +/// tasks spawned using `scope()` must be allocated onto the heap, +/// whereas `join()` can make exclusive use of the stack. **Prefer +/// `join()` (or, even better, parallel iterators) where possible.** +/// +/// # Example +/// +/// The Rayon `join()` function launches two closures and waits for them +/// to stop. 
One could implement `join()` using a scope like so, although +/// it would be less efficient than the real implementation: +/// +/// ```rust +/// # use rayon_core as rayon; +/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB) +/// where A: FnOnce() -> RA + Send, +/// B: FnOnce() -> RB + Send, +/// RA: Send, +/// RB: Send, +/// { +/// let mut result_a: Option<RA> = None; +/// let mut result_b: Option<RB> = None; +/// rayon::scope(|s| { +/// s.spawn(|_| result_a = Some(oper_a())); +/// s.spawn(|_| result_b = Some(oper_b())); +/// }); +/// (result_a.unwrap(), result_b.unwrap()) +/// } +/// ``` +/// +/// # A note on threading +/// +/// The closure given to `scope()` executes in the Rayon thread-pool, +/// as do those given to `spawn()`. This means that you can't access +/// thread-local variables (well, you can, but they may have +/// unexpected values). +/// +/// # Task execution +/// +/// Task execution potentially starts as soon as `spawn()` is called. +/// The task will end sometime before `scope()` returns. Note that the +/// *closure* given to scope may return much earlier. 
In general +/// the lifetime of a scope created like `scope(body)` goes something like this: +/// +/// - Scope begins when `scope(body)` is called +/// - Scope body `body()` is invoked +/// - Scope tasks may be spawned +/// - Scope body returns +/// - Scope tasks execute, possibly spawning more tasks +/// - Once all tasks are done, scope ends and `scope()` returns +/// +/// To see how and when tasks are joined, consider this example: +/// +/// ```rust +/// # use rayon_core as rayon; +/// // point start +/// rayon::scope(|s| { +/// s.spawn(|s| { // task s.1 +/// s.spawn(|s| { // task s.1.1 +/// rayon::scope(|t| { +/// t.spawn(|_| ()); // task t.1 +/// t.spawn(|_| ()); // task t.2 +/// }); +/// }); +/// }); +/// s.spawn(|s| { // task s.2 +/// }); +/// // point mid +/// }); +/// // point end +/// ``` +/// +/// The various tasks that are run will execute roughly like so: +/// +/// ```notrust +/// | (start) +/// | +/// | (scope `s` created) +/// +-----------------------------------------------+ (task s.2) +/// +-------+ (task s.1) | +/// | | | +/// | +---+ (task s.1.1) | +/// | | | | +/// | | | (scope `t` created) | +/// | | +----------------+ (task t.2) | +/// | | +---+ (task t.1) | | +/// | (mid) | | | | | +/// : | + <-+------------+ (scope `t` ends) | +/// : | | | +/// |<------+---+-----------------------------------+ (scope `s` ends) +/// | +/// | (end) +/// ``` +/// +/// The point here is that everything spawned into scope `s` will +/// terminate (at latest) at the same point -- right before the +/// original call to `rayon::scope` returns. This includes new +/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new +/// scope is created (such as `t`), the things spawned into that scope +/// will be joined before that scope returns, which in turn occurs +/// before the creating task (task `s.1.1` in this case) finishes. +/// +/// There is no guaranteed order of execution for spawns in a scope, +/// given that other threads may steal tasks at any time. 
However, they +/// are generally prioritized in a LIFO order on the thread from which +/// they were spawned. So in this example, absent any stealing, we can +/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other +/// threads always steal from the other end of the deque, like FIFO +/// order. The idea is that "recent" tasks are most likely to be fresh +/// in the local CPU's cache, while other threads can steal older +/// "stale" tasks. For an alternate approach, consider +/// [`scope_fifo()`] instead. +/// +/// [`scope_fifo()`]: fn.scope_fifo.html +/// +/// # Accessing stack data +/// +/// In general, spawned tasks may access stack data in place that +/// outlives the scope itself. Other data must be fully owned by the +/// spawned task. +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec<i32> = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec<i32> = vec![4, 5, 6]; +/// s.spawn(|_| { +/// // We can access `ok` because it outlives the scope `s`. +/// println!("ok: {:?}", ok); +/// +/// // If we just try to use `bad` here, the closure will borrow `bad` +/// // (because we are just printing it out, and that only requires a +/// // borrow), which will result in a compilation error. Read on +/// // for options. +/// // println!("bad: {:?}", bad); +/// }); +/// }); +/// ``` +/// +/// As the comments in the example above suggest, to reference `bad` we must +/// take ownership of it. One way to do this is to detach the closure +/// from the surrounding stack frame, using the `move` keyword. 
This +/// will cause it to take ownership of *all* the variables it touches, +/// in this case including both `ok` *and* `bad`: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec<i32> = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec<i32> = vec![4, 5, 6]; +/// s.spawn(move |_| { +/// println!("ok: {:?}", ok); +/// println!("bad: {:?}", bad); +/// }); +/// +/// // That closure is fine, but now we can't use `ok` anywhere else, +/// // since it is owned by the previous task: +/// // s.spawn(|_| println!("ok: {:?}", ok)); +/// }); +/// ``` +/// +/// While this works, it could be a problem if we want to use `ok` elsewhere. +/// There are two choices. We can keep the closure as a `move` closure, but +/// instead of referencing the variable `ok`, we create a shadowed variable that +/// is a borrow of `ok` and capture *that*: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec<i32> = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec<i32> = vec![4, 5, 6]; +/// let ok: &Vec<i32> = &ok; // shadow the original `ok` +/// s.spawn(move |_| { +/// println!("ok: {:?}", ok); // captures the shadowed version +/// println!("bad: {:?}", bad); +/// }); +/// +/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references +/// // can be shared freely. Note that we need a `move` closure here though, +/// // because otherwise we'd be trying to borrow the shadowed `ok`, +/// // and that doesn't outlive `scope`. +/// s.spawn(move |_| println!("ok: {:?}", ok)); +/// }); +/// ``` +/// +/// Another option is not to use the `move` keyword but instead to take ownership +/// of individual variables: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec<i32> = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec<i32> = vec![4, 5, 6]; +/// s.spawn(|_| { +/// // Transfer ownership of `bad` into a local variable (also named `bad`). +/// // This will force the closure to take ownership of `bad` from the environment. 
+/// let bad = bad; +/// println!("ok: {:?}", ok); // `ok` is only borrowed. +/// println!("bad: {:?}", bad); // refers to our local variable, above. +/// }); +/// +/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok` +/// }); +/// ``` +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `scope()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `scope()` will panic. If multiple panics occur, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn()`, it will +/// execute, even if the spawning task should later panic. `scope()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R + Send, + R: Send, +{ + in_worker(|owner_thread, _| { + let scope = Scope::<'scope>::new(Some(owner_thread), None); + // `complete` runs `op`, waits on the scope's job latch, and re-raises any stored panic. + scope.base.complete(Some(owner_thread), || op(&scope)) + }) +} + +/// Creates a "fork-join" scope `s` with FIFO order, and invokes the +/// closure with a reference to `s`. This closure can then spawn +/// asynchronous tasks into `s`. Those tasks may run asynchronously with +/// respect to the closure; they may themselves spawn additional tasks +/// into `s`. When the closure returns, it will block until all tasks +/// that have been spawned into `s` complete. +/// +/// # Task execution +/// +/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a +/// difference in the order of execution. 
Consider a similar example: +/// +/// [`scope()`]: fn.scope.html +/// +/// ```rust +/// # use rayon_core as rayon; +/// // point start +/// rayon::scope_fifo(|s| { +/// s.spawn_fifo(|s| { // task s.1 +/// s.spawn_fifo(|s| { // task s.1.1 +/// rayon::scope_fifo(|t| { +/// t.spawn_fifo(|_| ()); // task t.1 +/// t.spawn_fifo(|_| ()); // task t.2 +/// }); +/// }); +/// }); +/// s.spawn_fifo(|s| { // task s.2 +/// }); +/// // point mid +/// }); +/// // point end +/// ``` +/// +/// The various tasks that are run will execute roughly like so: +/// +/// ```notrust +/// | (start) +/// | +/// | (FIFO scope `s` created) +/// +--------------------+ (task s.1) +/// +-------+ (task s.2) | +/// | | +---+ (task s.1.1) +/// | | | | +/// | | | | (FIFO scope `t` created) +/// | | | +----------------+ (task t.1) +/// | | | +---+ (task t.2) | +/// | (mid) | | | | | +/// : | | + <-+------------+ (scope `t` ends) +/// : | | | +/// |<------+------------+---+ (scope `s` ends) +/// | +/// | (end) +/// ``` +/// +/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on +/// the thread from which they were spawned, as opposed to `scope()`'s +/// LIFO. So in this example, we can expect `s.1` to execute before +/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in +/// FIFO order, as usual. Overall, this has roughly the same order as +/// the now-deprecated [`breadth_first`] option, except the effect is +/// isolated to a particular scope. If spawns are intermingled from any +/// combination of `scope()` and `scope_fifo()`, or from different +/// threads, their order is only specified with respect to spawns in the +/// same scope and thread. +/// +/// For more details on this design, see Rayon [RFC #1]. 
+/// +/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first +/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `scope_fifo()` or +/// in any of the spawned jobs, that panic will be propagated and the +/// call to `scope_fifo()` will panic. If multiple panics occur, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it +/// will execute, even if the spawning task should later panic. +/// `scope_fifo()` returns once all spawned jobs have completed, and any +/// panics are propagated at that point. +pub fn scope_fifo<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, + R: Send, +{ + in_worker(|owner_thread, _| { + let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None); + // `complete` runs `op`, waits on the scope's job latch, and re-raises any stored panic. + scope.base.complete(Some(owner_thread), || op(&scope)) + }) +} + +/// Creates a "fork-join" scope `s` and invokes the closure with a +/// reference to `s`. This closure can then spawn asynchronous tasks +/// into `s`. Those tasks may run asynchronously with respect to the +/// closure; they may themselves spawn additional tasks into `s`. When +/// the closure returns, it will block until all tasks that have been +/// spawned into `s` complete. +/// +/// This is just like `scope()` except the closure runs on the same thread +/// that calls `in_place_scope()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `in_place_scope()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `in_place_scope()` will panic. If multiple panics occur, it is +/// non-deterministic which of their panic values will propagate. 
+/// Regardless, once a task is spawned using `scope.spawn()`, it will +/// execute, even if the spawning task should later panic. `in_place_scope()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn in_place_scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + do_in_place_scope(None, op) +} + +/// Shared implementation of `in_place_scope()`: builds a `Scope` for the +/// current worker thread (if any) and runs `op` through `ScopeBase::complete`. +/// When `registry` is `None`, `ScopeBase::new` falls back to the owner +/// thread's registry, or to the global registry if there is no owner. +pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + let thread = unsafe { WorkerThread::current().as_ref() }; + let scope = Scope::<'scope>::new(thread, registry); + scope.base.complete(thread, || op(&scope)) +} + +/// Creates a "fork-join" scope `s` with FIFO order, and invokes the +/// closure with a reference to `s`. This closure can then spawn +/// asynchronous tasks into `s`. Those tasks may run asynchronously with +/// respect to the closure; they may themselves spawn additional tasks +/// into `s`. When the closure returns, it will block until all tasks +/// that have been spawned into `s` complete. +/// +/// This is just like `scope_fifo()` except the closure runs on the same thread +/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will +/// execute, even if the spawning task should later panic. `in_place_scope_fifo()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. 
+pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R, +{ + do_in_place_scope_fifo(None, op) +} + +pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R, +{ + let thread = unsafe { WorkerThread::current().as_ref() }; + let scope = ScopeFifo::<'scope>::new(thread, registry); + scope.base.complete(thread, || op(&scope)) +} + +impl<'scope> Scope<'scope> { + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let base = ScopeBase::new(owner, registry); + Scope { base } + } + + /// Spawns a job into the fork-join scope `self`. This job will + /// execute sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its + /// own reference to the scope `self` as argument. This can be + /// used to inject new jobs into `self`. + /// + /// # Returns + /// + /// Nothing. The spawned closures cannot pass back values to the + /// caller directly, though they can write to local variables on + /// the stack (if those variables outlive the scope) or + /// communicate through shared channels. + /// + /// (The intention is to eventually integrate with Rust futures to + /// support spawns of functions that compute a value.) + /// + /// # Examples + /// + /// ```rust + /// # use rayon_core as rayon; + /// let mut value_a = None; + /// let mut value_b = None; + /// let mut value_c = None; + /// rayon::scope(|s| { + /// s.spawn(|s1| { + /// // ^ this is the same scope as `s`; this handle `s1` + /// // is intended for use by the spawned task, + /// // since scope handles cannot cross thread boundaries. 
+ /// + /// value_a = Some(22); + /// + /// // the scope `s` will not end until all these tasks are done + /// s1.spawn(|_| { + /// value_b = Some(44); + /// }); + /// }); + /// + /// s.spawn(|_| { + /// value_c = Some(66); + /// }); + /// }); + /// assert_eq!(value_a, Some(22)); + /// assert_eq!(value_b, Some(44)); + /// assert_eq!(value_c, Some(66)); + /// ``` + /// + /// # See also + /// + /// The [`scope` function] has more extensive documentation about + /// task spawning. + /// + /// [`scope` function]: fn.scope.html + pub fn spawn<BODY>(&self, body: BODY) + where + BODY: FnOnce(&Scope<'scope>) + Send + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = HeapJob::new(move || unsafe { + // SAFETY: this job will execute before the scope ends. + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) + }); + let job_ref = self.base.heap_job_ref(job); + + // Since `Scope` implements `Sync`, we can't be sure that we're still in a + // thread of this pool, so we can't just push to the local worker thread. + // Also, this might be an in-place scope. + self.base.registry.inject_or_push(job_ref); + } + + /// Spawns a job into every thread of the fork-join scope `self`. This job will + /// execute on each thread sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its own reference + /// to the scope `self` as argument, as well as a `BroadcastContext`. + pub fn spawn_broadcast<BODY>(&self, body: BODY) + where + BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = ArcJob::new(move || unsafe { + // SAFETY: this job will execute before the scope ends. 
+ let scope = scope_ptr.as_ref(); + let body = &body; + let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); + ScopeBase::execute_job(&scope.base, func) + }); + self.base.inject_broadcast(job) + } +} + +impl<'scope> ScopeFifo<'scope> { + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let base = ScopeBase::new(owner, registry); + let num_threads = base.registry.num_threads(); + let fifos = (0..num_threads).map(|_| JobFifo::new()).collect(); + ScopeFifo { base, fifos } + } + + /// Spawns a job into the fork-join scope `self`. This job will + /// execute sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its + /// own reference to the scope `self` as argument. This can be + /// used to inject new jobs into `self`. + /// + /// # See also + /// + /// This method is akin to [`Scope::spawn()`], but with a FIFO + /// priority. The [`scope_fifo` function] has more details about + /// this distinction. + /// + /// [`Scope::spawn()`]: struct.Scope.html#method.spawn + /// [`scope_fifo` function]: fn.scope_fifo.html + pub fn spawn_fifo<BODY>(&self, body: BODY) + where + BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = HeapJob::new(move || unsafe { + // SAFETY: this job will execute before the scope ends. + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) + }); + let job_ref = self.base.heap_job_ref(job); + + // If we're in the pool, use our scope's private fifo for this thread to execute + // in a locally-FIFO order. Otherwise, just use the pool's global injector. + match self.base.registry.current_thread() { + Some(worker) => { + let fifo = &self.fifos[worker.index()]; + // SAFETY: this job will execute before the scope ends. 
+ unsafe { worker.push(fifo.push(job_ref)) }; + } + None => self.base.registry.inject(job_ref), + } + } + + /// Spawns a job into every thread of the fork-join scope `self`. This job will + /// execute on each thread sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its own reference + /// to the scope `self` as argument, as well as a `BroadcastContext`. + pub fn spawn_broadcast<BODY>(&self, body: BODY) + where + BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = ArcJob::new(move || unsafe { + // SAFETY: this job will execute before the scope ends. + let scope = scope_ptr.as_ref(); + let body = &body; + let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); + ScopeBase::execute_job(&scope.base, func) + }); + self.base.inject_broadcast(job) + } +} + +impl<'scope> ScopeBase<'scope> { + /// Creates the base of a new scope for the given registry. When `registry` + /// is `None`, the owner thread's registry is used, or the global registry + /// if there is no owner thread either. + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let registry = registry.unwrap_or_else(|| match owner { + Some(owner) => owner.registry(), + None => global_registry(), + }); + + ScopeBase { + registry: Arc::clone(registry), + panic: AtomicPtr::new(ptr::null_mut()), + job_completed_latch: CountLatch::new(owner), + marker: PhantomData, + } + } + + /// Converts a boxed heap job into a `JobRef`, incrementing + /// `job_completed_latch` first so the job is counted by this scope. + fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef + where + FUNC: FnOnce() + Send + 'scope, + { + unsafe { + self.job_completed_latch.increment(); + job.into_job_ref() + } + } + + /// Builds one job reference per registry thread from the shared `ArcJob`, + /// incrementing `job_completed_latch` for each reference produced, and + /// passes them to the registry's `inject_broadcast`. + fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>) + where + FUNC: Fn() + Send + Sync + 'scope, + { + let n_threads = self.registry.num_threads(); + let job_refs = (0..n_threads).map(|_| unsafe { + self.job_completed_latch.increment(); + ArcJob::as_job_ref(&job) + }); + + self.registry.inject_broadcast(job_refs); + } + + /// Runs `func` (the scope body) via `execute_job_closure`, waits on + /// `job_completed_latch` with `owner`, then re-raises any panic stored + /// in `self.panic` before returning the value `func` produced. 
+ fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R + where + FUNC: FnOnce() -> R, + { + let result = unsafe { Self::execute_job_closure(self, func) }; + self.job_completed_latch.wait(owner); + self.maybe_propagate_panic(); + result.unwrap() // only None if `op` panicked, and that would have been propagated + } + + /// Executes `func` as a job, either aborting or executing as + /// appropriate. + unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC) + where + FUNC: FnOnce(), + { + let _: Option<()> = Self::execute_job_closure(this, func); + } + + /// Executes `func` as a job in scope. Adjusts the "job completed" + /// counters and also catches any panic and stores it into + /// `scope`. + unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R> + where + FUNC: FnOnce() -> R, + { + let result = match unwind::halt_unwinding(func) { + Ok(r) => Some(r), + Err(err) => { + (*this).job_panicked(err); + None + } + }; + Latch::set(&(*this).job_completed_latch); + result + } + + fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { + // capture the first error we see, free the rest + if self.panic.load(Ordering::Relaxed).is_null() { + let nil = ptr::null_mut(); + let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr + let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err; + if self + .panic + .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + // ownership now transferred into self.panic + } else { + // another panic raced in ahead of us, so drop ours + let _: Box<Box<_>> = ManuallyDrop::into_inner(err); + } + } + } + + fn maybe_propagate_panic(&self) { + // propagate panic, if any occurred; at this point, all + // outstanding jobs have completed, so we can use a relaxed + // ordering: + let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed); + if !panic.is_null() { + let value = unsafe { Box::from_raw(panic) }; + unwind::resume_unwinding(*value); 
+ } + } +} + +impl<'scope> fmt::Debug for Scope<'scope> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Scope") + .field("pool_id", &self.base.registry.id()) + .field("panic", &self.base.panic) + .field("job_completed_latch", &self.base.job_completed_latch) + .finish() + } +} + +impl<'scope> fmt::Debug for ScopeFifo<'scope> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ScopeFifo") + .field("num_fifos", &self.fifos.len()) + .field("pool_id", &self.base.registry.id()) + .field("panic", &self.base.panic) + .field("job_completed_latch", &self.base.job_completed_latch) + .finish() + } +} + +/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. +/// +/// Unsafe code is still required to dereference the pointer, but that's fine in +/// scope jobs that are guaranteed to execute before the scope ends. +struct ScopePtr<T>(*const T); + +// SAFETY: !Send for raw pointers is not for safety, just as a lint +unsafe impl<T: Sync> Send for ScopePtr<T> {} + +// SAFETY: !Sync for raw pointers is not for safety, just as a lint +unsafe impl<T: Sync> Sync for ScopePtr<T> {} + +impl<T> ScopePtr<T> { + // Helper to avoid disjoint captures of `scope_ptr.0` + unsafe fn as_ref(&self) -> &T { + &*self.0 + } +} diff --git a/vendor/rayon-core/src/scope/test.rs b/vendor/rayon-core/src/scope/test.rs new file mode 100644 index 0000000..ad8c4af --- /dev/null +++ b/vendor/rayon-core/src/scope/test.rs @@ -0,0 +1,619 @@ +use crate::unwind; +use crate::ThreadPoolBuilder; +use crate::{scope, scope_fifo, Scope, ScopeFifo}; +use rand::{Rng, SeedableRng}; +use rand_xorshift::XorShiftRng; +use std::cmp; +use std::iter::once; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Barrier, Mutex}; +use std::vec; + +#[test] +fn scope_empty() { + scope(|_| {}); +} + +#[test] +fn scope_result() { + let x = scope(|_| 22); + assert_eq!(x, 22); +} + +#[test] +fn scope_two() { + let counter = 
&AtomicUsize::new(0); + scope(|s| { + s.spawn(move |_| { + counter.fetch_add(1, Ordering::SeqCst); + }); + s.spawn(move |_| { + counter.fetch_add(10, Ordering::SeqCst); + }); + }); + + let v = counter.load(Ordering::SeqCst); + assert_eq!(v, 11); +} + +#[test] +fn scope_divide_and_conquer() { + let counter_p = &AtomicUsize::new(0); + scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024))); + + let counter_s = &AtomicUsize::new(0); + divide_and_conquer_seq(counter_s, 1024); + + let p = counter_p.load(Ordering::SeqCst); + let s = counter_s.load(Ordering::SeqCst); + assert_eq!(p, s); +} + +fn divide_and_conquer<'scope>(scope: &Scope<'scope>, counter: &'scope AtomicUsize, size: usize) { + if size > 1 { + scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2)); + scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2)); + } else { + // count the leaves + counter.fetch_add(1, Ordering::SeqCst); + } +} + +fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) { + if size > 1 { + divide_and_conquer_seq(counter, size / 2); + divide_and_conquer_seq(counter, size / 2); + } else { + // count the leaves + counter.fetch_add(1, Ordering::SeqCst); + } +} + +struct Tree<T: Send> { + value: T, + children: Vec<Tree<T>>, +} + +impl<T: Send> Tree<T> { + fn iter(&self) -> vec::IntoIter<&T> { + once(&self.value) + .chain(self.children.iter().flat_map(Tree::iter)) + .collect::<Vec<_>>() // seems like it shouldn't be needed... 
but prevents overflow + .into_iter() + } + + fn update<OP>(&mut self, op: OP) + where + OP: Fn(&mut T) + Sync, + T: Send, + { + scope(|s| self.update_in_scope(&op, s)); + } + + fn update_in_scope<'scope, OP>(&'scope mut self, op: &'scope OP, scope: &Scope<'scope>) + where + OP: Fn(&mut T) + Sync, + { + let Tree { + ref mut value, + ref mut children, + } = *self; + scope.spawn(move |scope| { + for child in children { + scope.spawn(move |scope| child.update_in_scope(op, scope)); + } + }); + + op(value); + } +} + +fn random_tree(depth: usize) -> Tree<u32> { + assert!(depth > 0); + let mut seed = <XorShiftRng as SeedableRng>::Seed::default(); + (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i); + let mut rng = XorShiftRng::from_seed(seed); + random_tree1(depth, &mut rng) +} + +fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> { + let children = if depth == 0 { + vec![] + } else { + (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level + .map(|_| random_tree1(depth - 1, rng)) + .collect() + }; + + Tree { + value: rng.gen_range(0..1_000_000), + children, + } +} + +#[test] +fn update_tree() { + let mut tree: Tree<u32> = random_tree(10); + let values: Vec<u32> = tree.iter().cloned().collect(); + tree.update(|v| *v += 1); + let new_values: Vec<u32> = tree.iter().cloned().collect(); + assert_eq!(values.len(), new_values.len()); + for (&i, &j) in values.iter().zip(&new_values) { + assert_eq!(i + 1, j); + } +} + +/// Check that if you have a chain of scoped tasks where T0 spawns T1 +/// spawns T2 and so forth down to Tn, the stack space should not grow +/// linearly with N. We test this by some unsafe hackery and +/// permitting an approx 10% change with a 10x input change. 
+#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn linear_stack_growth() { + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let mut max_diff = Mutex::new(0); + let bottom_of_stack = 0; + scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 5)); + let diff_when_5 = *max_diff.get_mut().unwrap() as f64; + + scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 500)); + let diff_when_500 = *max_diff.get_mut().unwrap() as f64; + + let ratio = diff_when_5 / diff_when_500; + assert!( + ratio > 0.9 && ratio < 1.1, + "stack usage ratio out of bounds: {}", + ratio + ); + }); +} + +fn the_final_countdown<'scope>( + s: &Scope<'scope>, + bottom_of_stack: &'scope i32, + max: &'scope Mutex<usize>, + n: usize, +) { + let top_of_stack = 0; + let p = bottom_of_stack as *const i32 as usize; + let q = &top_of_stack as *const i32 as usize; + let diff = if p > q { p - q } else { q - p }; + + let mut data = max.lock().unwrap(); + *data = cmp::max(diff, *data); + + if n > 0 { + s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1)); + } +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_scope() { + scope(|_| panic!("Hello, world!")); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_spawn() { + scope(|s| s.spawn(|_| panic!("Hello, world!"))); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_nested_spawn() { + scope(|s| s.spawn(|s| s.spawn(|s| s.spawn(|_| panic!("Hello, world!"))))); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_nested_scope_spawn() { + scope(|s| s.spawn(|_| scope(|s| s.spawn(|_| panic!("Hello, world!"))))); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_propagate_still_execute_1() { + let mut x = false; + match unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| panic!("Hello, world!")); // 
job A + s.spawn(|_| x = true); // job B, should still execute even though A panics + }); + }) { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "job b failed to execute"), + } +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_propagate_still_execute_2() { + let mut x = false; + match unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| x = true); // job B, should still execute even though A panics + s.spawn(|_| panic!("Hello, world!")); // job A + }); + }) { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "job b failed to execute"), + } +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_propagate_still_execute_3() { + let mut x = false; + match unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| x = true); // spawned job should still execute despite later panic + panic!("Hello, world!"); + }); + }) { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "panic after spawn, spawn failed to execute"), + } +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_propagate_still_execute_4() { + let mut x = false; + match unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| panic!("Hello, world!")); + x = true; + }); + }) { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "panic in spawn tainted scope"), + } +} + +macro_rules! 
test_order { + ($scope:ident => $spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$spawn(move |scope| { + for j in 0..10 { + scope.$spawn(move |_| { + vec.lock().unwrap().push(i * 10 + j); + }); + } + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn lifo_order() { + // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. + let vec = test_order!(scope => spawn); + let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn fifo_order() { + // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. + let vec = test_order!(scope_fifo => spawn_fifo); + let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +macro_rules! test_nested_order { + ($outer_scope:ident => $outer_spawn:ident, + $inner_scope:ident => $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $outer_scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$outer_spawn(move |_| { + $inner_scope(|scope| { + for j in 0..10 { + scope.$inner_spawn(move |_| { + vec.lock().unwrap().push(i * 10 + j); + }); + } + }); + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_lifo_order() { + // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. 
+ let vec = test_nested_order!(scope => spawn, scope => spawn); + let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_fifo_order() { + // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. + let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); + let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_lifo_fifo_order() { + // LIFO on the outside, FIFO on the inside + let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo); + let expected: Vec<i32> = (0..10) + .rev() + .flat_map(|i| (0..10).map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_fifo_lifo_order() { + // FIFO on the outside, LIFO on the inside + let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn); + let expected: Vec<i32> = (0..10) + .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +macro_rules! spawn_push { + ($scope:ident . $spawn:ident, $vec:ident, $i:expr) => {{ + $scope.$spawn(move |_| $vec.lock().unwrap().push($i)); + }}; +} + +/// Test spawns pushing a series of numbers, interleaved +/// such that negative values are using an inner scope. +macro_rules! 
test_mixed_order { + ($outer_scope:ident => $outer_spawn:ident, + $inner_scope:ident => $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $outer_scope(|outer_scope| { + let vec = &vec; + spawn_push!(outer_scope.$outer_spawn, vec, 0); + $inner_scope(|inner_scope| { + spawn_push!(inner_scope.$inner_spawn, vec, -1); + spawn_push!(outer_scope.$outer_spawn, vec, 1); + spawn_push!(inner_scope.$inner_spawn, vec, -2); + spawn_push!(outer_scope.$outer_spawn, vec, 2); + spawn_push!(inner_scope.$inner_spawn, vec, -3); + }); + spawn_push!(outer_scope.$outer_spawn, vec, 3); + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_lifo_order() { + // NB: the end of the inner scope makes us execute some of the outer scope + // before they've all been spawned, so they're not perfectly LIFO. + let vec = test_mixed_order!(scope => spawn, scope => spawn); + let expected = vec![-3, 2, -2, 1, -1, 3, 0]; + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_fifo_order() { + let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); + let expected = vec![-1, 0, -2, 1, -3, 2, 3]; + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_lifo_fifo_order() { + // NB: the end of the inner scope makes us execute some of the outer scope + // before they've all been spawned, so they're not perfectly LIFO. 
+ let vec = test_mixed_order!(scope => spawn, scope_fifo => spawn_fifo); + let expected = vec![-1, 2, -2, 1, -3, 3, 0]; + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_fifo_lifo_order() { + let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn); + let expected = vec![-3, 0, -2, 1, -1, 2, 3]; + assert_eq!(vec, expected); +} + +#[test] +fn static_scope() { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let mut range = 0..100; + let sum = range.clone().sum(); + let iter = &mut range; + + COUNTER.store(0, Ordering::Relaxed); + scope(|s: &Scope<'static>| { + // While we're allowed the locally borrowed iterator, + // the spawns must be static. + for i in iter { + s.spawn(move |_| { + COUNTER.fetch_add(i, Ordering::Relaxed); + }); + } + }); + + assert_eq!(COUNTER.load(Ordering::Relaxed), sum); +} + +#[test] +fn static_scope_fifo() { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let mut range = 0..100; + let sum = range.clone().sum(); + let iter = &mut range; + + COUNTER.store(0, Ordering::Relaxed); + scope_fifo(|s: &ScopeFifo<'static>| { + // While we're allowed the locally borrowed iterator, + // the spawns must be static. + for i in iter { + s.spawn_fifo(move |_| { + COUNTER.fetch_add(i, Ordering::Relaxed); + }); + } + }); + + assert_eq!(COUNTER.load(Ordering::Relaxed), sum); +} + +#[test] +fn mixed_lifetime_scope() { + fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { + scope(move |s: &Scope<'counter>| { + // We can borrow 'slice here, but the spawns can only borrow 'counter. 
+ for &c in counters { + s.spawn(move |_| { + c.fetch_add(1, Ordering::Relaxed); + }); + } + }); + } + + let counter = AtomicUsize::new(0); + increment(&[&counter; 100]); + assert_eq!(counter.into_inner(), 100); +} + +#[test] +fn mixed_lifetime_scope_fifo() { + fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { + scope_fifo(move |s: &ScopeFifo<'counter>| { + // We can borrow 'slice here, but the spawns can only borrow 'counter. + for &c in counters { + s.spawn_fifo(move |_| { + c.fetch_add(1, Ordering::Relaxed); + }); + } + }); + } + + let counter = AtomicUsize::new(0); + increment(&[&counter; 100]); + assert_eq!(counter.into_inner(), 100); +} + +#[test] +fn scope_spawn_broadcast() { + let sum = AtomicUsize::new(0); + let n = scope(|s| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * (n - 1) / 2); +} + +#[test] +fn scope_fifo_spawn_broadcast() { + let sum = AtomicUsize::new(0); + let n = scope_fifo(|s| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * (n - 1) / 2); +} + +#[test] +fn scope_spawn_broadcast_nested() { + let sum = AtomicUsize::new(0); + let n = scope(|s| { + s.spawn_broadcast(|s, _| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * n * (n - 1) / 2); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn scope_spawn_broadcast_barrier() { + let barrier = Barrier::new(8); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool.in_place_scope(|s| { + s.spawn_broadcast(|_, _| { + barrier.wait(); + }); + barrier.wait(); + }); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn 
scope_spawn_broadcast_panic_one() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.scope(|s| { + s.spawn_broadcast(|_, ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() == 3 { + panic!("Hello, world!"); + } + }); + }); + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn scope_spawn_broadcast_panic_many() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.scope(|s| { + s.spawn_broadcast(|_, ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() % 2 == 0 { + panic!("Hello, world!"); + } + }); + }); + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} |