aboutsummaryrefslogtreecommitdiff
path: root/vendor/rayon-core/src/scope/mod.rs
diff options
context:
space:
mode:
authorValentin Popov <valentin@popov.link>2024-07-19 15:37:58 +0300
committerValentin Popov <valentin@popov.link>2024-07-19 15:37:58 +0300
commita990de90fe41456a23e58bd087d2f107d321f3a1 (patch)
tree15afc392522a9e85dc3332235e311b7d39352ea9 /vendor/rayon-core/src/scope/mod.rs
parent3d48cd3f81164bbfc1a755dc1d4a9a02f98c8ddd (diff)
downloadfparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.tar.xz
fparkan-a990de90fe41456a23e58bd087d2f107d321f3a1.zip
Deleted vendor folder
Diffstat (limited to 'vendor/rayon-core/src/scope/mod.rs')
-rw-r--r--vendor/rayon-core/src/scope/mod.rs769
1 files changed, 0 insertions, 769 deletions
diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs
deleted file mode 100644
index b7163d1..0000000
--- a/vendor/rayon-core/src/scope/mod.rs
+++ /dev/null
@@ -1,769 +0,0 @@
-//! Methods for custom fork-join scopes, created by the [`scope()`]
-//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
-//!
-//! [`scope()`]: fn.scope.html
-//! [`in_place_scope()`]: fn.in_place_scope.html
-//! [`join()`]: ../join/join.fn.html
-
-use crate::broadcast::BroadcastContext;
-use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
-use crate::latch::{CountLatch, Latch};
-use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
-use crate::unwind;
-use std::any::Any;
-use std::fmt;
-use std::marker::PhantomData;
-use std::mem::ManuallyDrop;
-use std::ptr;
-use std::sync::atomic::{AtomicPtr, Ordering};
-use std::sync::Arc;
-
-#[cfg(test)]
-mod test;
-
-/// Represents a fork-join scope which can be used to spawn any number of tasks.
-/// See [`scope()`] for more information.
-///
-///[`scope()`]: fn.scope.html
-pub struct Scope<'scope> {
- base: ScopeBase<'scope>,
-}
-
-/// Represents a fork-join scope which can be used to spawn any number of tasks.
-/// Those spawned from the same thread are prioritized in relative FIFO order.
-/// See [`scope_fifo()`] for more information.
-///
-///[`scope_fifo()`]: fn.scope_fifo.html
-pub struct ScopeFifo<'scope> {
- base: ScopeBase<'scope>,
- fifos: Vec<JobFifo>,
-}
-
-struct ScopeBase<'scope> {
- /// thread registry where `scope()` was executed or where `in_place_scope()`
- /// should spawn jobs.
- registry: Arc<Registry>,
-
- /// if some job panicked, the error is stored here; it will be
- /// propagated to the one who created the scope
- panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
-
- /// latch to track job counts
- job_completed_latch: CountLatch,
-
- /// You can think of a scope as containing a list of closures to execute,
- /// all of which outlive `'scope`. They're not actually required to be
- /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
- /// the closures are only *moved* across threads to be executed.
- marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
-}
-
-/// Creates a "fork-join" scope `s` and invokes the closure with a
-/// reference to `s`. This closure can then spawn asynchronous tasks
-/// into `s`. Those tasks may run asynchronously with respect to the
-/// closure; they may themselves spawn additional tasks into `s`. When
-/// the closure returns, it will block until all tasks that have been
-/// spawned into `s` complete.
-///
-/// `scope()` is a more flexible building block compared to `join()`,
-/// since a loop can be used to spawn any number of tasks without
-/// recursing. However, that flexibility comes at a performance price:
-/// tasks spawned using `scope()` must be allocated onto the heap,
-/// whereas `join()` can make exclusive use of the stack. **Prefer
-/// `join()` (or, even better, parallel iterators) where possible.**
-///
-/// # Example
-///
-/// The Rayon `join()` function launches two closures and waits for them
-/// to stop. One could implement `join()` using a scope like so, although
-/// it would be less efficient than the real implementation:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
-/// where A: FnOnce() -> RA + Send,
-/// B: FnOnce() -> RB + Send,
-/// RA: Send,
-/// RB: Send,
-/// {
-/// let mut result_a: Option<RA> = None;
-/// let mut result_b: Option<RB> = None;
-/// rayon::scope(|s| {
-/// s.spawn(|_| result_a = Some(oper_a()));
-/// s.spawn(|_| result_b = Some(oper_b()));
-/// });
-/// (result_a.unwrap(), result_b.unwrap())
-/// }
-/// ```
-///
-/// # A note on threading
-///
-/// The closure given to `scope()` executes in the Rayon thread-pool,
-/// as do those given to `spawn()`. This means that you can't access
-/// thread-local variables (well, you can, but they may have
-/// unexpected values).
-///
-/// # Task execution
-///
-/// Task execution potentially starts as soon as `spawn()` is called.
-/// The task will end sometime before `scope()` returns. Note that the
-/// *closure* given to scope may return much earlier. In general
-/// the lifetime of a scope created like `scope(body)` goes something like this:
-///
-/// - Scope begins when `scope(body)` is called
-/// - Scope body `body()` is invoked
-/// - Scope tasks may be spawned
-/// - Scope body returns
-/// - Scope tasks execute, possibly spawning more tasks
-/// - Once all tasks are done, scope ends and `scope()` returns
-///
-/// To see how and when tasks are joined, consider this example:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// // point start
-/// rayon::scope(|s| {
-/// s.spawn(|s| { // task s.1
-/// s.spawn(|s| { // task s.1.1
-/// rayon::scope(|t| {
-/// t.spawn(|_| ()); // task t.1
-/// t.spawn(|_| ()); // task t.2
-/// });
-/// });
-/// });
-/// s.spawn(|s| { // task s.2
-/// });
-/// // point mid
-/// });
-/// // point end
-/// ```
-///
-/// The various tasks that are run will execute roughly like so:
-///
-/// ```notrust
-/// | (start)
-/// |
-/// | (scope `s` created)
-/// +-----------------------------------------------+ (task s.2)
-/// +-------+ (task s.1) |
-/// | | |
-/// | +---+ (task s.1.1) |
-/// | | | |
-/// | | | (scope `t` created) |
-/// | | +----------------+ (task t.2) |
-/// | | +---+ (task t.1) | |
-/// | (mid) | | | | |
-/// : | + <-+------------+ (scope `t` ends) |
-/// : | | |
-/// |<------+---+-----------------------------------+ (scope `s` ends)
-/// |
-/// | (end)
-/// ```
-///
-/// The point here is that everything spawned into scope `s` will
-/// terminate (at latest) at the same point -- right before the
-/// original call to `rayon::scope` returns. This includes new
-/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
-/// scope is created (such as `t`), the things spawned into that scope
-/// will be joined before that scope returns, which in turn occurs
-/// before the creating task (task `s.1.1` in this case) finishes.
-///
-/// There is no guaranteed order of execution for spawns in a scope,
-/// given that other threads may steal tasks at any time. However, they
-/// are generally prioritized in a LIFO order on the thread from which
-/// they were spawned. So in this example, absent any stealing, we can
-/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
-/// threads always steal from the other end of the deque, like FIFO
-/// order. The idea is that "recent" tasks are most likely to be fresh
-/// in the local CPU's cache, while other threads can steal older
-/// "stale" tasks. For an alternate approach, consider
-/// [`scope_fifo()`] instead.
-///
-/// [`scope_fifo()`]: fn.scope_fifo.html
-///
-/// # Accessing stack data
-///
-/// In general, spawned tasks may access stack data in place that
-/// outlives the scope itself. Other data must be fully owned by the
-/// spawned task.
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let ok: Vec<i32> = vec![1, 2, 3];
-/// rayon::scope(|s| {
-/// let bad: Vec<i32> = vec![4, 5, 6];
-/// s.spawn(|_| {
-/// // We can access `ok` because outlives the scope `s`.
-/// println!("ok: {:?}", ok);
-///
-/// // If we just try to use `bad` here, the closure will borrow `bad`
-/// // (because we are just printing it out, and that only requires a
-/// // borrow), which will result in a compilation error. Read on
-/// // for options.
-/// // println!("bad: {:?}", bad);
-/// });
-/// });
-/// ```
-///
-/// As the comments example above suggest, to reference `bad` we must
-/// take ownership of it. One way to do this is to detach the closure
-/// from the surrounding stack frame, using the `move` keyword. This
-/// will cause it to take ownership of *all* the variables it touches,
-/// in this case including both `ok` *and* `bad`:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let ok: Vec<i32> = vec![1, 2, 3];
-/// rayon::scope(|s| {
-/// let bad: Vec<i32> = vec![4, 5, 6];
-/// s.spawn(move |_| {
-/// println!("ok: {:?}", ok);
-/// println!("bad: {:?}", bad);
-/// });
-///
-/// // That closure is fine, but now we can't use `ok` anywhere else,
-/// // since it is owned by the previous task:
-/// // s.spawn(|_| println!("ok: {:?}", ok));
-/// });
-/// ```
-///
-/// While this works, it could be a problem if we want to use `ok` elsewhere.
-/// There are two choices. We can keep the closure as a `move` closure, but
-/// instead of referencing the variable `ok`, we create a shadowed variable that
-/// is a borrow of `ok` and capture *that*:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let ok: Vec<i32> = vec![1, 2, 3];
-/// rayon::scope(|s| {
-/// let bad: Vec<i32> = vec![4, 5, 6];
-/// let ok: &Vec<i32> = &ok; // shadow the original `ok`
-/// s.spawn(move |_| {
-/// println!("ok: {:?}", ok); // captures the shadowed version
-/// println!("bad: {:?}", bad);
-/// });
-///
-/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
-/// // can be shared freely. Note that we need a `move` closure here though,
-/// // because otherwise we'd be trying to borrow the shadowed `ok`,
-/// // and that doesn't outlive `scope`.
-/// s.spawn(move |_| println!("ok: {:?}", ok));
-/// });
-/// ```
-///
-/// Another option is not to use the `move` keyword but instead to take ownership
-/// of individual variables:
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let ok: Vec<i32> = vec![1, 2, 3];
-/// rayon::scope(|s| {
-/// let bad: Vec<i32> = vec![4, 5, 6];
-/// s.spawn(|_| {
-/// // Transfer ownership of `bad` into a local variable (also named `bad`).
-/// // This will force the closure to take ownership of `bad` from the environment.
-/// let bad = bad;
-/// println!("ok: {:?}", ok); // `ok` is only borrowed.
-/// println!("bad: {:?}", bad); // refers to our local variable, above.
-/// });
-///
-/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
-/// });
-/// ```
-///
-/// # Panics
-///
-/// If a panic occurs, either in the closure given to `scope()` or in
-/// any of the spawned jobs, that panic will be propagated and the
-/// call to `scope()` will panic. If multiple panics occurs, it is
-/// non-deterministic which of their panic values will propagate.
-/// Regardless, once a task is spawned using `scope.spawn()`, it will
-/// execute, even if the spawning task should later panic. `scope()`
-/// returns once all spawned jobs have completed, and any panics are
-/// propagated at that point.
-pub fn scope<'scope, OP, R>(op: OP) -> R
-where
- OP: FnOnce(&Scope<'scope>) -> R + Send,
- R: Send,
-{
- in_worker(|owner_thread, _| {
- let scope = Scope::<'scope>::new(Some(owner_thread), None);
- scope.base.complete(Some(owner_thread), || op(&scope))
- })
-}
-
-/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
-/// closure with a reference to `s`. This closure can then spawn
-/// asynchronous tasks into `s`. Those tasks may run asynchronously with
-/// respect to the closure; they may themselves spawn additional tasks
-/// into `s`. When the closure returns, it will block until all tasks
-/// that have been spawned into `s` complete.
-///
-/// # Task execution
-///
-/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
-/// difference in the order of execution. Consider a similar example:
-///
-/// [`scope()`]: fn.scope.html
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// // point start
-/// rayon::scope_fifo(|s| {
-/// s.spawn_fifo(|s| { // task s.1
-/// s.spawn_fifo(|s| { // task s.1.1
-/// rayon::scope_fifo(|t| {
-/// t.spawn_fifo(|_| ()); // task t.1
-/// t.spawn_fifo(|_| ()); // task t.2
-/// });
-/// });
-/// });
-/// s.spawn_fifo(|s| { // task s.2
-/// });
-/// // point mid
-/// });
-/// // point end
-/// ```
-///
-/// The various tasks that are run will execute roughly like so:
-///
-/// ```notrust
-/// | (start)
-/// |
-/// | (FIFO scope `s` created)
-/// +--------------------+ (task s.1)
-/// +-------+ (task s.2) |
-/// | | +---+ (task s.1.1)
-/// | | | |
-/// | | | | (FIFO scope `t` created)
-/// | | | +----------------+ (task t.1)
-/// | | | +---+ (task t.2) |
-/// | (mid) | | | | |
-/// : | | + <-+------------+ (scope `t` ends)
-/// : | | |
-/// |<------+------------+---+ (scope `s` ends)
-/// |
-/// | (end)
-/// ```
-///
-/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
-/// the thread from which they were spawned, as opposed to `scope()`'s
-/// LIFO. So in this example, we can expect `s.1` to execute before
-/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
-/// FIFO order, as usual. Overall, this has roughly the same order as
-/// the now-deprecated [`breadth_first`] option, except the effect is
-/// isolated to a particular scope. If spawns are intermingled from any
-/// combination of `scope()` and `scope_fifo()`, or from different
-/// threads, their order is only specified with respect to spawns in the
-/// same scope and thread.
-///
-/// For more details on this design, see Rayon [RFC #1].
-///
-/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
-/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
-///
-/// # Panics
-///
-/// If a panic occurs, either in the closure given to `scope_fifo()` or
-/// in any of the spawned jobs, that panic will be propagated and the
-/// call to `scope_fifo()` will panic. If multiple panics occurs, it is
-/// non-deterministic which of their panic values will propagate.
-/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
-/// will execute, even if the spawning task should later panic.
-/// `scope_fifo()` returns once all spawned jobs have completed, and any
-/// panics are propagated at that point.
-pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
-where
- OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
- R: Send,
-{
- in_worker(|owner_thread, _| {
- let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
- scope.base.complete(Some(owner_thread), || op(&scope))
- })
-}
-
-/// Creates a "fork-join" scope `s` and invokes the closure with a
-/// reference to `s`. This closure can then spawn asynchronous tasks
-/// into `s`. Those tasks may run asynchronously with respect to the
-/// closure; they may themselves spawn additional tasks into `s`. When
-/// the closure returns, it will block until all tasks that have been
-/// spawned into `s` complete.
-///
-/// This is just like `scope()` except the closure runs on the same thread
-/// that calls `in_place_scope()`. Only work that it spawns runs in the
-/// thread pool.
-///
-/// # Panics
-///
-/// If a panic occurs, either in the closure given to `in_place_scope()` or in
-/// any of the spawned jobs, that panic will be propagated and the
-/// call to `in_place_scope()` will panic. If multiple panics occurs, it is
-/// non-deterministic which of their panic values will propagate.
-/// Regardless, once a task is spawned using `scope.spawn()`, it will
-/// execute, even if the spawning task should later panic. `in_place_scope()`
-/// returns once all spawned jobs have completed, and any panics are
-/// propagated at that point.
-pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
-where
- OP: FnOnce(&Scope<'scope>) -> R,
-{
- do_in_place_scope(None, op)
-}
-
-pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
-where
- OP: FnOnce(&Scope<'scope>) -> R,
-{
- let thread = unsafe { WorkerThread::current().as_ref() };
- let scope = Scope::<'scope>::new(thread, registry);
- scope.base.complete(thread, || op(&scope))
-}
-
-/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
-/// closure with a reference to `s`. This closure can then spawn
-/// asynchronous tasks into `s`. Those tasks may run asynchronously with
-/// respect to the closure; they may themselves spawn additional tasks
-/// into `s`. When the closure returns, it will block until all tasks
-/// that have been spawned into `s` complete.
-///
-/// This is just like `scope_fifo()` except the closure runs on the same thread
-/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
-/// thread pool.
-///
-/// # Panics
-///
-/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
-/// any of the spawned jobs, that panic will be propagated and the
-/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is
-/// non-deterministic which of their panic values will propagate.
-/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
-/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
-/// returns once all spawned jobs have completed, and any panics are
-/// propagated at that point.
-pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
-where
- OP: FnOnce(&ScopeFifo<'scope>) -> R,
-{
- do_in_place_scope_fifo(None, op)
-}
-
-pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
-where
- OP: FnOnce(&ScopeFifo<'scope>) -> R,
-{
- let thread = unsafe { WorkerThread::current().as_ref() };
- let scope = ScopeFifo::<'scope>::new(thread, registry);
- scope.base.complete(thread, || op(&scope))
-}
-
-impl<'scope> Scope<'scope> {
- fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
- let base = ScopeBase::new(owner, registry);
- Scope { base }
- }
-
- /// Spawns a job into the fork-join scope `self`. This job will
- /// execute sometime before the fork-join scope completes. The
- /// job is specified as a closure, and this closure receives its
- /// own reference to the scope `self` as argument. This can be
- /// used to inject new jobs into `self`.
- ///
- /// # Returns
- ///
- /// Nothing. The spawned closures cannot pass back values to the
- /// caller directly, though they can write to local variables on
- /// the stack (if those variables outlive the scope) or
- /// communicate through shared channels.
- ///
- /// (The intention is to eventually integrate with Rust futures to
- /// support spawns of functions that compute a value.)
- ///
- /// # Examples
- ///
- /// ```rust
- /// # use rayon_core as rayon;
- /// let mut value_a = None;
- /// let mut value_b = None;
- /// let mut value_c = None;
- /// rayon::scope(|s| {
- /// s.spawn(|s1| {
- /// // ^ this is the same scope as `s`; this handle `s1`
- /// // is intended for use by the spawned task,
- /// // since scope handles cannot cross thread boundaries.
- ///
- /// value_a = Some(22);
- ///
- /// // the scope `s` will not end until all these tasks are done
- /// s1.spawn(|_| {
- /// value_b = Some(44);
- /// });
- /// });
- ///
- /// s.spawn(|_| {
- /// value_c = Some(66);
- /// });
- /// });
- /// assert_eq!(value_a, Some(22));
- /// assert_eq!(value_b, Some(44));
- /// assert_eq!(value_c, Some(66));
- /// ```
- ///
- /// # See also
- ///
- /// The [`scope` function] has more extensive documentation about
- /// task spawning.
- ///
- /// [`scope` function]: fn.scope.html
- pub fn spawn<BODY>(&self, body: BODY)
- where
- BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
- {
- let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || unsafe {
- // SAFETY: this job will execute before the scope ends.
- let scope = scope_ptr.as_ref();
- ScopeBase::execute_job(&scope.base, move || body(scope))
- });
- let job_ref = self.base.heap_job_ref(job);
-
- // Since `Scope` implements `Sync`, we can't be sure that we're still in a
- // thread of this pool, so we can't just push to the local worker thread.
- // Also, this might be an in-place scope.
- self.base.registry.inject_or_push(job_ref);
- }
-
- /// Spawns a job into every thread of the fork-join scope `self`. This job will
- /// execute on each thread sometime before the fork-join scope completes. The
- /// job is specified as a closure, and this closure receives its own reference
- /// to the scope `self` as argument, as well as a `BroadcastContext`.
- pub fn spawn_broadcast<BODY>(&self, body: BODY)
- where
- BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
- {
- let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || unsafe {
- // SAFETY: this job will execute before the scope ends.
- let scope = scope_ptr.as_ref();
- let body = &body;
- let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- ScopeBase::execute_job(&scope.base, func)
- });
- self.base.inject_broadcast(job)
- }
-}
-
-impl<'scope> ScopeFifo<'scope> {
- fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
- let base = ScopeBase::new(owner, registry);
- let num_threads = base.registry.num_threads();
- let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
- ScopeFifo { base, fifos }
- }
-
- /// Spawns a job into the fork-join scope `self`. This job will
- /// execute sometime before the fork-join scope completes. The
- /// job is specified as a closure, and this closure receives its
- /// own reference to the scope `self` as argument. This can be
- /// used to inject new jobs into `self`.
- ///
- /// # See also
- ///
- /// This method is akin to [`Scope::spawn()`], but with a FIFO
- /// priority. The [`scope_fifo` function] has more details about
- /// this distinction.
- ///
- /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
- /// [`scope_fifo` function]: fn.scope_fifo.html
- pub fn spawn_fifo<BODY>(&self, body: BODY)
- where
- BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
- {
- let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || unsafe {
- // SAFETY: this job will execute before the scope ends.
- let scope = scope_ptr.as_ref();
- ScopeBase::execute_job(&scope.base, move || body(scope))
- });
- let job_ref = self.base.heap_job_ref(job);
-
- // If we're in the pool, use our scope's private fifo for this thread to execute
- // in a locally-FIFO order. Otherwise, just use the pool's global injector.
- match self.base.registry.current_thread() {
- Some(worker) => {
- let fifo = &self.fifos[worker.index()];
- // SAFETY: this job will execute before the scope ends.
- unsafe { worker.push(fifo.push(job_ref)) };
- }
- None => self.base.registry.inject(job_ref),
- }
- }
-
- /// Spawns a job into every thread of the fork-join scope `self`. This job will
- /// execute on each thread sometime before the fork-join scope completes. The
- /// job is specified as a closure, and this closure receives its own reference
- /// to the scope `self` as argument, as well as a `BroadcastContext`.
- pub fn spawn_broadcast<BODY>(&self, body: BODY)
- where
- BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
- {
- let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || unsafe {
- // SAFETY: this job will execute before the scope ends.
- let scope = scope_ptr.as_ref();
- let body = &body;
- let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- ScopeBase::execute_job(&scope.base, func)
- });
- self.base.inject_broadcast(job)
- }
-}
-
-impl<'scope> ScopeBase<'scope> {
- /// Creates the base of a new scope for the given registry
- fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
- let registry = registry.unwrap_or_else(|| match owner {
- Some(owner) => owner.registry(),
- None => global_registry(),
- });
-
- ScopeBase {
- registry: Arc::clone(registry),
- panic: AtomicPtr::new(ptr::null_mut()),
- job_completed_latch: CountLatch::new(owner),
- marker: PhantomData,
- }
- }
-
- fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
- where
- FUNC: FnOnce() + Send + 'scope,
- {
- unsafe {
- self.job_completed_latch.increment();
- job.into_job_ref()
- }
- }
-
- fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
- where
- FUNC: Fn() + Send + Sync + 'scope,
- {
- let n_threads = self.registry.num_threads();
- let job_refs = (0..n_threads).map(|_| unsafe {
- self.job_completed_latch.increment();
- ArcJob::as_job_ref(&job)
- });
-
- self.registry.inject_broadcast(job_refs);
- }
-
- /// Executes `func` as a job, either aborting or executing as
- /// appropriate.
- fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
- where
- FUNC: FnOnce() -> R,
- {
- let result = unsafe { Self::execute_job_closure(self, func) };
- self.job_completed_latch.wait(owner);
- self.maybe_propagate_panic();
- result.unwrap() // only None if `op` panicked, and that would have been propagated
- }
-
- /// Executes `func` as a job, either aborting or executing as
- /// appropriate.
- unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
- where
- FUNC: FnOnce(),
- {
- let _: Option<()> = Self::execute_job_closure(this, func);
- }
-
- /// Executes `func` as a job in scope. Adjusts the "job completed"
- /// counters and also catches any panic and stores it into
- /// `scope`.
- unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
- where
- FUNC: FnOnce() -> R,
- {
- let result = match unwind::halt_unwinding(func) {
- Ok(r) => Some(r),
- Err(err) => {
- (*this).job_panicked(err);
- None
- }
- };
- Latch::set(&(*this).job_completed_latch);
- result
- }
-
- fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
- // capture the first error we see, free the rest
- if self.panic.load(Ordering::Relaxed).is_null() {
- let nil = ptr::null_mut();
- let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
- let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
- if self
- .panic
- .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
- .is_ok()
- {
- // ownership now transferred into self.panic
- } else {
- // another panic raced in ahead of us, so drop ours
- let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
- }
- }
- }
-
- fn maybe_propagate_panic(&self) {
- // propagate panic, if any occurred; at this point, all
- // outstanding jobs have completed, so we can use a relaxed
- // ordering:
- let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
- if !panic.is_null() {
- let value = unsafe { Box::from_raw(panic) };
- unwind::resume_unwinding(*value);
- }
- }
-}
-
-impl<'scope> fmt::Debug for Scope<'scope> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Scope")
- .field("pool_id", &self.base.registry.id())
- .field("panic", &self.base.panic)
- .field("job_completed_latch", &self.base.job_completed_latch)
- .finish()
- }
-}
-
-impl<'scope> fmt::Debug for ScopeFifo<'scope> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("ScopeFifo")
- .field("num_fifos", &self.fifos.len())
- .field("pool_id", &self.base.registry.id())
- .field("panic", &self.base.panic)
- .field("job_completed_latch", &self.base.job_completed_latch)
- .finish()
- }
-}
-
-/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
-///
-/// Unsafe code is still required to dereference the pointer, but that's fine in
-/// scope jobs that are guaranteed to execute before the scope ends.
-struct ScopePtr<T>(*const T);
-
-// SAFETY: !Send for raw pointers is not for safety, just as a lint
-unsafe impl<T: Sync> Send for ScopePtr<T> {}
-
-// SAFETY: !Sync for raw pointers is not for safety, just as a lint
-unsafe impl<T: Sync> Sync for ScopePtr<T> {}
-
-impl<T> ScopePtr<T> {
- // Helper to avoid disjoint captures of `scope_ptr.0`
- unsafe fn as_ref(&self) -> &T {
- &*self.0
- }
-}