aboutsummaryrefslogtreecommitdiff
path: root/vendor/rayon-core/src/join
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/rayon-core/src/join')
-rw-r--r--vendor/rayon-core/src/join/mod.rs188
-rw-r--r--vendor/rayon-core/src/join/test.rs151
2 files changed, 0 insertions, 339 deletions
diff --git a/vendor/rayon-core/src/join/mod.rs b/vendor/rayon-core/src/join/mod.rs
deleted file mode 100644
index 5ab9f6b..0000000
--- a/vendor/rayon-core/src/join/mod.rs
+++ /dev/null
@@ -1,188 +0,0 @@
-use crate::job::StackJob;
-use crate::latch::SpinLatch;
-use crate::registry::{self, WorkerThread};
-use crate::unwind;
-use std::any::Any;
-
-use crate::FnContext;
-
-#[cfg(test)]
-mod test;
-
-/// Takes two closures and *potentially* runs them in parallel. It
-/// returns a pair of the results from those closures.
-///
-/// Conceptually, calling `join()` is similar to spawning two threads,
-/// one executing each of the two closures. However, the
-/// implementation is quite different and incurs very low
-/// overhead. The underlying technique is called "work stealing": the
-/// Rayon runtime uses a fixed pool of worker threads and attempts to
-/// only execute code in parallel when there are idle CPUs to handle
-/// it.
-///
-/// When `join` is called from outside the thread pool, the calling
-/// thread will block while the closures execute in the pool. When
-/// `join` is called within the pool, the calling thread still actively
-/// participates in the thread pool. It will begin by executing closure
-/// A (on the current thread). While it is doing that, it will advertise
-/// closure B as being available for other threads to execute. Once closure A
-/// has completed, the current thread will try to execute closure B;
-/// if however closure B has been stolen, then it will look for other work
-/// while waiting for the thief to fully execute closure B. (This is the
-/// typical work-stealing strategy).
-///
-/// # Examples
-///
-/// This example uses join to perform a quick-sort (note this is not a
-/// particularly optimized implementation: if you **actually** want to
-/// sort for real, you should prefer [the `par_sort` method] offered
-/// by Rayon).
-///
-/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort
-///
-/// ```rust
-/// # use rayon_core as rayon;
-/// let mut v = vec![5, 1, 8, 22, 0, 44];
-/// quick_sort(&mut v);
-/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
-///
-/// fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
-/// if v.len() > 1 {
-/// let mid = partition(v);
-/// let (lo, hi) = v.split_at_mut(mid);
-/// rayon::join(|| quick_sort(lo),
-/// || quick_sort(hi));
-/// }
-/// }
-///
-/// // Partition rearranges all items `<=` to the pivot
-/// // item (arbitrary selected to be the last item in the slice)
-/// // to the first half of the slice. It then returns the
-/// // "dividing point" where the pivot is placed.
-/// fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
-/// let pivot = v.len() - 1;
-/// let mut i = 0;
-/// for j in 0..pivot {
-/// if v[j] <= v[pivot] {
-/// v.swap(i, j);
-/// i += 1;
-/// }
-/// }
-/// v.swap(i, pivot);
-/// i
-/// }
-/// ```
-///
-/// # Warning about blocking I/O
-///
-/// The assumption is that the closures given to `join()` are
-/// CPU-bound tasks that do not perform I/O or other blocking
-/// operations. If you do perform I/O, and that I/O should block
-/// (e.g., waiting for a network request), the overall performance may
-/// be poor. Moreover, if you cause one closure to be blocked waiting
-/// on another (for example, using a channel), that could lead to a
-/// deadlock.
-///
-/// # Panics
-///
-/// No matter what happens, both closures will always be executed. If
-/// a single closure panics, whether it be the first or second
-/// closure, that panic will be propagated and hence `join()` will
-/// panic with the same panic value. If both closures panic, `join()`
-/// will panic with the panic value from the first closure.
-pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
-where
- A: FnOnce() -> RA + Send,
- B: FnOnce() -> RB + Send,
- RA: Send,
- RB: Send,
-{
- #[inline]
- fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R {
- move |_| f()
- }
-
- join_context(call(oper_a), call(oper_b))
-}
-
-/// Identical to `join`, except that the closures have a parameter
-/// that provides context for the way the closure has been called,
-/// especially indicating whether they're executing on a different
-/// thread than where `join_context` was called. This will occur if
-/// the second job is stolen by a different thread, or if
-/// `join_context` was called from outside the thread pool to begin
-/// with.
-pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
-where
- A: FnOnce(FnContext) -> RA + Send,
- B: FnOnce(FnContext) -> RB + Send,
- RA: Send,
- RB: Send,
-{
- #[inline]
- fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
- move || f(FnContext::new(injected))
- }
-
- #[inline]
- fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
- move |migrated| f(FnContext::new(migrated))
- }
-
- registry::in_worker(|worker_thread, injected| unsafe {
- // Create virtual wrapper for task b; this all has to be
- // done here so that the stack frame can keep it all live
- // long enough.
- let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
- let job_b_ref = job_b.as_job_ref();
- let job_b_id = job_b_ref.id();
- worker_thread.push(job_b_ref);
-
- // Execute task a; hopefully b gets stolen in the meantime.
- let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
- let result_a = match status_a {
- Ok(v) => v,
- Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
- };
-
- // Now that task A has finished, try to pop job B from the
- // local stack. It may already have been popped by job A; it
- // may also have been stolen. There may also be some tasks
- // pushed on top of it in the stack, and we will have to pop
- // those off to get to it.
- while !job_b.latch.probe() {
- if let Some(job) = worker_thread.take_local_job() {
- if job_b_id == job.id() {
- // Found it! Let's run it.
- //
- // Note that this could panic, but it's ok if we unwind here.
- let result_b = job_b.run_inline(injected);
- return (result_a, result_b);
- } else {
- worker_thread.execute(job);
- }
- } else {
- // Local deque is empty. Time to steal from other
- // threads.
- worker_thread.wait_until(&job_b.latch);
- debug_assert!(job_b.latch.probe());
- break;
- }
- }
-
- (result_a, job_b.into_result())
- })
-}
-
-/// If job A panics, we still cannot return until we are sure that job
-/// B is complete. This is because it may contain references into the
-/// enclosing stack frame(s).
-#[cold] // cold path
-unsafe fn join_recover_from_panic(
- worker_thread: &WorkerThread,
- job_b_latch: &SpinLatch<'_>,
- err: Box<dyn Any + Send>,
-) -> ! {
- worker_thread.wait_until(job_b_latch);
- unwind::resume_unwinding(err)
-}
diff --git a/vendor/rayon-core/src/join/test.rs b/vendor/rayon-core/src/join/test.rs
deleted file mode 100644
index b303dbc..0000000
--- a/vendor/rayon-core/src/join/test.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-//! Tests for the join code.
-
-use crate::join::*;
-use crate::unwind;
-use crate::ThreadPoolBuilder;
-use rand::distributions::Standard;
-use rand::{Rng, SeedableRng};
-use rand_xorshift::XorShiftRng;
-
-fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
- if v.len() <= 1 {
- return;
- }
-
- let mid = partition(v);
- let (lo, hi) = v.split_at_mut(mid);
- join(|| quick_sort(lo), || quick_sort(hi));
-}
-
-fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
- let pivot = v.len() - 1;
- let mut i = 0;
- for j in 0..pivot {
- if v[j] <= v[pivot] {
- v.swap(i, j);
- i += 1;
- }
- }
- v.swap(i, pivot);
- i
-}
-
-fn seeded_rng() -> XorShiftRng {
- let mut seed = <XorShiftRng as SeedableRng>::Seed::default();
- (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i);
- XorShiftRng::from_seed(seed)
-}
-
-#[test]
-fn sort() {
- let rng = seeded_rng();
- let mut data: Vec<u32> = rng.sample_iter(&Standard).take(6 * 1024).collect();
- let mut sorted_data = data.clone();
- sorted_data.sort();
- quick_sort(&mut data);
- assert_eq!(data, sorted_data);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn sort_in_pool() {
- let rng = seeded_rng();
- let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
-
- let pool = ThreadPoolBuilder::new().build().unwrap();
- let mut sorted_data = data.clone();
- sorted_data.sort();
- pool.install(|| quick_sort(&mut data));
- assert_eq!(data, sorted_data);
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_a() {
- join(|| panic!("Hello, world!"), || ());
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_b() {
- join(|| (), || panic!("Hello, world!"));
-}
-
-#[test]
-#[should_panic(expected = "Hello, world!")]
-fn panic_propagate_both() {
- join(|| panic!("Hello, world!"), || panic!("Goodbye, world!"));
-}
-
-#[test]
-#[cfg_attr(not(panic = "unwind"), ignore)]
-fn panic_b_still_executes() {
- let mut x = false;
- match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
- Ok(_) => panic!("failed to propagate panic from closure A,"),
- Err(_) => assert!(x, "closure b failed to execute"),
- }
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn join_context_both() {
- // If we're not in a pool, both should be marked stolen as they're injected.
- let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
- assert!(a_migrated);
- assert!(b_migrated);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn join_context_neither() {
- // If we're already in a 1-thread pool, neither job should be stolen.
- let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
- let (a_migrated, b_migrated) =
- pool.install(|| join_context(|a| a.migrated(), |b| b.migrated()));
- assert!(!a_migrated);
- assert!(!b_migrated);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn join_context_second() {
- use std::sync::Barrier;
-
- // If we're already in a 2-thread pool, the second job should be stolen.
- let barrier = Barrier::new(2);
- let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
- let (a_migrated, b_migrated) = pool.install(|| {
- join_context(
- |a| {
- barrier.wait();
- a.migrated()
- },
- |b| {
- barrier.wait();
- b.migrated()
- },
- )
- });
- assert!(!a_migrated);
- assert!(b_migrated);
-}
-
-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn join_counter_overflow() {
- const MAX: u32 = 500_000;
-
- let mut i = 0;
- let mut j = 0;
- let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
-
- // Hammer on join a bunch of times -- used to hit overflow debug-assertions
- // in JEC on 32-bit targets: https://github.com/rayon-rs/rayon/issues/797
- for _ in 0..MAX {
- pool.join(|| i += 1, || j += 1);
- }
-
- assert_eq!(i, MAX);
- assert_eq!(j, MAX);
-}